Skip to main content

omnigraph/
table_store.rs

1use arrow_array::{
2    Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
3};
4use arrow_schema::SchemaRef;
5use futures::TryStreamExt;
6use lance::Dataset;
7use lance::blob::BlobArrayBuilder;
8use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
9use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
10use lance::dataset::write::merge_insert::SourceDedupeBehavior;
11use lance::dataset::{
12    CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
13    WriteParams,
14};
15use lance::datatypes::{BlobKind, Schema as LanceSchema};
16use lance::index::DatasetIndexExt;
17use lance::index::scalar::IndexDetails;
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
20use lance_index::{IndexType, is_system_index};
21use lance_linalg::distance::MetricType;
22use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
23use lance_table::rowids::{RowIdSequence, write_row_ids};
24use std::sync::Arc;
25
26use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
27use crate::db::{Snapshot, SubTableEntry};
28use crate::error::{OmniError, Result};
29use crate::storage_layer::ForkOutcome;
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct TableState {
33    pub version: u64,
34    pub row_count: u64,
35    pub(crate) version_metadata: TableVersionMetadata,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct DeleteState {
40    pub version: u64,
41    pub row_count: u64,
42    pub deleted_rows: usize,
43    pub(crate) version_metadata: TableVersionMetadata,
44}
45
/// Outcome of checking whether a `key_col IN (...)` scan will be answered by
/// the persisted scalar (BTREE) index or will quietly degrade to a full
/// filtered scan.
///
/// The check is detection-only (metadata, no IO) and never affects
/// correctness: the scan returns the right rows in both cases. The indexed
/// traversal path surfaces it so the silent performance fallback becomes
/// observable, and a future cost-based planner can consume it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IndexCoverage {
    /// A usable BTREE exists on the column and every fragment records
    /// `physical_rows`.
    Indexed,
    /// Lance will not serve this scan from the scalar index; the result is
    /// still correct but comes from a full scan. `reason` says why.
    Degraded { reason: String },
}
58
59/// A Lance write that has produced fragment files on object storage but is
60/// not yet committed to the dataset's manifest. The staged-write primitives
61/// are consumed by `MutationStaging` (`exec/staging.rs`,
62/// `exec/mutation.rs`) and the bulk loader (`loader/mod.rs`). The
63/// intent: defer Lance commits to end-of-query so a mid-query failure
64/// leaves the touched table at the pre-mutation HEAD instead of
65/// drifting ahead. See `docs/dev/writes.md` for the publisher-CAS contract
66/// this builds on.
67///
68/// `transaction` is opaque from our side — Lance owns its semantics. We
69/// commit it via `CommitBuilder::execute(transaction)` (see
70/// `TableStore::commit_staged`).
71///
72/// For read-your-writes within the same query, `new_fragments` and
73/// `removed_fragment_ids` together describe the post-stage view delta:
74/// `scan_with_staged` (and `count_rows_with_staged`) compose
75/// `committed - removed + new` so subsequent reads see the staged result
76/// without double-counting fragments that `Operation::Update` rewrote.
77/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites
78/// existing fragments would yield duplicate rows (the original fragment
79/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
80#[derive(Debug, Clone)]
81pub struct StagedWrite {
82    pub transaction: Transaction,
83    /// Fragments to surface alongside the committed manifest in
84    /// `Scanner::with_fragments(committed - removed + new)`. For
85    /// `Operation::Append` these are the freshly-appended fragments. For
86    /// `Operation::Update` (merge_insert) these are
87    /// `updated_fragments + new_fragments` (rewrites + freshly-inserted
88    /// rows).
89    pub new_fragments: Vec<Fragment>,
90    /// Fragment IDs that this staged write supersedes. The committed
91    /// manifest must filter these out before being combined with
92    /// `new_fragments` for read-your-writes scans, otherwise rewrites
93    /// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
94    /// adds without removing anything); populated from
95    /// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
96    pub removed_fragment_ids: Vec<u64>,
97}
98
/// Handle for opening, scanning and writing the datasets that live under a
/// single root URI.
#[derive(Clone, Debug)]
pub struct TableStore {
    /// Root URI that table paths are joined onto. `TableStore::new` stores it
    /// with trailing `/` characters removed.
    root_uri: String,
}
103
104impl TableStore {
105    pub fn new(root_uri: &str) -> Self {
106        Self {
107            root_uri: root_uri.trim_end_matches('/').to_string(),
108        }
109    }
110
111    pub fn root_uri(&self) -> &str {
112        &self.root_uri
113    }
114
115    pub fn dataset_uri(&self, table_path: &str) -> String {
116        format!("{}/{}", self.root_uri, table_path)
117    }
118
119    fn table_path_from_dataset_uri(&self, dataset_uri: &str) -> Result<String> {
120        let prefix = format!("{}/", self.root_uri.trim_end_matches('/'));
121        let table_path = dataset_uri
122            .strip_prefix(&prefix)
123            .map(|path| path.to_string())
124            .ok_or_else(|| {
125                OmniError::manifest_internal(format!(
126                    "dataset uri '{}' is not under root '{}'",
127                    dataset_uri, self.root_uri
128                ))
129            })?;
130        Ok(table_path
131            .split_once("/tree/")
132            .map(|(path, _)| path.to_string())
133            .unwrap_or(table_path))
134    }
135
136    fn dataset_version_metadata(
137        &self,
138        dataset_uri: &str,
139        ds: &Dataset,
140    ) -> Result<TableVersionMetadata> {
141        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
142        TableVersionMetadata::from_dataset(&self.root_uri, &table_path, ds)
143    }
144
145    pub async fn open_snapshot_table(
146        &self,
147        snapshot: &Snapshot,
148        table_key: &str,
149    ) -> Result<Dataset> {
150        snapshot.open(table_key).await
151    }
152
153    pub async fn open_at_entry(&self, entry: &SubTableEntry) -> Result<Dataset> {
154        entry.open(&self.root_uri).await
155    }
156
157    pub async fn open_dataset_head(
158        &self,
159        dataset_uri: &str,
160        branch: Option<&str>,
161    ) -> Result<Dataset> {
162        let ds = Dataset::open(dataset_uri)
163            .await
164            .map_err(|e| OmniError::Lance(e.to_string()))?;
165        match branch {
166            Some(branch) if branch != "main" => ds
167                .checkout_branch(branch)
168                .await
169                .map_err(|e| OmniError::Lance(e.to_string())),
170            _ => Ok(ds),
171        }
172    }
173
174    pub async fn open_dataset_head_for_write(
175        &self,
176        table_key: &str,
177        dataset_uri: &str,
178        branch: Option<&str>,
179    ) -> Result<Dataset> {
180        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
181        open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
182    }
183
184    pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
185        let mut ds = Dataset::open(dataset_uri)
186            .await
187            .map_err(|e| OmniError::Lance(e.to_string()))?;
188        ds.delete_branch(branch)
189            .await
190            .map_err(|e| OmniError::Lance(e.to_string()))
191    }
192
193    /// List the named Lance branches present on the dataset at `dataset_uri`.
194    /// The `cleanup` orphan reconciler diffs this against the manifest branch
195    /// set to find orphaned per-table forks. `main`/default is not a named
196    /// branch and never appears here.
197    pub async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
198        let ds = Dataset::open(dataset_uri)
199            .await
200            .map_err(|e| OmniError::Lance(e.to_string()))?;
201        let branches = ds
202            .list_branches()
203            .await
204            .map_err(|e| OmniError::Lance(e.to_string()))?;
205        Ok(branches.into_keys().collect())
206    }
207
208    /// Idempotently drop `branch` from the dataset at `dataset_uri`.
209    ///
210    /// Unlike [`delete_branch`](Self::delete_branch), this tolerates an
211    /// already-absent branch — both a missing contents ref (Lance's
212    /// `force_delete_branch` handles that) and a missing `tree/{branch}/`
213    /// directory (the local-store `NotFound` quirk pinned by
214    /// `lance_surface_guards::force_delete_branch_semantics`). Safe to call on a
215    /// possibly-orphaned or already-reclaimed fork.
216    ///
217    /// A branch that still has referencing descendants (`RefConflict`) is NOT
218    /// tolerated: that is a real ordering error and surfaces as `OmniError::Lance`.
219    /// Used by the eager best-effort reclaim in `cleanup_deleted_branch_tables`
220    /// and the `cleanup` orphan reconciler.
221    pub async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
222        let mut ds = Dataset::open(dataset_uri)
223            .await
224            .map_err(|e| OmniError::Lance(e.to_string()))?;
225        match ds.force_delete_branch(branch).await {
226            Ok(()) => Ok(()),
227            Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => Ok(()),
228            Err(e) => Err(OmniError::Lance(e.to_string())),
229        }
230    }
231
232    pub async fn open_dataset_at_state(
233        &self,
234        table_path: &str,
235        branch: Option<&str>,
236        version: u64,
237    ) -> Result<Dataset> {
238        let ds = self
239            .open_dataset_head(&self.dataset_uri(table_path), branch)
240            .await?;
241        ds.checkout_version(version)
242            .await
243            .map_err(|e| OmniError::Lance(e.to_string()))
244    }
245
246    pub fn ensure_expected_version(
247        &self,
248        ds: &Dataset,
249        table_key: &str,
250        expected_version: u64,
251    ) -> Result<()> {
252        let actual = ds.version().version;
253        if actual != expected_version {
254            // Use the structured ExpectedVersionMismatch variant so callers
255            // (and the HTTP server) can match on details rather than parsing
256            // the message. This drift is a publisher-style OCC failure: the
257            // caller's pre-write view of the table version is stale relative
258            // to the on-disk Lance head.
259            return Err(OmniError::manifest_expected_version_mismatch(
260                table_key,
261                expected_version,
262                actual,
263            ));
264        }
265        Ok(())
266    }
267
268    pub async fn reopen_for_mutation(
269        &self,
270        dataset_uri: &str,
271        branch: Option<&str>,
272        table_key: &str,
273        expected_version: u64,
274    ) -> Result<Dataset> {
275        let ds = self
276            .open_dataset_head_for_write(table_key, dataset_uri, branch)
277            .await?;
278        self.ensure_expected_version(&ds, table_key, expected_version)?;
279        Ok(ds)
280    }
281
282    pub async fn fork_branch_from_state(
283        &self,
284        dataset_uri: &str,
285        source_branch: Option<&str>,
286        table_key: &str,
287        source_version: u64,
288        target_branch: &str,
289    ) -> Result<ForkOutcome<Dataset>> {
290        let mut source_ds = self
291            .open_dataset_head(dataset_uri, source_branch)
292            .await?
293            .checkout_version(source_version)
294            .await
295            .map_err(|e| OmniError::Lance(e.to_string()))?;
296        self.ensure_expected_version(&source_ds, table_key, source_version)?;
297
298        if let Err(create_err) = source_ds
299            .create_branch(target_branch, source_version, None)
300            .await
301        {
302            // Disambiguate the failure: only a genuinely pre-existing ref is a
303            // reclaim candidate. Mapping EVERY create_branch failure to
304            // `RefAlreadyExists` would route a transient I/O / version / Lance
305            // internal error into the destructive reclaim path. So check whether
306            // the ref actually exists; if not, the failure is real — propagate
307            // it (preserving error fidelity) rather than force-deleting.
308            //
309            // `list_branches` reads `_refs/branches/` from the store, so it sees
310            // a fully-formed manifest-unreferenced fork (our common case — a
311            // create_branch that completed but whose manifest publish did not).
312            // It does NOT see a phase-1-only Lance "zombie" (tree dir written,
313            // no BranchContents) — but neither does `cleanup`'s reconciler, also
314            // list_branches-based. A zombie only forms if create_branch is
315            // interrupted *between its two internal phases* (a far narrower
316            // window than the manifest-publish gap), and it surfaces here as the
317            // propagated create error requiring manual reclaim. We deliberately
318            // do NOT force-delete on a not-found-ref failure: it is
319            // indistinguishable from a transient error on a fresh create, and
320            // force-deleting there is the destructive overreach this guard
321            // removes. The caller holds the per-(table, branch) write queue, so
322            // no in-process writer races this fork; a cross-process create
323            // between our check and now is the documented one-winner-CAS gap and
324            // propagates as a retryable error.
325            let ref_exists = source_ds
326                .list_branches()
327                .await
328                .map(|b| b.contains_key(target_branch))
329                .unwrap_or(false);
330            if ref_exists {
331                return Ok(ForkOutcome::RefAlreadyExists);
332            }
333            return Err(OmniError::Lance(create_err.to_string()));
334        }
335
336        let ds = self
337            .open_dataset_head(dataset_uri, Some(target_branch))
338            .await?;
339        self.ensure_expected_version(&ds, table_key, source_version)?;
340        Ok(ForkOutcome::Created(ds))
341    }
342
343    pub async fn scan_batches(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
344        self.scan(ds, None, None, None).await
345    }
346
347    pub async fn scan_batches_for_rewrite(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
348        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
349        if !has_blob_columns {
350            return self.scan_batches(ds).await;
351        }
352
353        let batches = Self::scan_stream(ds, None, None, None, true)
354            .await?
355            .try_collect::<Vec<RecordBatch>>()
356            .await
357            .map_err(|e| OmniError::Lance(e.to_string()))?;
358        let mut materialized = Vec::with_capacity(batches.len());
359        for batch in batches {
360            materialized.push(Self::materialize_blob_batch(ds, batch).await?);
361        }
362        Ok(materialized)
363    }
364
365    pub(crate) async fn materialize_blob_batch(
366        ds: &Dataset,
367        batch: RecordBatch,
368    ) -> Result<RecordBatch> {
369        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
370        if !has_blob_columns {
371            return Ok(batch);
372        }
373
374        let row_ids = batch
375            .column_by_name("_rowid")
376            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
377            .ok_or_else(|| {
378                OmniError::Lance("expected _rowid column when materializing blobs".to_string())
379            })?
380            .values()
381            .iter()
382            .copied()
383            .collect::<Vec<_>>();
384
385        let schema: SchemaRef = Arc::new(ds.schema().into());
386        let mut columns = Vec::with_capacity(schema.fields().len());
387        for field in schema.fields() {
388            let lance_field = lance::datatypes::Field::try_from(field.as_ref())
389                .map_err(|e| OmniError::Lance(e.to_string()))?;
390            let column = batch.column_by_name(field.name()).ok_or_else(|| {
391                OmniError::Lance(format!("batch missing column '{}'", field.name()))
392            })?;
393            if lance_field.is_blob() {
394                let descriptions =
395                    column
396                        .as_any()
397                        .downcast_ref::<StructArray>()
398                        .ok_or_else(|| {
399                            OmniError::Lance(format!(
400                                "expected blob descriptions for '{}'",
401                                field.name()
402                            ))
403                        })?;
404                columns.push(
405                    Self::rebuild_blob_column(ds, field.name(), descriptions, &row_ids).await?,
406                );
407            } else {
408                columns.push(column.clone());
409            }
410        }
411
412        RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
413    }
414
415    async fn rebuild_blob_column(
416        ds: &Dataset,
417        column_name: &str,
418        descriptions: &StructArray,
419        row_ids: &[u64],
420    ) -> Result<ArrayRef> {
421        let mut builder = BlobArrayBuilder::new(row_ids.len());
422        let mut non_null_row_ids = Vec::new();
423        let mut row_has_blob = Vec::with_capacity(row_ids.len());
424
425        for row in 0..row_ids.len() {
426            let is_null = Self::blob_description_is_null(descriptions, row)?;
427            row_has_blob.push(!is_null);
428            if !is_null {
429                non_null_row_ids.push(row_ids[row]);
430            }
431        }
432
433        let blob_files = if non_null_row_ids.is_empty() {
434            Vec::new()
435        } else {
436            Arc::new(ds.clone())
437                .take_blobs(&non_null_row_ids, column_name)
438                .await
439                .map_err(|e| OmniError::Lance(e.to_string()))?
440        };
441
442        let mut files = blob_files.into_iter();
443        for has_blob in row_has_blob {
444            if !has_blob {
445                builder
446                    .push_null()
447                    .map_err(|e| OmniError::Lance(e.to_string()))?;
448                continue;
449            }
450
451            let blob = files.next().ok_or_else(|| {
452                OmniError::Lance(format!(
453                    "blob rewrite for '{}' lost alignment with source rows",
454                    column_name
455                ))
456            })?;
457            builder
458                .push_bytes(
459                    blob.read()
460                        .await
461                        .map_err(|e| OmniError::Lance(e.to_string()))?,
462                )
463                .map_err(|e| OmniError::Lance(e.to_string()))?;
464        }
465
466        if files.next().is_some() {
467            return Err(OmniError::Lance(format!(
468                "blob rewrite for '{}' produced extra source blobs",
469                column_name
470            )));
471        }
472
473        builder
474            .finish()
475            .map_err(|e| OmniError::Lance(e.to_string()))
476    }
477
478    fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
479        if descriptions.is_null(row) {
480            return Ok(true);
481        }
482
483        let position = descriptions
484            .column_by_name("position")
485            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
486            .ok_or_else(|| {
487                OmniError::Lance(format!(
488                    "unrecognized blob description schema {:?}: missing UInt64 position field",
489                    descriptions.fields()
490                ))
491            })?;
492        let size = descriptions
493            .column_by_name("size")
494            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
495            .ok_or_else(|| {
496                OmniError::Lance(format!(
497                    "unrecognized blob description schema {:?}: missing UInt64 size field",
498                    descriptions.fields()
499                ))
500            })?;
501
502        let Some(kind_column) = descriptions.column_by_name("kind") else {
503            return Ok(position.is_null(row) || size.is_null(row));
504        };
505        let kind = if let Some(kind) = kind_column.as_any().downcast_ref::<UInt8Array>() {
506            if kind.is_null(row) {
507                return Ok(true);
508            }
509            kind.value(row)
510        } else if let Some(kind) = kind_column.as_any().downcast_ref::<UInt32Array>() {
511            if kind.is_null(row) {
512                return Ok(true);
513            }
514            kind.value(row) as u8
515        } else {
516            return Err(OmniError::Lance(format!(
517                "unrecognized blob description schema {:?}: kind field must be UInt8 or UInt32",
518                descriptions.fields()
519            )));
520        };
521
522        let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
523        if kind != BlobKind::Inline {
524            return Ok(false);
525        }
526        let blob_uri = descriptions
527            .column_by_name("blob_uri")
528            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
529            .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
530
531        Ok((position.is_null(row) || position.value(row) == 0)
532            && (size.is_null(row) || size.value(row) == 0)
533            && blob_uri.unwrap_or("").is_empty())
534    }
535
536    pub async fn scan_stream(
537        ds: &Dataset,
538        projection: Option<&[&str]>,
539        filter: Option<&str>,
540        order_by: Option<Vec<ColumnOrdering>>,
541        with_row_id: bool,
542    ) -> Result<DatasetRecordBatchStream> {
543        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, |_| Ok(())).await
544    }
545
546    pub async fn scan_stream_with<F>(
547        ds: &Dataset,
548        projection: Option<&[&str]>,
549        filter: Option<&str>,
550        order_by: Option<Vec<ColumnOrdering>>,
551        with_row_id: bool,
552        configure: F,
553    ) -> Result<DatasetRecordBatchStream>
554    where
555        F: FnOnce(&mut Scanner) -> Result<()>,
556    {
557        let mut scanner = ds.scan();
558        if with_row_id {
559            scanner.with_row_id();
560        }
561        if let Some(columns) = projection {
562            scanner
563                .project(columns)
564                .map_err(|e| OmniError::Lance(e.to_string()))?;
565        }
566        if let Some(filter_sql) = filter {
567            scanner
568                .filter(filter_sql)
569                .map_err(|e| OmniError::Lance(e.to_string()))?;
570        }
571        if let Some(ordering) = order_by {
572            scanner
573                .order_by(Some(ordering))
574                .map_err(|e| OmniError::Lance(e.to_string()))?;
575        }
576        configure(&mut scanner)?;
577        scanner
578            .try_into_stream()
579            .await
580            .map_err(|e| OmniError::Lance(e.to_string()))
581    }
582
583    pub async fn scan(
584        &self,
585        ds: &Dataset,
586        projection: Option<&[&str]>,
587        filter: Option<&str>,
588        order_by: Option<Vec<ColumnOrdering>>,
589    ) -> Result<Vec<RecordBatch>> {
590        Self::scan_stream(ds, projection, filter, order_by, false)
591            .await?
592            .try_collect()
593            .await
594            .map_err(|e| OmniError::Lance(e.to_string()))
595    }
596
597    pub async fn scan_with<F>(
598        &self,
599        ds: &Dataset,
600        projection: Option<&[&str]>,
601        filter: Option<&str>,
602        order_by: Option<Vec<ColumnOrdering>>,
603        with_row_id: bool,
604        configure: F,
605    ) -> Result<Vec<RecordBatch>>
606    where
607        F: FnOnce(&mut Scanner) -> Result<()>,
608    {
609        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, configure)
610            .await?
611            .try_collect()
612            .await
613            .map_err(|e| OmniError::Lance(e.to_string()))
614    }
615
616    /// Indexed neighbor lookup for graph traversal. Given an edge dataset and a
617    /// set of endpoint keys on `key_col` (`"src"` for out-traversal, `"dst"` for
618    /// in-traversal), return the matching edge rows projected to
619    /// `[key_col, opposite_col]`.
620    ///
621    /// The `key_col IN (keys)` predicate is built as a structured DataFusion
622    /// `Expr` and applied via `Scanner::filter_expr`, so Lance routes it through
623    /// the persisted BTREE on `key_col` (index-search → take). Cost scales with
624    /// the frontier size, not |E| — the basis for serving selective traversals
625    /// without building the whole in-memory CSR. Empty `keys` returns empty
626    /// without scanning.
627    ///
628    /// Note: like any indexed scan, this observes only fragments the BTREE
629    /// covers plus an unindexed-fragment scan fallback; it reads the committed
630    /// snapshot `ds` was opened at.
631    pub async fn scan_edges_by_endpoint(
632        ds: &Dataset,
633        key_col: &str,
634        opposite_col: &str,
635        keys: &[String],
636    ) -> Result<Vec<RecordBatch>> {
637        use datafusion::prelude::{col, lit};
638
639        if keys.is_empty() {
640            return Ok(Vec::new());
641        }
642        let key_list: Vec<datafusion::prelude::Expr> =
643            keys.iter().map(|k| lit(k.clone())).collect();
644        let filter_expr = col(key_col).in_list(key_list, false);
645        Self::scan_stream_with(
646            ds,
647            Some(&[key_col, opposite_col]),
648            None,
649            None,
650            false,
651            |scanner| {
652                scanner.filter_expr(filter_expr);
653                Ok(())
654            },
655        )
656        .await?
657        .try_collect()
658        .await
659        .map_err(|e| OmniError::Lance(e.to_string()))
660    }
661
662    /// Metadata-only check (no IO) of whether `scan_edges_by_endpoint` — a
663    /// `key_col IN (...)` filter — on `ds` will be served by the persisted BTREE
664    /// on `column`, or silently fall back to a full filtered scan. Mirrors
665    /// Lance's own decision: scalar indices are disabled for the whole scan if
666    /// ANY fragment lacks `physical_rows` (lance `dataset/scanner.rs`
667    /// `create_filter_plan`), and are obviously unused if no BTREE on the
668    /// column exists. The scan is correct (returns all rows) either way — this
669    /// only surfaces the perf cliff so the indexed traversal can warn on it.
670    pub async fn key_column_index_coverage(ds: &Dataset, column: &str) -> Result<IndexCoverage> {
671        let Some(field_id) = ds.schema().field(column).map(|field| field.id) else {
672            return Ok(IndexCoverage::Degraded {
673                reason: format!("column '{}' not in schema", column),
674            });
675        };
676        let indices = ds
677            .load_indices()
678            .await
679            .map_err(|e| OmniError::Lance(e.to_string()))?;
680        let btree = indices
681            .iter()
682            .filter(|index| !is_system_index(index))
683            .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
684            .find(|index| {
685                index
686                    .index_details
687                    .as_ref()
688                    .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
689                    .unwrap_or(false)
690            });
691        let Some(btree) = btree else {
692            return Ok(IndexCoverage::Degraded {
693                reason: format!("no BTREE index on '{}'", column),
694            });
695        };
696        // Same check Lance runs: a fragment missing physical_rows disables
697        // scalar indices for the entire scan (all-or-nothing).
698        if ds.fragments().iter().any(|f| f.physical_rows.is_none()) {
699            return Ok(IndexCoverage::Degraded {
700                reason: "a fragment is missing physical_rows".to_string(),
701            });
702        }
703        // An index only covers the fragments it was built over; fragments
704        // appended afterward (edge-index creation is skipped once a BTREE exists)
705        // are scanned unindexed. If any CURRENT fragment is absent from the
706        // index's `fragment_bitmap`, the scan is partly a full scan — so the
707        // chooser must not price it as fully indexed. A `None` bitmap means Lance
708        // can't report coverage; don't over-degrade in that case.
709        if let Some(bitmap) = btree.fragment_bitmap.as_ref() {
710            let uncovered = ds
711                .fragments()
712                .iter()
713                .filter(|f| !bitmap.contains(f.id as u32))
714                .count();
715            if uncovered > 0 {
716                return Ok(IndexCoverage::Degraded {
717                    reason: format!(
718                        "{} fragment(s) not covered by the index on '{}'",
719                        uncovered, column
720                    ),
721                });
722            }
723        }
724        Ok(IndexCoverage::Indexed)
725    }
726
727    /// True if any non-system index on `ds` leaves at least one current
728    /// fragment uncovered, i.e. rows that the index does not yet account for
729    /// (appended after the index was built, or rewritten by compaction). Such
730    /// fragments are scanned unindexed until a reindex (`optimize_indices`)
731    /// folds them in. Returns false when every index covers every fragment, or
732    /// when the table has no (non-system) indices to optimize. A `None`
733    /// `fragment_bitmap` means Lance cannot report coverage for that index, so
734    /// we do not treat it as uncovered (mirrors `key_column_index_coverage`).
735    ///
736    /// Used by `optimize` to decide whether an otherwise-already-compacted
737    /// table still has index work to do.
738    pub async fn has_unindexed_fragments(ds: &Dataset) -> Result<bool> {
739        let indices = ds
740            .load_indices()
741            .await
742            .map_err(|e| OmniError::Lance(e.to_string()))?;
743        let frag_ids: Vec<u32> = ds.fragments().iter().map(|f| f.id as u32).collect();
744        for index in indices.iter() {
745            if is_system_index(index) {
746                continue;
747            }
748            if let Some(bitmap) = index.fragment_bitmap.as_ref() {
749                if frag_ids.iter().any(|id| !bitmap.contains(*id)) {
750                    return Ok(true);
751                }
752            }
753        }
754        Ok(false)
755    }
756
757    pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
758        ds.count_rows(filter)
759            .await
760            .map(|count| count as usize)
761            .map_err(|e| OmniError::Lance(e.to_string()))
762    }
763
764    pub fn dataset_version(&self, ds: &Dataset) -> u64 {
765        ds.version().version
766    }
767
768    pub async fn table_state(&self, dataset_uri: &str, ds: &Dataset) -> Result<TableState> {
769        Ok(TableState {
770            version: self.dataset_version(ds),
771            row_count: self.count_rows(ds, None).await? as u64,
772            version_metadata: self.dataset_version_metadata(dataset_uri, ds)?,
773        })
774    }
775
776    /// Legacy inline-commit append: writes fragments AND commits in one
777    /// call, advancing Lance HEAD as a side effect. Not on the
778    /// `TableStorage` trait surface — the staged primitive `stage_append`
779    /// + `commit_staged` is the engine write path. This inherent
780    /// `pub(crate)` method survives only for recovery test setup. Do not
781    /// add new engine call sites — they re-introduce the multi-phase
782    /// commit drift the trait surface was designed to eliminate.
783    pub(crate) async fn append_batch(
784        &self,
785        dataset_uri: &str,
786        ds: &mut Dataset,
787        batch: RecordBatch,
788    ) -> Result<TableState> {
789        if batch.num_rows() == 0 {
790            return self.table_state(dataset_uri, ds).await;
791        }
792        let schema = batch.schema();
793        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
794        let params = WriteParams {
795            mode: WriteMode::Append,
796            allow_external_blob_outside_bases: true,
797            auto_cleanup: None,
798            skip_auto_cleanup: true,
799            ..Default::default()
800        };
801        ds.append(reader, Some(params))
802            .await
803            .map_err(|e| OmniError::Lance(e.to_string()))?;
804        self.table_state(dataset_uri, ds).await
805    }
806
807    pub async fn append_or_create_batch(
808        dataset_uri: &str,
809        dataset: Option<Dataset>,
810        batch: RecordBatch,
811    ) -> Result<Dataset> {
812        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
813        match dataset {
814            Some(mut ds) => {
815                let params = WriteParams {
816                    mode: WriteMode::Append,
817                    allow_external_blob_outside_bases: true,
818                    auto_cleanup: None,
819                    skip_auto_cleanup: true,
820                    ..Default::default()
821                };
822                ds.append(reader, Some(params))
823                    .await
824                    .map_err(|e| OmniError::Lance(e.to_string()))?;
825                Ok(ds)
826            }
827            None => {
828                let params = WriteParams {
829                    mode: WriteMode::Create,
830                    enable_stable_row_ids: true,
831                    data_storage_version: Some(LanceFileVersion::V2_2),
832                    allow_external_blob_outside_bases: true,
833                    auto_cleanup: None,
834                    skip_auto_cleanup: true,
835                    ..Default::default()
836                };
837                Dataset::write(reader, dataset_uri, Some(params))
838                    .await
839                    .map_err(|e| OmniError::Lance(e.to_string()))
840            }
841        }
842    }
843
844    pub(crate) async fn delete_where(
845        &self,
846        dataset_uri: &str,
847        ds: &mut Dataset,
848        filter: &str,
849    ) -> Result<DeleteState> {
850        let delete_result = ds
851            .delete(filter)
852            .await
853            .map_err(|e| OmniError::Lance(e.to_string()))?;
854        Ok(DeleteState {
855            version: delete_result.new_dataset.version().version,
856            row_count: self.count_rows(&delete_result.new_dataset, None).await? as u64,
857            deleted_rows: delete_result.num_deleted_rows as usize,
858            version_metadata: self
859                .dataset_version_metadata(dataset_uri, &delete_result.new_dataset)?,
860        })
861    }
862
    // ─── Staged-write API ────────────────────────────────────────────────────
    //
    // These primitives wrap Lance's distributed-write API: each call writes
    // fragment files to object storage but does NOT advance the dataset's
    // HEAD or commit a manifest entry. The returned `Transaction` is held by
    // the caller (typically `MutationStaging` or the loader's accumulator)
    // and committed at end-of-query via `commit_staged`. On failure the
    // fragments remain unreferenced and are reclaimed by `cleanup_old_versions`.
    //
    // The extracted `Vec<Fragment>` is for read-your-writes within the same
    // query: subsequent ops construct a `Scanner` and call
    // `scanner.with_fragments(...)` with the committed fragments plus the
    // staged ones (see `scan_with_staged`) to see staged data alongside the
    // committed snapshot. Caveat: unfiltered scans respect the supplied
    // fragment list, but filtered scans can silently drop staged rows — see
    // the filter contract documented on `scan_with_staged`.
877
878    /// Stage an append: write fragment files for `batch`, return the
879    /// uncommitted Lance transaction plus the new fragments for
880    /// read-your-writes.
881    ///
882    /// `prior_stages` is the slice of staged writes already accumulated
883    /// against the **same dataset** in the same query. Pass `&[]` for the
884    /// first call; pass the accumulated stages for subsequent calls. The
885    /// primitive uses this to offset row-ID assignment so chained
886    /// `stage_append` calls don't produce overlapping `_rowid` ranges.
887    /// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same
888    /// slice gets passed to both.
889    ///
890    /// On stable-row-id datasets we manually populate `row_id_meta` on
891    /// the cloned `new_fragments` we expose for `scan_with_staged`.
892    /// Lance's `InsertBuilder::execute_uncommitted` produces fragments
893    /// with `row_id_meta = None`; row IDs are normally assigned by
894    /// `Transaction::assign_row_ids` during commit. Because
895    /// `scan_with_staged` reads the staged fragments *before* commit,
896    /// the scanner trips on a stable-row-id dataset
897    /// (`Error::internal("Missing row id meta")` from
898    /// `dataset/rowids.rs:22`). The transaction's internal fragment copy
899    /// stays untouched — Lance assigns IDs there independently at commit
900    /// time, and the two ID assignments don't have to agree because no
901    /// caller threads `_rowid` from the staged scan into the commit
902    /// path.
903    ///
904    /// **Contract: `prior_stages` must contain only previous
905    /// `stage_append` results against the same dataset.** Mixing
906    /// stage_merge_insert into `prior_stages` would over-count because
907    /// merge_insert's `new_fragments` include rewrites that don't add
908    /// rows. The engine's parse-time D₂′ check (per touched table: all
909    /// stage_append OR exactly one stage_merge_insert) guarantees this
910    /// upstream; on the primitive layer it's the caller's responsibility.
911    pub async fn stage_append(
912        &self,
913        ds: &Dataset,
914        batch: RecordBatch,
915        prior_stages: &[StagedWrite],
916    ) -> Result<StagedWrite> {
917        if batch.num_rows() == 0 {
918            return Err(OmniError::manifest_internal(
919                "stage_append called with empty batch".to_string(),
920            ));
921        }
922        let params = WriteParams {
923            mode: WriteMode::Append,
924            allow_external_blob_outside_bases: true,
925            auto_cleanup: None,
926            skip_auto_cleanup: true,
927            ..Default::default()
928        };
929        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
930            .with_params(&params)
931            .execute_uncommitted(vec![batch])
932            .await
933            .map_err(|e| OmniError::Lance(e.to_string()))?;
934        let mut new_fragments = match &transaction.operation {
935            Operation::Append { fragments } => fragments.clone(),
936            Operation::Overwrite { fragments, .. } => fragments.clone(),
937            other => {
938                return Err(OmniError::manifest_internal(format!(
939                    "stage_append: unexpected Lance operation {:?}",
940                    std::mem::discriminant(other)
941                )));
942            }
943        };
944        // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted`
945        // returns fragments with `id = 0` ("Temporary ID" — see lance-6.0.1
946        // `dataset/write.rs:1044/1712`); the real assignment happens during
947        // commit via `Transaction::fragments_with_ids`. Because we expose
948        // these fragments to `scan_with_staged` *before* commit, two staged
949        // fragments (or one staged + the seed) would collide on `id = 0`,
950        // causing Lance's scanner to mishandle the combined list (silent
951        // duplicates / dropped rows). Mirror the commit-time renumbering
952        // here, using `ds.manifest.max_fragment_id() + 1` as the base and
953        // accounting for prior stages.
954        // ds.manifest.max_fragment_id is Option<u32>; cast up to u64 because
955        // Lance's Fragment::id (and the commit-time renumbering counter in
956        // Transaction::fragments_with_ids) operate on u64.
957        let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
958            + 1
959            + prior_stages_fragment_count(prior_stages);
960        assign_fragment_ids(&mut new_fragments, next_id_base);
961        if ds.manifest.uses_stable_row_ids() {
962            let prior_rows = prior_stages_row_count(prior_stages)?;
963            let start_row_id = ds.manifest.next_row_id + prior_rows;
964            assign_row_id_meta(&mut new_fragments, start_row_id)?;
965        }
966        Ok(StagedWrite {
967            transaction,
968            new_fragments,
969            // Append never supersedes existing fragments.
970            removed_fragment_ids: Vec::new(),
971        })
972    }
973
974    /// Stage a merge_insert (upsert): write fragment files describing the
975    /// merge result, return the uncommitted transaction plus the new
976    /// fragments. The transaction's `Operation::Update` carries the
977    /// fragments-to-remove and fragments-to-add; for read-your-writes we
978    /// expose `new_fragments` (rows that will be visible after commit).
979    ///
980    /// **Contract: do not chain `stage_merge_insert` calls on the same
981    /// table within one query.** Each call's `MergeInsertBuilder` runs
982    /// against the supplied dataset's committed view — it does not see
983    /// fragments produced by a previous staged merge on the same table.
984    /// Two chained `stage_merge_insert`s whose source rows share keys will
985    /// each independently produce `Operation::Update` transactions whose
986    /// `new_fragments` contain a row for the shared key. `scan_with_staged`
987    /// (and `count_rows_with_staged`) will then return both — i.e.
988    /// **duplicates by key**.
989    ///
990    /// This is intrinsic to the underlying Lance API: there is no public
991    /// way to make `MergeInsertBuilder` see uncommitted fragments. The
992    /// engine's `MutationStaging` accumulator works around this by
993    /// concatenating per-table batches in memory and issuing exactly
994    /// one `stage_merge_insert` per touched table at end-of-query (with
995    /// last-write-wins dedupe by id) — see `exec/staging.rs`. Direct
996    /// callers of this primitive must respect the contract themselves.
997    ///
998    /// Lift path: either a Lance API extension that lets
999    /// `MergeInsertBuilder` accept additional staged fragments, or an
1000    /// in-memory pre-merge here that folds prior staged batches into the
1001    /// input stream. See `docs/dev/writes.md`.
1002    pub async fn stage_merge_insert(
1003        &self,
1004        ds: Dataset,
1005        batch: RecordBatch,
1006        key_columns: Vec<String>,
1007        when_matched: WhenMatched,
1008        when_not_matched: WhenNotMatched,
1009    ) -> Result<StagedWrite> {
1010        if batch.num_rows() == 0 {
1011            return Err(OmniError::manifest_internal(
1012                "stage_merge_insert called with empty batch".to_string(),
1013            ));
1014        }
1015
1016        // Precondition for the FirstSeen workaround below: every call path that
1017        // reaches stage_merge_insert (load, MutationStaging::finalize,
1018        // branch_merge::publish_rewritten_merge_table) must hand in a source
1019        // batch that is unique by `key_columns`. Without this check,
1020        // `SourceDedupeBehavior::FirstSeen` would silently collapse genuine
1021        // duplicates instead of erroring.
1022        check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?;
1023
1024        let ds = Arc::new(ds);
1025        let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
1026            .map_err(|e| OmniError::Lance(e.to_string()))?;
1027        builder.when_matched(when_matched);
1028        builder.when_not_matched(when_not_matched);
1029        // Workaround for a Lance bug class where sequential merge_insert calls
1030        // against rows previously rewritten by merge_insert produce a spurious
1031        // "Ambiguous merge inserts: multiple source rows match the same target
1032        // row on (id = ...)" error. Lance's `processed_row_ids:
1033        // Mutex<HashSet<u64>>` (lance-6.0.1 `src/dataset/write/merge_insert.rs`)
1034        // double-processes the same source/target match against datasets
1035        // previously rewritten by merge_insert, and the default
1036        // `SourceDedupeBehavior::Fail` errors on the second insertion; FirstSeen
1037        // makes Lance skip the duplicate match instead. Correctness-preserving
1038        // because every call path pre-dedupes the source batch by id or surfaces
1039        // a real source dup via `check_batch_unique_by_keys` above (load:
1040        // `enforce_unique_constraints_intra_batch`; mutate:
1041        // `MutationStaging::finalize`; branch-merge: the `OrderedTableCursor`
1042        // walk in `exec/merge.rs`). Retire when upstream Lance fixes the bug
1043        // class. Tracked at MR-957; upstream: lance-format/lance#6877.
1044        builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
1045        let job = builder
1046            .try_build()
1047            .map_err(|e| OmniError::Lance(e.to_string()))?;
1048        let schema = batch.schema();
1049        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
1050        let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
1051        let uncommitted = job
1052            .execute_uncommitted(stream)
1053            .await
1054            .map_err(|e| OmniError::Lance(e.to_string()))?;
1055        // Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } —
1056        // `new_fragments` are the freshly inserted rows; `updated_fragments`
1057        // are rewrites of existing fragments that include both retained and
1058        // updated rows; `removed_fragment_ids` lists the committed-manifest
1059        // fragments that those rewrites supersede. For read-your-writes we
1060        // expose `updated_fragments + new_fragments` and the
1061        // `removed_fragment_ids` so `scan_with_staged` can filter the
1062        // superseded committed fragments before combining — otherwise a
1063        // single merge_insert appears as duplicate rows (original committed
1064        // version + rewritten staged version).
1065        let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
1066            Operation::Update {
1067                new_fragments,
1068                updated_fragments,
1069                removed_fragment_ids,
1070                ..
1071            } => {
1072                let mut all = updated_fragments.clone();
1073                all.extend(new_fragments.iter().cloned());
1074                (all, removed_fragment_ids.clone())
1075            }
1076            Operation::Append { fragments } => (fragments.clone(), Vec::new()),
1077            other => {
1078                return Err(OmniError::manifest_internal(format!(
1079                    "stage_merge_insert: unexpected Lance operation {:?}",
1080                    std::mem::discriminant(other)
1081                )));
1082            }
1083        };
1084        Ok(StagedWrite {
1085            transaction: uncommitted.transaction,
1086            new_fragments,
1087            removed_fragment_ids,
1088        })
1089    }
1090
1091    /// Commit a previously-staged transaction onto `ds`, returning the new
1092    /// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by
1093    /// the publisher at end-of-query to materialize all staged writes before
1094    /// the meta-manifest commit.
1095    pub async fn commit_staged(
1096        &self,
1097        ds: Arc<Dataset>,
1098        transaction: Transaction,
1099    ) -> Result<Dataset> {
1100        // Skip Lance's auto-cleanup hook on every commit. OmniGraph owns version
1101        // GC explicitly (optimize.rs::cleanup_all_tables); Lance's hook fires off
1102        // the *dataset's stored* `lance.auto_cleanup.*` config, which graphs
1103        // created before the v7 bump (6.0.1 defaulted auto_cleanup ON) still
1104        // carry — so `WriteParams::auto_cleanup = None` alone does NOT stop it on
1105        // upgraded graphs. Skipping here covers the staged write path (the main
1106        // data path) for new and legacy datasets alike, preventing Lance from
1107        // GC'ing versions the __manifest still pins for snapshots/time-travel.
1108        CommitBuilder::new(ds)
1109            .with_skip_auto_cleanup(true)
1110            .execute(transaction)
1111            .await
1112            .map_err(|e| OmniError::Lance(e.to_string()))
1113    }
1114
1115    /// Stage an overwrite (write_fragments + Operation::Overwrite { schema, fragments }).
1116    /// Returns a StagedWrite carrying the replacement fragments. HEAD does
1117    /// NOT advance.
1118    ///
1119    /// Lance shape: `InsertBuilder::with_params(WriteParams { mode: Overwrite, .. })
1120    /// .execute_uncommitted(vec![batch])` produces a `Transaction` whose
1121    /// `Operation::Overwrite` carries the new schema + fragments. The
1122    /// transaction is committed via `commit_staged` (same call as
1123    /// `stage_append`).
1124    ///
1125    /// MR-793 Phase 2: introduces this for the schema_apply rewrite path.
1126    /// Lance API verified in `.context/mr-793-design.md` Appendix A.1.
1127    pub async fn stage_overwrite(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
1128        // `enable_stable_row_ids: true` is defensive — empirically Lance 6.0.1
1129        // preserves the source dataset's flag through `Operation::Overwrite`
1130        // when WriteParams omits it (pinned by
1131        // `stage_overwrite_preserves_stable_row_ids` in tests/staged_writes.rs),
1132        // but setting it explicitly keeps the invariant documented at every Overwrite site
1133        // (see docs/storage.md "Stable row IDs"). Setting it on an existing
1134        // dataset that was created without stable row IDs is a no-op per
1135        // Lance's row-id-lineage spec, so this stays correct for legacy
1136        // datasets.
1137        let (transaction, mut new_fragments) = if batch.num_rows() == 0 {
1138            let schema = LanceSchema::try_from(batch.schema().as_ref())
1139                .map_err(|e| OmniError::Lance(e.to_string()))?;
1140            let transaction = TransactionBuilder::new(
1141                ds.manifest.version,
1142                Operation::Overwrite {
1143                    fragments: Vec::new(),
1144                    schema,
1145                    config_upsert_values: None,
1146                    initial_bases: None,
1147                },
1148            )
1149            .build();
1150            (transaction, Vec::new())
1151        } else {
1152            let params = WriteParams {
1153                mode: WriteMode::Overwrite,
1154                enable_stable_row_ids: true,
1155                allow_external_blob_outside_bases: true,
1156                auto_cleanup: None,
1157                skip_auto_cleanup: true,
1158                ..Default::default()
1159            };
1160            let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1161                .with_params(&params)
1162                .execute_uncommitted(vec![batch])
1163                .await
1164                .map_err(|e| OmniError::Lance(e.to_string()))?;
1165            let new_fragments = match &transaction.operation {
1166                Operation::Overwrite { fragments, .. } => fragments.clone(),
1167                other => {
1168                    return Err(OmniError::manifest_internal(format!(
1169                        "stage_overwrite: unexpected Lance operation {:?}",
1170                        std::mem::discriminant(other)
1171                    )));
1172                }
1173            };
1174            (transaction, new_fragments)
1175        };
1176        // Overwrite REPLACES every committed fragment, and Lance restarts
1177        // fragment-ID and row-ID counters at the post-commit version.
1178        // For our pre-commit staged view we need to:
1179        //   1) Renumber temporary fragment IDs (Lance returns them as
1180        //      `id = 0` from `execute_uncommitted` — see stage_append
1181        //      for the same fix). For Overwrite there are no committed
1182        //      fragments to collide with (they're all in
1183        //      removed_fragment_ids below), so start at 1.
1184        //   2) For stable-row-id datasets, assign row_id_meta starting
1185        //      at 0 (Overwrite is a fresh-start) so `scan_with_staged`
1186        //      doesn't hit the "Missing row id meta" panic in
1187        //      lance-6.0.1 dataset/rowids.rs:22.
1188        assign_fragment_ids(&mut new_fragments, 1);
1189        if ds.manifest.uses_stable_row_ids() {
1190            assign_row_id_meta(&mut new_fragments, 0)?;
1191        }
1192        // Overwrite REPLACES every committed fragment. For
1193        // read-your-writes via scan_with_staged, list every committed
1194        // fragment in removed_fragment_ids so the post-stage view shows
1195        // ONLY the staged fragments.
1196        let removed_fragment_ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
1197        Ok(StagedWrite {
1198            transaction,
1199            new_fragments,
1200            removed_fragment_ids,
1201        })
1202    }
1203
1204    /// Stage a BTREE scalar index build. Returns a StagedWrite whose
1205    /// transaction commits via `commit_staged`. HEAD does NOT advance.
1206    ///
1207    /// Lance shape: `CreateIndexBuilder::execute_uncommitted` returns
1208    /// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex
1209    /// { new_indices, removed_indices }` via the public `TransactionBuilder`,
1210    /// replicating the simple (non-segment-commit-path) branch of Lance's
1211    /// `CreateIndexBuilder::execute` (lance-6.0.1 `src/index/create.rs:502-512`).
1212    ///
1213    /// `removed_indices` mirrors `execute()` lines 466-476: when the
1214    /// build replaces an existing same-named index, those entries are
1215    /// listed for tombstoning by the manifest commit.
1216    ///
1217    /// MR-793 Phase 2: scalar index types (BTree, Inverted) are
1218    /// stage-able. Vector indices are NOT (segment-commit-path requires
1219    /// `build_index_metadata_from_segments` which is `pub(crate)` in
1220    /// lance-6.0.1); see `create_vector_index` and Appendix A.3.
1221    pub async fn stage_create_btree_index(
1222        &self,
1223        ds: &Dataset,
1224        columns: &[&str],
1225    ) -> Result<StagedWrite> {
1226        let params = ScalarIndexParams::default();
1227        let mut ds_clone = ds.clone();
1228        let new_idx = ds_clone
1229            .create_index_builder(columns, IndexType::BTree, &params)
1230            .replace(true)
1231            .execute_uncommitted()
1232            .await
1233            .map_err(|e| OmniError::Lance(format!("stage_create_btree_index: {}", e)))?;
1234        let removed_indices: Vec<IndexMetadata> = ds
1235            .load_indices()
1236            .await
1237            .map_err(|e| OmniError::Lance(e.to_string()))?
1238            .iter()
1239            .filter(|idx| idx.name == new_idx.name)
1240            .cloned()
1241            .collect();
1242        let transaction = TransactionBuilder::new(
1243            new_idx.dataset_version,
1244            Operation::CreateIndex {
1245                new_indices: vec![new_idx],
1246                removed_indices,
1247            },
1248        )
1249        .build();
1250        Ok(StagedWrite {
1251            transaction,
1252            new_fragments: Vec::new(),
1253            removed_fragment_ids: Vec::new(),
1254        })
1255    }
1256
1257    /// Stage an INVERTED (FTS) scalar index build. Same shape as
1258    /// `stage_create_btree_index`; see its docs for the Lance API
1259    /// citation and contract notes.
1260    pub async fn stage_create_inverted_index(
1261        &self,
1262        ds: &Dataset,
1263        column: &str,
1264    ) -> Result<StagedWrite> {
1265        let params = InvertedIndexParams::default();
1266        let mut ds_clone = ds.clone();
1267        let new_idx = ds_clone
1268            .create_index_builder(&[column], IndexType::Inverted, &params)
1269            .replace(true)
1270            .execute_uncommitted()
1271            .await
1272            .map_err(|e| OmniError::Lance(format!("stage_create_inverted_index: {}", e)))?;
1273        let removed_indices: Vec<IndexMetadata> = ds
1274            .load_indices()
1275            .await
1276            .map_err(|e| OmniError::Lance(e.to_string()))?
1277            .iter()
1278            .filter(|idx| idx.name == new_idx.name)
1279            .cloned()
1280            .collect();
1281        let transaction = TransactionBuilder::new(
1282            new_idx.dataset_version,
1283            Operation::CreateIndex {
1284                new_indices: vec![new_idx],
1285                removed_indices,
1286            },
1287        )
1288        .build();
1289        Ok(StagedWrite {
1290            transaction,
1291            new_fragments: Vec::new(),
1292            removed_fragment_ids: Vec::new(),
1293        })
1294    }
1295
1296    /// Run a scan with optional uncommitted staged writes visible
1297    /// alongside the committed snapshot. When `staged` is empty this is
1298    /// identical to `scan(...)`.
1299    ///
1300    /// Composes the visible fragment list as `committed - removed + new`:
1301    /// the committed manifest's fragments, minus any fragment IDs that
1302    /// staged `Operation::Update`s (merge_insert rewrites) have superseded,
1303    /// plus the staged new/updated fragments. Without the `removed`
1304    /// filter, a merge_insert that rewrites an existing fragment would
1305    /// surface twice — once via the original committed fragment, once via
1306    /// the rewrite in `new_fragments`.
1307    ///
1308    /// **Filter contract is incomplete on staged fragments.** When `filter`
1309    /// is `Some(...)`, Lance pushes the predicate to per-fragment scans
1310    /// with stats-based pruning. Uncommitted fragments produced by
1311    /// `write_fragments_internal` lack the per-column statistics that
1312    /// committed fragments carry; Lance's optimizer drops them from the
1313    /// filtered scan even when their data would match. Staged-fragment
1314    /// rows are silently absent from the result. `scanner.use_stats(false)`
1315    /// does not fix this in lance 6.0.1. Callers needing correct filtered
1316    /// reads against staged data should use a different strategy — the
1317    /// engine's `MutationStaging` accumulator unions in-memory pending
1318    /// batches with the committed scan via DataFusion `MemTable` (see
1319    /// `scan_with_pending`).
1320    ///
1321    /// This method remains on the surface for primitive-level testing
1322    /// (basic stage + scan correctness without filters works) and for
1323    /// callers that don't need filter pushdown.
1324    pub async fn scan_with_staged(
1325        &self,
1326        ds: &Dataset,
1327        staged: &[StagedWrite],
1328        projection: Option<&[&str]>,
1329        filter: Option<&str>,
1330    ) -> Result<Vec<RecordBatch>> {
1331        if staged.is_empty() {
1332            return self.scan(ds, projection, filter, None).await;
1333        }
1334        let mut scanner = ds.scan();
1335        if let Some(cols) = projection {
1336            let owned: Vec<String> = cols.iter().map(|s| s.to_string()).collect();
1337            scanner
1338                .project(&owned)
1339                .map_err(|e| OmniError::Lance(e.to_string()))?;
1340        }
1341        if let Some(f) = filter {
1342            scanner
1343                .filter(f)
1344                .map_err(|e| OmniError::Lance(e.to_string()))?;
1345        }
1346        scanner.with_fragments(combine_committed_with_staged(ds, staged));
1347        let stream = scanner
1348            .try_into_stream()
1349            .await
1350            .map_err(|e| OmniError::Lance(e.to_string()))?;
1351        stream
1352            .try_collect()
1353            .await
1354            .map_err(|e| OmniError::Lance(e.to_string()))
1355    }
1356
1357    /// Scan committed via Lance + apply the same filter to in-memory
1358    /// pending batches via DataFusion `MemTable`, concat the two result
1359    /// streams. The replacement for `scan_with_staged` in engine code:
1360    /// the staged-write writer accumulates input batches in memory and
1361    /// unions them with the committed snapshot at read time,
1362    /// sidestepping the `Scanner::with_fragments` filter-pushdown
1363    /// limitation documented on `scan_with_staged`.
1364    ///
1365    /// `committed_ds` should be opened at the pre-mutation
1366    /// `expected_version` (the same version captured in `MutationStaging::expected_versions`
1367    /// at first touch of the table). `pending_batches` are the per-table
1368    /// accumulator's batches in their input shape. `pending_schema` is
1369    /// the schema of the accumulated batches; passing `None` falls back
1370    /// to the schema of the first pending batch.
1371    ///
1372    /// `filter` is the Lance / DataFusion SQL predicate. It is applied
1373    /// to both sides — Lance pushes it down on the committed side; the
1374    /// pending side runs it through a fresh DataFusion `SessionContext`
1375    /// with the batches registered as a `MemTable` named `pending`.
1376    ///
1377    /// `key_column` controls how committed and pending are unioned:
1378    /// - **`None` (union semantics)**: every committed row that matches
1379    ///   the filter and every pending row that matches the filter is
1380    ///   returned. Correct when committed and pending cannot share a
1381    ///   primary key — e.g., Append-mode loads with ULID-generated ids,
1382    ///   or any read where pending hasn't been used to update committed
1383    ///   rows.
1384    /// - **`Some(col)` (merge / shadow semantics)**: committed rows whose
1385    ///   `col` value appears in any pending batch are EXCLUDED from the
1386    ///   result; only pending's view of those rows is returned. Required
1387    ///   for Merge-mode reads (e.g., `execute_update` on the engine path)
1388    ///   so a chained `update` doesn't see stale committed values that
1389    ///   a prior op already updated in pending. Without this, a predicate
1390    ///   like `where age > 30` can match a row that an earlier
1391    ///   `set age = 20` already moved out of range.
1392    ///
1393    /// When `pending_batches` is empty this delegates to the regular
1394    /// scan path.
1395    pub async fn scan_with_pending(
1396        &self,
1397        committed_ds: &Dataset,
1398        pending_batches: &[RecordBatch],
1399        pending_schema: Option<SchemaRef>,
1400        projection: Option<&[&str]>,
1401        filter: Option<&str>,
1402        key_column: Option<&str>,
1403    ) -> Result<Vec<RecordBatch>> {
1404        // Contract: when merge-shadow semantics are requested via
1405        // `key_column`, the committed-side projection MUST include that
1406        // column so we can filter committed rows whose key appears in
1407        // pending. Silently dropping the shadow when projection omits
1408        // the key would re-introduce union semantics behind the
1409        // caller's back. Reject up front with a clear error so callers
1410        // either (a) include the key in projection or (b) drop
1411        // `key_column` if union is what they wanted.
1412        if let (Some(key_col), Some(cols)) = (key_column, projection) {
1413            if !cols.iter().any(|c| *c == key_col) {
1414                return Err(OmniError::Lance(format!(
1415                    "scan_with_pending: key_column '{}' must appear in projection \
1416                     when merge-shadow semantics are requested (got projection = {:?})",
1417                    key_col, cols
1418                )));
1419            }
1420        }
1421
1422        let committed = self.scan(committed_ds, projection, filter, None).await?;
1423        if pending_batches.is_empty() {
1424            return Ok(committed);
1425        }
1426
1427        // Shadow committed rows whose key value also appears in pending.
1428        // This makes scan_with_pending implement merge semantics rather
1429        // than naive union: any row that has a pending update is
1430        // represented ONLY by its pending value, never by both its
1431        // (stale) committed value and its (current) pending value.
1432        let committed = match key_column {
1433            Some(key_col) => {
1434                let pending_keys = collect_string_column_values(pending_batches, key_col)?;
1435                if pending_keys.is_empty() {
1436                    committed
1437                } else {
1438                    filter_out_rows_where_string_in(committed, key_col, &pending_keys)?
1439                }
1440            }
1441            None => committed,
1442        };
1443
1444        let pending =
1445            scan_pending_batches(pending_batches, pending_schema, projection, filter).await?;
1446
1447        let mut out = committed;
1448        out.extend(pending);
1449        Ok(out)
1450    }
1451
1452    /// `count_rows` variant that respects staged writes. Used for
1453    /// edge-cardinality validation that needs to see staged edges before
1454    /// commit. Same `committed - removed + new` composition as
1455    /// `scan_with_staged`.
1456    pub async fn count_rows_with_staged(
1457        &self,
1458        ds: &Dataset,
1459        staged: &[StagedWrite],
1460        filter: Option<String>,
1461    ) -> Result<usize> {
1462        if staged.is_empty() {
1463            return self.count_rows(ds, filter).await;
1464        }
1465        let mut scanner = ds.scan();
1466        if let Some(f) = filter {
1467            scanner
1468                .filter(&f)
1469                .map_err(|e| OmniError::Lance(e.to_string()))?;
1470        }
1471        scanner.with_fragments(combine_committed_with_staged(ds, staged));
1472        let count = scanner
1473            .count_rows()
1474            .await
1475            .map_err(|e| OmniError::Lance(e.to_string()))?;
1476        Ok(count as usize)
1477    }
1478
1479    async fn user_indices_for_column(
1480        &self,
1481        ds: &Dataset,
1482        column: &str,
1483    ) -> Result<Vec<IndexMetadata>> {
1484        let field_id = ds
1485            .schema()
1486            .field(column)
1487            .map(|field| field.id)
1488            .ok_or_else(|| {
1489                OmniError::manifest_internal(format!(
1490                    "dataset is missing expected index column '{}'",
1491                    column
1492                ))
1493            })?;
1494        let indices = ds
1495            .load_indices()
1496            .await
1497            .map_err(|e| OmniError::Lance(e.to_string()))?;
1498        Ok(indices
1499            .iter()
1500            .filter(|index| !is_system_index(index))
1501            .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
1502            .cloned()
1503            .collect())
1504    }
1505
1506    pub async fn has_btree_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1507        let indices = self.user_indices_for_column(ds, column).await?;
1508        Ok(indices.iter().any(|index| {
1509            index
1510                .index_details
1511                .as_ref()
1512                .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
1513                .unwrap_or(false)
1514        }))
1515    }
1516
1517    pub async fn has_fts_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1518        let indices = self.user_indices_for_column(ds, column).await?;
1519        Ok(indices.iter().any(|index| {
1520            index
1521                .index_details
1522                .as_ref()
1523                .map(|details| IndexDetails(details.clone()).supports_fts())
1524                .unwrap_or(false)
1525        }))
1526    }
1527
1528    pub async fn has_vector_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1529        let indices = self.user_indices_for_column(ds, column).await?;
1530        Ok(indices.iter().any(|index| {
1531            index
1532                .index_details
1533                .as_ref()
1534                .map(|details| IndexDetails(details.clone()).is_vector())
1535                .unwrap_or(false)
1536        }))
1537    }
1538
1539    pub(crate) async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
1540        let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
1541        ds.create_index_builder(&[column], IndexType::Vector, &params)
1542            .replace(true)
1543            .await
1544            .map(|_| ())
1545            .map_err(|e| OmniError::Lance(e.to_string()))
1546    }
1547
1548    pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result<Dataset> {
1549        let batch = RecordBatch::new_empty(schema.clone());
1550        Self::write_dataset(dataset_uri, batch).await
1551    }
1552
1553    pub async fn first_row_id_for_filter(&self, ds: &Dataset, filter: &str) -> Result<Option<u64>> {
1554        let batches = Self::scan_stream(ds, Some(&["id"]), Some(filter), None, true)
1555            .await?
1556            .try_collect::<Vec<RecordBatch>>()
1557            .await
1558            .map_err(|e| OmniError::Lance(e.to_string()))?;
1559        Ok(batches.iter().find_map(|batch| {
1560            batch
1561                .column_by_name("_rowid")
1562                .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1563                .and_then(|arr| (arr.len() > 0).then(|| arr.value(0)))
1564        }))
1565    }
1566
1567    pub async fn write_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
1568        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
1569        let params = WriteParams {
1570            mode: WriteMode::Create,
1571            enable_stable_row_ids: true,
1572            data_storage_version: Some(LanceFileVersion::V2_2),
1573            allow_external_blob_outside_bases: true,
1574            auto_cleanup: None,
1575            skip_auto_cleanup: true,
1576            ..Default::default()
1577        };
1578        Dataset::write(reader, dataset_uri, Some(params))
1579            .await
1580            .map_err(|e| OmniError::Lance(e.to_string()))
1581    }
1582}
1583
1584/// Build the `Scanner::with_fragments` argument for read-your-writes:
1585/// committed manifest fragments minus any fragment IDs superseded by the
1586/// staged writes, plus the staged `new_fragments`. Order is:
1587///   1. committed fragments whose IDs are NOT in any staged
1588///      `removed_fragment_ids` (preserves committed order),
1589///   2. all staged `new_fragments` in stage order.
1590///
1591/// Lance's `Scanner` does not require any particular ordering between
1592/// committed and staged fragments — `with_fragments` scopes the scan to
1593/// exactly the supplied list. The dedup matters because merge_insert
1594/// rewrites a fragment in place at the Lance layer: the rewritten
1595/// fragment is in `new_fragments`, the original (which it supersedes) is
1596/// in `committed` until manifest commit, and including both would yield
1597/// duplicate rows.
1598///
1599/// **Inter-stage supersession is not handled here.** Each StagedWrite's
1600/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a
1601/// later staged merge cannot know about an earlier staged merge's
1602/// fragments (Lance's `MergeInsertBuilder` runs against the committed
1603/// view). If two `stage_merge_insert`s on the same table produce rows
1604/// with the same key, the combined view returns duplicates by key. The
1605/// engine's mutation path enforces "per touched table: all stage_append
1606/// OR exactly one stage_merge_insert" at parse time (D₂′ in
1607/// `exec/mutation.rs`) so this primitive's caller never chains merges.
1608/// See `stage_merge_insert` for the full contract.
1609/// Sum `physical_rows` across all fragments in the supplied stages.
1610/// Used by `stage_append` to compute the row-ID offset for chained
1611/// `stage_append` calls against the same dataset.
1612///
1613/// Assumes `prior_stages` contains only `stage_append` results — see
1614/// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the
1615/// `new_fragments` include rewrites that don't add new rows, so this
1616/// would over-count.
1617fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
1618    prior_stages
1619        .iter()
1620        .map(|s| s.new_fragments.len() as u64)
1621        .sum()
1622}
1623
1624/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's
1625/// commit-time `Transaction::fragments_with_ids` (lance-6.0.1
1626/// `dataset/transaction.rs:1456`) — fragments produced by
1627/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary
1628/// placeholder; we renumber here so they don't collide with committed
1629/// fragments (or with each other across chained stages) when the slice is
1630/// passed to `Scanner::with_fragments`.
1631fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) {
1632    for (i, fragment) in fragments.iter_mut().enumerate() {
1633        if fragment.id == 0 {
1634            fragment.id = start_id + i as u64;
1635        }
1636    }
1637}
1638
1639fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
1640    let mut total: u64 = 0;
1641    for stage in prior_stages {
1642        for fragment in &stage.new_fragments {
1643            let physical_rows = fragment.physical_rows.ok_or_else(|| {
1644                OmniError::manifest_internal(
1645                    "prior_stages_row_count: fragment is missing physical_rows".to_string(),
1646                )
1647            })? as u64;
1648            total += physical_rows;
1649        }
1650    }
1651    Ok(total)
1652}
1653
1654/// Assign sequential row IDs to fragments that lack them, starting from
1655/// `start_row_id`. Mirrors the relevant arm of Lance's
1656/// `Transaction::assign_row_ids` (lance-6.0.1 `dataset/transaction.rs:2682`)
1657/// for the `row_id_meta = None` case — fragments produced by
1658/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset.
1659///
1660/// Used only by `stage_append` for read-your-writes — see its docstring
1661/// for why pre-commit assignment is needed and why diverging from Lance's
1662/// commit-time IDs is safe.
1663fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
1664    let mut next_row_id = start_row_id;
1665    for fragment in fragments {
1666        if fragment.row_id_meta.is_some() {
1667            continue;
1668        }
1669        let physical_rows = fragment.physical_rows.ok_or_else(|| {
1670            OmniError::manifest_internal(
1671                "stage_append: fragment is missing physical_rows".to_string(),
1672            )
1673        })? as u64;
1674        let row_ids = next_row_id..(next_row_id + physical_rows);
1675        let sequence = RowIdSequence::from(row_ids);
1676        let serialized = write_row_ids(&sequence);
1677        fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
1678        next_row_id += physical_rows;
1679    }
1680    Ok(())
1681}
1682
1683/// Collect the set of values in a Utf8 column across multiple batches.
1684/// Used by `scan_with_pending`'s merge-semantic path to identify
1685/// committed rows that are shadowed by pending writes. NULL values are
1686/// skipped.
1687fn collect_string_column_values(
1688    batches: &[RecordBatch],
1689    column: &str,
1690) -> Result<std::collections::HashSet<String>> {
1691    use arrow_array::{Array, StringArray};
1692    let mut out = std::collections::HashSet::new();
1693    for batch in batches {
1694        let Some(col) = batch.column_by_name(column) else {
1695            return Err(OmniError::Lance(format!(
1696                "scan_with_pending: pending batch missing key column '{}'",
1697                column
1698            )));
1699        };
1700        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1701            OmniError::Lance(format!(
1702                "scan_with_pending: key column '{}' is not Utf8",
1703                column
1704            ))
1705        })?;
1706        for i in 0..arr.len() {
1707            if arr.is_valid(i) {
1708                out.insert(arr.value(i).to_string());
1709            }
1710        }
1711    }
1712    Ok(out)
1713}
1714
1715/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`.
1716/// Used by `scan_with_pending`'s merge-semantic path to shadow committed
1717/// rows that pending has already updated. Returns the surviving rows.
1718///
1719/// `scan_with_pending` validates up front that the projection contains
1720/// `column`, so a missing column here is a programmer error — error
1721/// loudly instead of silently passing batches through (which would
1722/// re-introduce the union semantics the caller asked us to avoid).
1723fn filter_out_rows_where_string_in(
1724    batches: Vec<RecordBatch>,
1725    column: &str,
1726    excluded: &std::collections::HashSet<String>,
1727) -> Result<Vec<RecordBatch>> {
1728    use arrow_array::{Array, BooleanArray, StringArray};
1729    let mut out = Vec::with_capacity(batches.len());
1730    for batch in batches {
1731        if batch.num_rows() == 0 {
1732            out.push(batch);
1733            continue;
1734        }
1735        let col = batch.column_by_name(column).ok_or_else(|| {
1736            OmniError::manifest_internal(format!(
1737                "scan_with_pending: committed batch missing key column '{}' \
1738                 (the up-front projection check should have rejected this)",
1739                column
1740            ))
1741        })?;
1742        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1743            OmniError::Lance(format!(
1744                "scan_with_pending: committed column '{}' is not Utf8",
1745                column
1746            ))
1747        })?;
1748        let mask: BooleanArray = (0..arr.len())
1749            .map(|i| {
1750                if arr.is_valid(i) {
1751                    Some(!excluded.contains(arr.value(i)))
1752                } else {
1753                    Some(true)
1754                }
1755            })
1756            .collect();
1757        let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
1758            .map_err(|e| OmniError::Lance(e.to_string()))?;
1759        out.push(filtered);
1760    }
1761    Ok(out)
1762}
1763
1764/// Apply `projection` and `filter` to in-memory pending batches via a
1765/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for
1766/// the read-your-writes side of the in-memory staging accumulator.
1767///
1768/// `pending_batches` must be non-empty (the caller short-circuits on
1769/// empty).
1770///
1771/// **SQL dialect contract.** `filter` is also passed to Lance's scanner
1772/// on the committed side. Lance and DataFusion both accept standard
1773/// SQL comparison predicates (`col op literal`) and OmniGraph's
1774/// `predicate_to_sql` only emits those shapes today (`=`, `!=`, `>`,
1775/// `<`, `>=`, `<=`). If a future caller introduces a Lance-specific
1776/// scanner extension (vector search, FTS, `_rowid` references) into
1777/// the filter, this function will need explicit translation — DataFusion
1778/// won't recognize those operators against the in-memory `MemTable`.
1779async fn scan_pending_batches(
1780    pending_batches: &[RecordBatch],
1781    pending_schema: Option<SchemaRef>,
1782    projection: Option<&[&str]>,
1783    filter: Option<&str>,
1784) -> Result<Vec<RecordBatch>> {
1785    let schema = pending_schema.unwrap_or_else(|| pending_batches[0].schema());
1786    let ctx = datafusion::execution::context::SessionContext::new();
1787    let mem = datafusion::datasource::MemTable::try_new(schema, vec![pending_batches.to_vec()])
1788        .map_err(|e| OmniError::Lance(e.to_string()))?;
1789    ctx.register_table("pending", Arc::new(mem))
1790        .map_err(|e| OmniError::Lance(e.to_string()))?;
1791
1792    let proj = projection
1793        .map(|cols| {
1794            cols.iter()
1795                .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
1796                .collect::<Vec<_>>()
1797                .join(", ")
1798        })
1799        .unwrap_or_else(|| "*".to_string());
1800    let where_clause = filter.map(|f| format!("WHERE {f}")).unwrap_or_default();
1801    let sql = format!("SELECT {proj} FROM pending {where_clause}");
1802    let df = ctx
1803        .sql(&sql)
1804        .await
1805        .map_err(|e| OmniError::Lance(e.to_string()))?;
1806    df.collect()
1807        .await
1808        .map_err(|e| OmniError::Lance(e.to_string()))
1809}
1810
1811fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
1812    let removed: std::collections::HashSet<u64> = staged
1813        .iter()
1814        .flat_map(|w| w.removed_fragment_ids.iter().copied())
1815        .collect();
1816    let mut combined: Vec<Fragment> = ds
1817        .manifest
1818        .fragments
1819        .iter()
1820        .filter(|f| !removed.contains(&f.id))
1821        .cloned()
1822        .collect();
1823    for write in staged {
1824        combined.extend(write.new_fragments.iter().cloned());
1825    }
1826    combined
1827}
1828
1829/// Precondition guard for `stage_merge_insert`.
1830/// Both opt into `SourceDedupeBehavior::FirstSeen` to suppress the Lance
1831/// `processed_row_ids` bug (MR-957). FirstSeen would *also* silently
1832/// collapse genuine duplicate source keys; this check restores fail-fast
1833/// behavior on real dups by erroring before the builder gets a chance to
1834/// silently skip them.
1835///
1836/// Today only single-column string keys are used at the call sites
1837/// (`vec!["id".to_string()]`). The check restricts itself to that shape
1838/// and surfaces an internal error if a future caller passes anything
1839/// else — keeping the assumption explicit instead of silently degrading.
1840fn check_batch_unique_by_keys(
1841    batch: &RecordBatch,
1842    key_columns: &[String],
1843    context: &'static str,
1844) -> Result<()> {
1845    if key_columns.len() != 1 {
1846        return Err(OmniError::manifest_internal(format!(
1847            "{}: check_batch_unique_by_keys currently supports single-column keys only, got {:?}",
1848            context, key_columns
1849        )));
1850    }
1851    let key_col_name = &key_columns[0];
1852    let column = batch.column_by_name(key_col_name).ok_or_else(|| {
1853        OmniError::manifest_internal(format!(
1854            "{}: source batch missing key column '{}'",
1855            context, key_col_name
1856        ))
1857    })?;
1858    let strs = column
1859        .as_any()
1860        .downcast_ref::<StringArray>()
1861        .ok_or_else(|| {
1862            OmniError::manifest_internal(format!(
1863                "{}: key column '{}' is not a StringArray (got {:?})",
1864                context,
1865                key_col_name,
1866                column.data_type()
1867            ))
1868        })?;
1869
1870    let mut seen: std::collections::HashSet<&str> =
1871        std::collections::HashSet::with_capacity(batch.num_rows());
1872    for i in 0..strs.len() {
1873        if !strs.is_valid(i) {
1874            continue;
1875        }
1876        let v = strs.value(i);
1877        if !seen.insert(v) {
1878            return Err(OmniError::manifest(format!(
1879                "{}: duplicate source row for key '{}' (column '{}'); \
1880                 callers must hand in a batch unique by `key_columns` \
1881                 — see MR-957",
1882                context, v, key_col_name
1883            )));
1884        }
1885    }
1886    Ok(())
1887}
1888
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};

    /// One-column batch with a non-nullable Utf8 `id` column holding `ids`.
    fn batch_with_ids(ids: &[&str]) -> RecordBatch {
        let id_field = Field::new("id", DataType::Utf8, false);
        let values: ArrayRef = Arc::new(StringArray::from(ids.to_vec()));
        RecordBatch::try_new(Arc::new(Schema::new(vec![id_field])), vec![values]).unwrap()
    }

    /// The single-column key shape used by the call sites.
    fn id_key() -> Vec<String> {
        vec!["id".to_string()]
    }

    #[test]
    fn check_batch_unique_by_keys_passes_when_all_unique() {
        let unique = batch_with_ids(&["a", "b", "c"]);
        check_batch_unique_by_keys(&unique, &id_key(), "test").unwrap();
    }

    #[test]
    fn check_batch_unique_by_keys_errors_on_duplicate_id() {
        let with_dup = batch_with_ids(&["a", "b", "a"]);
        let msg = check_batch_unique_by_keys(&with_dup, &id_key(), "test")
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("duplicate source row for key 'a'"),
            "unexpected error: {msg}"
        );
        assert!(
            msg.contains("MR-957"),
            "error should reference MR-957: {msg}"
        );
    }

    #[test]
    fn check_batch_unique_by_keys_rejects_multi_column_keys() {
        let single_row = batch_with_ids(&["a"]);
        let two_keys = ["id".to_string(), "other".to_string()];
        let err = check_batch_unique_by_keys(&single_row, &two_keys, "test").unwrap_err();
        assert!(err.to_string().contains("single-column keys only"));
    }
}