Skip to main content

omnigraph/
table_store.rs

1use arrow_array::{
2    Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
3};
4use arrow_schema::SchemaRef;
5use datafusion::physical_plan::SendableRecordBatchStream;
6use futures::TryStreamExt;
7use lance::Dataset;
8use lance::blob::BlobArrayBuilder;
9use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
10use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{
13    CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
14    WriteParams,
15};
16use lance::datatypes::{BlobKind, Schema as LanceSchema};
17use lance::index::DatasetIndexExt;
18use lance::index::scalar::IndexDetails;
19use lance_file::version::LanceFileVersion;
20use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
21use lance_index::{IndexType, is_system_index};
22use lance_linalg::distance::MetricType;
23use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
24use lance_table::rowids::{RowIdSequence, write_row_ids};
25use std::sync::Arc;
26
27use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
28use crate::db::{Snapshot, SubTableEntry};
29use crate::error::{OmniError, Result};
30use crate::storage_layer::ForkOutcome;
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct TableState {
34    pub version: u64,
35    pub row_count: u64,
36    pub(crate) version_metadata: TableVersionMetadata,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct DeleteState {
41    pub version: u64,
42    pub row_count: u64,
43    pub deleted_rows: usize,
44    pub(crate) version_metadata: TableVersionMetadata,
45}
46
/// Answers one question about a `key_col IN (...)` scan on a dataset: will
/// the persisted scalar (BTREE) index serve it, or will the scan silently fall
/// back to a full filtered scan?
///
/// This is detection only (metadata, no IO) — the scan returns the correct
/// rows in both cases. The indexed traversal path surfaces it so the silent
/// performance fallback becomes observable, and a future cost-based planner
/// can consume it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexCoverage {
    /// A usable BTREE exists on the column and every fragment records
    /// `physical_rows`.
    Indexed,
    /// Lance will not use the scalar index for this scan; the result is still
    /// correct but comes from a full scan. `reason` says why.
    Degraded { reason: String },
}
59
60/// A Lance write that has produced fragment files on object storage but is
61/// not yet committed to the dataset's manifest. The staged-write primitives
62/// are consumed by `MutationStaging` (`exec/staging.rs`,
63/// `exec/mutation.rs`) and the bulk loader (`loader/mod.rs`). The
64/// intent: defer Lance commits to end-of-query so a mid-query failure
65/// leaves the touched table at the pre-mutation HEAD instead of
66/// drifting ahead. See `docs/dev/writes.md` for the publisher-CAS contract
67/// this builds on.
68///
69/// `transaction` is opaque from our side — Lance owns its semantics. We
70/// commit it via `CommitBuilder::execute(transaction)` (see
71/// `TableStore::commit_staged`).
72///
73/// For read-your-writes within the same query, `new_fragments` and
74/// `removed_fragment_ids` together describe the post-stage view delta:
75/// `scan_with_staged` (and `count_rows_with_staged`) compose
76/// `committed - removed + new` so subsequent reads see the staged result
77/// without double-counting fragments that `Operation::Update` rewrote.
78/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites
79/// existing fragments would yield duplicate rows (the original fragment
80/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
81#[derive(Debug, Clone)]
82pub struct StagedWrite {
83    pub transaction: Transaction,
84    /// Fragments to surface alongside the committed manifest in
85    /// `Scanner::with_fragments(committed - removed + new)`. For
86    /// `Operation::Append` these are the freshly-appended fragments. For
87    /// `Operation::Update` (merge_insert) these are
88    /// `updated_fragments + new_fragments` (rewrites + freshly-inserted
89    /// rows).
90    pub new_fragments: Vec<Fragment>,
91    /// Fragment IDs that this staged write supersedes. The committed
92    /// manifest must filter these out before being combined with
93    /// `new_fragments` for read-your-writes scans, otherwise rewrites
94    /// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
95    /// adds without removing anything); populated from
96    /// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
97    pub removed_fragment_ids: Vec<u64>,
98}
99
/// Entry point for opening, scanning and branching the Lance datasets that
/// live under a single root URI.
#[derive(Debug, Clone)]
pub struct TableStore {
    /// Root URI that dataset URIs are resolved against; `TableStore::new`
    /// stores it without trailing `/` characters.
    root_uri: String,
}
104
105impl TableStore {
106    pub fn new(root_uri: &str) -> Self {
107        Self {
108            root_uri: root_uri.trim_end_matches('/').to_string(),
109        }
110    }
111
112    pub fn root_uri(&self) -> &str {
113        &self.root_uri
114    }
115
116    pub fn dataset_uri(&self, table_path: &str) -> String {
117        format!("{}/{}", self.root_uri, table_path)
118    }
119
120    fn table_path_from_dataset_uri(&self, dataset_uri: &str) -> Result<String> {
121        let prefix = format!("{}/", self.root_uri.trim_end_matches('/'));
122        let table_path = dataset_uri
123            .strip_prefix(&prefix)
124            .map(|path| path.to_string())
125            .ok_or_else(|| {
126                OmniError::manifest_internal(format!(
127                    "dataset uri '{}' is not under root '{}'",
128                    dataset_uri, self.root_uri
129                ))
130            })?;
131        Ok(table_path
132            .split_once("/tree/")
133            .map(|(path, _)| path.to_string())
134            .unwrap_or(table_path))
135    }
136
137    fn dataset_version_metadata(
138        &self,
139        dataset_uri: &str,
140        ds: &Dataset,
141    ) -> Result<TableVersionMetadata> {
142        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
143        TableVersionMetadata::from_dataset(&self.root_uri, &table_path, ds)
144    }
145
146    pub async fn open_snapshot_table(
147        &self,
148        snapshot: &Snapshot,
149        table_key: &str,
150    ) -> Result<Dataset> {
151        snapshot.open(table_key).await
152    }
153
154    pub async fn open_at_entry(&self, entry: &SubTableEntry) -> Result<Dataset> {
155        entry.open(&self.root_uri).await
156    }
157
158    pub async fn open_dataset_head(
159        &self,
160        dataset_uri: &str,
161        branch: Option<&str>,
162    ) -> Result<Dataset> {
163        let ds = Dataset::open(dataset_uri)
164            .await
165            .map_err(|e| OmniError::Lance(e.to_string()))?;
166        match branch {
167            Some(branch) if branch != "main" => ds
168                .checkout_branch(branch)
169                .await
170                .map_err(|e| OmniError::Lance(e.to_string())),
171            _ => Ok(ds),
172        }
173    }
174
175    pub async fn open_dataset_head_for_write(
176        &self,
177        table_key: &str,
178        dataset_uri: &str,
179        branch: Option<&str>,
180    ) -> Result<Dataset> {
181        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
182        open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
183    }
184
185    pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
186        let mut ds = Dataset::open(dataset_uri)
187            .await
188            .map_err(|e| OmniError::Lance(e.to_string()))?;
189        ds.delete_branch(branch)
190            .await
191            .map_err(|e| OmniError::Lance(e.to_string()))
192    }
193
194    /// List the named Lance branches present on the dataset at `dataset_uri`.
195    /// The `cleanup` orphan reconciler diffs this against the manifest branch
196    /// set to find orphaned per-table forks. `main`/default is not a named
197    /// branch and never appears here.
198    pub async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
199        let ds = Dataset::open(dataset_uri)
200            .await
201            .map_err(|e| OmniError::Lance(e.to_string()))?;
202        let branches = ds
203            .list_branches()
204            .await
205            .map_err(|e| OmniError::Lance(e.to_string()))?;
206        Ok(branches.into_keys().collect())
207    }
208
209    /// Idempotently drop `branch` from the dataset at `dataset_uri`.
210    ///
211    /// Unlike [`delete_branch`](Self::delete_branch), this tolerates an
212    /// already-absent branch — both a missing contents ref (Lance's
213    /// `force_delete_branch` handles that) and a missing `tree/{branch}/`
214    /// directory (the local-store `NotFound` quirk pinned by
215    /// `lance_surface_guards::force_delete_branch_semantics`). Safe to call on a
216    /// possibly-orphaned or already-reclaimed fork.
217    ///
218    /// A branch that still has referencing descendants (`RefConflict`) is NOT
219    /// tolerated: that is a real ordering error and surfaces as `OmniError::Lance`.
220    /// Used by the eager best-effort reclaim in `cleanup_deleted_branch_tables`
221    /// and the `cleanup` orphan reconciler.
222    pub async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
223        let mut ds = Dataset::open(dataset_uri)
224            .await
225            .map_err(|e| OmniError::Lance(e.to_string()))?;
226        match ds.force_delete_branch(branch).await {
227            Ok(()) => Ok(()),
228            Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => Ok(()),
229            Err(e) => Err(OmniError::Lance(e.to_string())),
230        }
231    }
232
233    pub async fn open_dataset_at_state(
234        &self,
235        table_path: &str,
236        branch: Option<&str>,
237        version: u64,
238    ) -> Result<Dataset> {
239        let ds = self
240            .open_dataset_head(&self.dataset_uri(table_path), branch)
241            .await?;
242        ds.checkout_version(version)
243            .await
244            .map_err(|e| OmniError::Lance(e.to_string()))
245    }
246
247    pub fn ensure_expected_version(
248        &self,
249        ds: &Dataset,
250        table_key: &str,
251        expected_version: u64,
252    ) -> Result<()> {
253        let actual = ds.version().version;
254        if actual != expected_version {
255            // Use the structured ExpectedVersionMismatch variant so callers
256            // (and the HTTP server) can match on details rather than parsing
257            // the message. This drift is a publisher-style OCC failure: the
258            // caller's pre-write view of the table version is stale relative
259            // to the on-disk Lance head.
260            return Err(OmniError::manifest_expected_version_mismatch(
261                table_key,
262                expected_version,
263                actual,
264            ));
265        }
266        Ok(())
267    }
268
269    pub async fn reopen_for_mutation(
270        &self,
271        dataset_uri: &str,
272        branch: Option<&str>,
273        table_key: &str,
274        expected_version: u64,
275    ) -> Result<Dataset> {
276        let ds = self
277            .open_dataset_head_for_write(table_key, dataset_uri, branch)
278            .await?;
279        self.ensure_expected_version(&ds, table_key, expected_version)?;
280        Ok(ds)
281    }
282
283    pub async fn fork_branch_from_state(
284        &self,
285        dataset_uri: &str,
286        source_branch: Option<&str>,
287        table_key: &str,
288        source_version: u64,
289        target_branch: &str,
290    ) -> Result<ForkOutcome<Dataset>> {
291        let mut source_ds = self
292            .open_dataset_head(dataset_uri, source_branch)
293            .await?
294            .checkout_version(source_version)
295            .await
296            .map_err(|e| OmniError::Lance(e.to_string()))?;
297        self.ensure_expected_version(&source_ds, table_key, source_version)?;
298
299        if let Err(create_err) = source_ds
300            .create_branch(target_branch, source_version, None)
301            .await
302        {
303            // Disambiguate the failure: only a genuinely pre-existing ref is a
304            // reclaim candidate. Mapping EVERY create_branch failure to
305            // `RefAlreadyExists` would route a transient I/O / version / Lance
306            // internal error into the destructive reclaim path. So check whether
307            // the ref actually exists; if not, the failure is real — propagate
308            // it (preserving error fidelity) rather than force-deleting.
309            //
310            // `list_branches` reads `_refs/branches/` from the store, so it sees
311            // a fully-formed manifest-unreferenced fork (our common case — a
312            // create_branch that completed but whose manifest publish did not).
313            // It does NOT see a phase-1-only Lance "zombie" (tree dir written,
314            // no BranchContents) — but neither does `cleanup`'s reconciler, also
315            // list_branches-based. A zombie only forms if create_branch is
316            // interrupted *between its two internal phases* (a far narrower
317            // window than the manifest-publish gap), and it surfaces here as the
318            // propagated create error requiring manual reclaim. We deliberately
319            // do NOT force-delete on a not-found-ref failure: it is
320            // indistinguishable from a transient error on a fresh create, and
321            // force-deleting there is the destructive overreach this guard
322            // removes. The caller holds the per-(table, branch) write queue, so
323            // no in-process writer races this fork; a cross-process create
324            // between our check and now is the documented one-winner-CAS gap and
325            // propagates as a retryable error.
326            let ref_exists = source_ds
327                .list_branches()
328                .await
329                .map(|b| b.contains_key(target_branch))
330                .unwrap_or(false);
331            if ref_exists {
332                return Ok(ForkOutcome::RefAlreadyExists);
333            }
334            return Err(OmniError::Lance(create_err.to_string()));
335        }
336
337        let ds = self
338            .open_dataset_head(dataset_uri, Some(target_branch))
339            .await?;
340        self.ensure_expected_version(&ds, table_key, source_version)?;
341        Ok(ForkOutcome::Created(ds))
342    }
343
344    pub async fn scan_batches(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
345        self.scan(ds, None, None, None).await
346    }
347
348    pub async fn scan_batches_for_rewrite(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
349        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
350        if !has_blob_columns {
351            return self.scan_batches(ds).await;
352        }
353
354        let batches = Self::scan_stream(ds, None, None, None, true)
355            .await?
356            .try_collect::<Vec<RecordBatch>>()
357            .await
358            .map_err(|e| OmniError::Lance(e.to_string()))?;
359        let mut materialized = Vec::with_capacity(batches.len());
360        for batch in batches {
361            materialized.push(Self::materialize_blob_batch(ds, batch).await?);
362        }
363        Ok(materialized)
364    }
365
366    /// Streaming, blob-aware sibling of [`Self::scan_batches_for_rewrite`].
367    /// Yields the dataset's rows lazily as a `SendableRecordBatchStream` so a
368    /// downstream writer (`stage_append_stream`) never materializes the whole
369    /// table in memory. Blob columns still need per-row rebuild, so those tables
370    /// fall back to the materialized path and are re-streamed from the `Vec`
371    /// (rare — only tables with a `Blob` property; bounded-memory blob streaming
372    /// is a follow-up). The non-blob path is a true lazy scan.
373    pub async fn scan_stream_for_rewrite(&self, ds: &Dataset) -> Result<SendableRecordBatchStream> {
374        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
375        if has_blob_columns {
376            let arrow_schema: SchemaRef = Arc::new(ds.schema().into());
377            let batches = self.scan_batches_for_rewrite(ds).await?;
378            let reader = arrow_array::RecordBatchIterator::new(
379                batches.into_iter().map(Ok),
380                arrow_schema,
381            );
382            return Ok(lance_datafusion::utils::reader_to_stream(Box::new(reader)));
383        }
384        // Non-blob: a true lazy scan. `DatasetRecordBatchStream` converts to the
385        // `SendableRecordBatchStream` that `execute_uncommitted_stream` consumes.
386        Ok(Self::scan_stream(ds, None, None, None, false).await?.into())
387    }
388
389    pub(crate) async fn materialize_blob_batch(
390        ds: &Dataset,
391        batch: RecordBatch,
392    ) -> Result<RecordBatch> {
393        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
394        if !has_blob_columns {
395            return Ok(batch);
396        }
397
398        let row_ids = batch
399            .column_by_name("_rowid")
400            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
401            .ok_or_else(|| {
402                OmniError::Lance("expected _rowid column when materializing blobs".to_string())
403            })?
404            .values()
405            .iter()
406            .copied()
407            .collect::<Vec<_>>();
408
409        let schema: SchemaRef = Arc::new(ds.schema().into());
410        let mut columns = Vec::with_capacity(schema.fields().len());
411        for field in schema.fields() {
412            let lance_field = lance::datatypes::Field::try_from(field.as_ref())
413                .map_err(|e| OmniError::Lance(e.to_string()))?;
414            let column = batch.column_by_name(field.name()).ok_or_else(|| {
415                OmniError::Lance(format!("batch missing column '{}'", field.name()))
416            })?;
417            if lance_field.is_blob() {
418                let descriptions =
419                    column
420                        .as_any()
421                        .downcast_ref::<StructArray>()
422                        .ok_or_else(|| {
423                            OmniError::Lance(format!(
424                                "expected blob descriptions for '{}'",
425                                field.name()
426                            ))
427                        })?;
428                columns.push(
429                    Self::rebuild_blob_column(ds, field.name(), descriptions, &row_ids).await?,
430                );
431            } else {
432                columns.push(column.clone());
433            }
434        }
435
436        RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
437    }
438
439    async fn rebuild_blob_column(
440        ds: &Dataset,
441        column_name: &str,
442        descriptions: &StructArray,
443        row_ids: &[u64],
444    ) -> Result<ArrayRef> {
445        let mut builder = BlobArrayBuilder::new(row_ids.len());
446        let mut non_null_row_ids = Vec::new();
447        let mut row_has_blob = Vec::with_capacity(row_ids.len());
448
449        for row in 0..row_ids.len() {
450            let is_null = Self::blob_description_is_null(descriptions, row)?;
451            row_has_blob.push(!is_null);
452            if !is_null {
453                non_null_row_ids.push(row_ids[row]);
454            }
455        }
456
457        let blob_files = if non_null_row_ids.is_empty() {
458            Vec::new()
459        } else {
460            Arc::new(ds.clone())
461                .take_blobs(&non_null_row_ids, column_name)
462                .await
463                .map_err(|e| OmniError::Lance(e.to_string()))?
464        };
465
466        let mut files = blob_files.into_iter();
467        for has_blob in row_has_blob {
468            if !has_blob {
469                builder
470                    .push_null()
471                    .map_err(|e| OmniError::Lance(e.to_string()))?;
472                continue;
473            }
474
475            let blob = files.next().ok_or_else(|| {
476                OmniError::Lance(format!(
477                    "blob rewrite for '{}' lost alignment with source rows",
478                    column_name
479                ))
480            })?;
481            builder
482                .push_bytes(
483                    blob.read()
484                        .await
485                        .map_err(|e| OmniError::Lance(e.to_string()))?,
486                )
487                .map_err(|e| OmniError::Lance(e.to_string()))?;
488        }
489
490        if files.next().is_some() {
491            return Err(OmniError::Lance(format!(
492                "blob rewrite for '{}' produced extra source blobs",
493                column_name
494            )));
495        }
496
497        builder
498            .finish()
499            .map_err(|e| OmniError::Lance(e.to_string()))
500    }
501
502    fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
503        if descriptions.is_null(row) {
504            return Ok(true);
505        }
506
507        let position = descriptions
508            .column_by_name("position")
509            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
510            .ok_or_else(|| {
511                OmniError::Lance(format!(
512                    "unrecognized blob description schema {:?}: missing UInt64 position field",
513                    descriptions.fields()
514                ))
515            })?;
516        let size = descriptions
517            .column_by_name("size")
518            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
519            .ok_or_else(|| {
520                OmniError::Lance(format!(
521                    "unrecognized blob description schema {:?}: missing UInt64 size field",
522                    descriptions.fields()
523                ))
524            })?;
525
526        let Some(kind_column) = descriptions.column_by_name("kind") else {
527            return Ok(position.is_null(row) || size.is_null(row));
528        };
529        let kind = if let Some(kind) = kind_column.as_any().downcast_ref::<UInt8Array>() {
530            if kind.is_null(row) {
531                return Ok(true);
532            }
533            kind.value(row)
534        } else if let Some(kind) = kind_column.as_any().downcast_ref::<UInt32Array>() {
535            if kind.is_null(row) {
536                return Ok(true);
537            }
538            kind.value(row) as u8
539        } else {
540            return Err(OmniError::Lance(format!(
541                "unrecognized blob description schema {:?}: kind field must be UInt8 or UInt32",
542                descriptions.fields()
543            )));
544        };
545
546        let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
547        if kind != BlobKind::Inline {
548            return Ok(false);
549        }
550        let blob_uri = descriptions
551            .column_by_name("blob_uri")
552            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
553            .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
554
555        Ok((position.is_null(row) || position.value(row) == 0)
556            && (size.is_null(row) || size.value(row) == 0)
557            && blob_uri.unwrap_or("").is_empty())
558    }
559
560    pub async fn scan_stream(
561        ds: &Dataset,
562        projection: Option<&[&str]>,
563        filter: Option<&str>,
564        order_by: Option<Vec<ColumnOrdering>>,
565        with_row_id: bool,
566    ) -> Result<DatasetRecordBatchStream> {
567        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, |_| Ok(())).await
568    }
569
570    pub async fn scan_stream_with<F>(
571        ds: &Dataset,
572        projection: Option<&[&str]>,
573        filter: Option<&str>,
574        order_by: Option<Vec<ColumnOrdering>>,
575        with_row_id: bool,
576        configure: F,
577    ) -> Result<DatasetRecordBatchStream>
578    where
579        F: FnOnce(&mut Scanner) -> Result<()>,
580    {
581        let mut scanner = ds.scan();
582        if with_row_id {
583            scanner.with_row_id();
584        }
585        if let Some(columns) = projection {
586            scanner
587                .project(columns)
588                .map_err(|e| OmniError::Lance(e.to_string()))?;
589        }
590        if let Some(filter_sql) = filter {
591            scanner
592                .filter(filter_sql)
593                .map_err(|e| OmniError::Lance(e.to_string()))?;
594        }
595        if let Some(ordering) = order_by {
596            scanner
597                .order_by(Some(ordering))
598                .map_err(|e| OmniError::Lance(e.to_string()))?;
599        }
600        configure(&mut scanner)?;
601        scanner
602            .try_into_stream()
603            .await
604            .map_err(|e| OmniError::Lance(e.to_string()))
605    }
606
607    pub async fn scan(
608        &self,
609        ds: &Dataset,
610        projection: Option<&[&str]>,
611        filter: Option<&str>,
612        order_by: Option<Vec<ColumnOrdering>>,
613    ) -> Result<Vec<RecordBatch>> {
614        Self::scan_stream(ds, projection, filter, order_by, false)
615            .await?
616            .try_collect()
617            .await
618            .map_err(|e| OmniError::Lance(e.to_string()))
619    }
620
621    pub async fn scan_with<F>(
622        &self,
623        ds: &Dataset,
624        projection: Option<&[&str]>,
625        filter: Option<&str>,
626        order_by: Option<Vec<ColumnOrdering>>,
627        with_row_id: bool,
628        configure: F,
629    ) -> Result<Vec<RecordBatch>>
630    where
631        F: FnOnce(&mut Scanner) -> Result<()>,
632    {
633        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, configure)
634            .await?
635            .try_collect()
636            .await
637            .map_err(|e| OmniError::Lance(e.to_string()))
638    }
639
640    /// Indexed neighbor lookup for graph traversal. Given an edge dataset and a
641    /// set of endpoint keys on `key_col` (`"src"` for out-traversal, `"dst"` for
642    /// in-traversal), return the matching edge rows projected to
643    /// `[key_col, opposite_col]`.
644    ///
645    /// The `key_col IN (keys)` predicate is built as a structured DataFusion
646    /// `Expr` and applied via `Scanner::filter_expr`, so Lance routes it through
647    /// the persisted BTREE on `key_col` (index-search → take). Cost scales with
648    /// the frontier size, not |E| — the basis for serving selective traversals
649    /// without building the whole in-memory CSR. Empty `keys` returns empty
650    /// without scanning.
651    ///
652    /// Note: like any indexed scan, this observes only fragments the BTREE
653    /// covers plus an unindexed-fragment scan fallback; it reads the committed
654    /// snapshot `ds` was opened at.
655    pub async fn scan_edges_by_endpoint(
656        ds: &Dataset,
657        key_col: &str,
658        opposite_col: &str,
659        keys: &[String],
660    ) -> Result<Vec<RecordBatch>> {
661        use datafusion::prelude::{col, lit};
662
663        if keys.is_empty() {
664            return Ok(Vec::new());
665        }
666        let key_list: Vec<datafusion::prelude::Expr> =
667            keys.iter().map(|k| lit(k.clone())).collect();
668        let filter_expr = col(key_col).in_list(key_list, false);
669        Self::scan_stream_with(
670            ds,
671            Some(&[key_col, opposite_col]),
672            None,
673            None,
674            false,
675            |scanner| {
676                scanner.filter_expr(filter_expr);
677                Ok(())
678            },
679        )
680        .await?
681        .try_collect()
682        .await
683        .map_err(|e| OmniError::Lance(e.to_string()))
684    }
685
686    /// Metadata-only check (no IO) of whether `scan_edges_by_endpoint` — a
687    /// `key_col IN (...)` filter — on `ds` will be served by the persisted BTREE
688    /// on `column`, or silently fall back to a full filtered scan. Mirrors
689    /// Lance's own decision: scalar indices are disabled for the whole scan if
690    /// ANY fragment lacks `physical_rows` (lance `dataset/scanner.rs`
691    /// `create_filter_plan`), and are obviously unused if no BTREE on the
692    /// column exists. The scan is correct (returns all rows) either way — this
693    /// only surfaces the perf cliff so the indexed traversal can warn on it.
694    pub async fn key_column_index_coverage(ds: &Dataset, column: &str) -> Result<IndexCoverage> {
695        let Some(field_id) = ds.schema().field(column).map(|field| field.id) else {
696            return Ok(IndexCoverage::Degraded {
697                reason: format!("column '{}' not in schema", column),
698            });
699        };
700        let indices = ds
701            .load_indices()
702            .await
703            .map_err(|e| OmniError::Lance(e.to_string()))?;
704        let btree = indices
705            .iter()
706            .filter(|index| !is_system_index(index))
707            .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
708            .find(|index| {
709                index
710                    .index_details
711                    .as_ref()
712                    .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
713                    .unwrap_or(false)
714            });
715        let Some(btree) = btree else {
716            return Ok(IndexCoverage::Degraded {
717                reason: format!("no BTREE index on '{}'", column),
718            });
719        };
720        // Same check Lance runs: a fragment missing physical_rows disables
721        // scalar indices for the entire scan (all-or-nothing).
722        if ds.fragments().iter().any(|f| f.physical_rows.is_none()) {
723            return Ok(IndexCoverage::Degraded {
724                reason: "a fragment is missing physical_rows".to_string(),
725            });
726        }
727        // An index only covers the fragments it was built over; fragments
728        // appended afterward (edge-index creation is skipped once a BTREE exists)
729        // are scanned unindexed. If any CURRENT fragment is absent from the
730        // index's `fragment_bitmap`, the scan is partly a full scan — so the
731        // chooser must not price it as fully indexed. A `None` bitmap means Lance
732        // can't report coverage; don't over-degrade in that case.
733        if let Some(bitmap) = btree.fragment_bitmap.as_ref() {
734            let uncovered = ds
735                .fragments()
736                .iter()
737                .filter(|f| !bitmap.contains(f.id as u32))
738                .count();
739            if uncovered > 0 {
740                return Ok(IndexCoverage::Degraded {
741                    reason: format!(
742                        "{} fragment(s) not covered by the index on '{}'",
743                        uncovered, column
744                    ),
745                });
746            }
747        }
748        Ok(IndexCoverage::Indexed)
749    }
750
751    /// True if any non-system index on `ds` leaves at least one current
752    /// fragment uncovered, i.e. rows that the index does not yet account for
753    /// (appended after the index was built, or rewritten by compaction). Such
754    /// fragments are scanned unindexed until a reindex (`optimize_indices`)
755    /// folds them in. Returns false when every index covers every fragment, or
756    /// when the table has no (non-system) indices to optimize. A `None`
757    /// `fragment_bitmap` means Lance cannot report coverage for that index, so
758    /// we do not treat it as uncovered (mirrors `key_column_index_coverage`).
759    ///
760    /// Used by `optimize` to decide whether an otherwise-already-compacted
761    /// table still has index work to do.
762    pub async fn has_unindexed_fragments(ds: &Dataset) -> Result<bool> {
763        let indices = ds
764            .load_indices()
765            .await
766            .map_err(|e| OmniError::Lance(e.to_string()))?;
767        let frag_ids: Vec<u32> = ds.fragments().iter().map(|f| f.id as u32).collect();
768        for index in indices.iter() {
769            if is_system_index(index) {
770                continue;
771            }
772            if let Some(bitmap) = index.fragment_bitmap.as_ref() {
773                if frag_ids.iter().any(|id| !bitmap.contains(*id)) {
774                    return Ok(true);
775                }
776            }
777        }
778        Ok(false)
779    }
780
781    pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
782        ds.count_rows(filter)
783            .await
784            .map(|count| count as usize)
785            .map_err(|e| OmniError::Lance(e.to_string()))
786    }
787
788    pub fn dataset_version(&self, ds: &Dataset) -> u64 {
789        ds.version().version
790    }
791
792    pub async fn table_state(&self, dataset_uri: &str, ds: &Dataset) -> Result<TableState> {
793        Ok(TableState {
794            version: self.dataset_version(ds),
795            row_count: self.count_rows(ds, None).await? as u64,
796            version_metadata: self.dataset_version_metadata(dataset_uri, ds)?,
797        })
798    }
799
800    /// Legacy inline-commit append: writes fragments AND commits in one
801    /// call, advancing Lance HEAD as a side effect. Not on the
802    /// `TableStorage` trait surface — the staged primitive `stage_append`
803    /// + `commit_staged` is the engine write path. This inherent
804    /// `pub(crate)` method survives only for recovery test setup. Do not
805    /// add new engine call sites — they re-introduce the multi-phase
806    /// commit drift the trait surface was designed to eliminate.
807    pub(crate) async fn append_batch(
808        &self,
809        dataset_uri: &str,
810        ds: &mut Dataset,
811        batch: RecordBatch,
812    ) -> Result<TableState> {
813        if batch.num_rows() == 0 {
814            return self.table_state(dataset_uri, ds).await;
815        }
816        let schema = batch.schema();
817        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
818        let params = WriteParams {
819            mode: WriteMode::Append,
820            allow_external_blob_outside_bases: true,
821            auto_cleanup: None,
822            skip_auto_cleanup: true,
823            ..Default::default()
824        };
825        ds.append(reader, Some(params))
826            .await
827            .map_err(|e| OmniError::Lance(e.to_string()))?;
828        self.table_state(dataset_uri, ds).await
829    }
830
831    pub async fn append_or_create_batch(
832        dataset_uri: &str,
833        dataset: Option<Dataset>,
834        batch: RecordBatch,
835    ) -> Result<Dataset> {
836        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
837        match dataset {
838            Some(mut ds) => {
839                let params = WriteParams {
840                    mode: WriteMode::Append,
841                    allow_external_blob_outside_bases: true,
842                    auto_cleanup: None,
843                    skip_auto_cleanup: true,
844                    ..Default::default()
845                };
846                ds.append(reader, Some(params))
847                    .await
848                    .map_err(|e| OmniError::Lance(e.to_string()))?;
849                Ok(ds)
850            }
851            None => {
852                let params = WriteParams {
853                    mode: WriteMode::Create,
854                    enable_stable_row_ids: true,
855                    data_storage_version: Some(LanceFileVersion::V2_2),
856                    allow_external_blob_outside_bases: true,
857                    auto_cleanup: None,
858                    skip_auto_cleanup: true,
859                    ..Default::default()
860                };
861                Dataset::write(reader, dataset_uri, Some(params))
862                    .await
863                    .map_err(|e| OmniError::Lance(e.to_string()))
864            }
865        }
866    }
867
868    pub(crate) async fn delete_where(
869        &self,
870        dataset_uri: &str,
871        ds: &mut Dataset,
872        filter: &str,
873    ) -> Result<DeleteState> {
874        let delete_result = ds
875            .delete(filter)
876            .await
877            .map_err(|e| OmniError::Lance(e.to_string()))?;
878        Ok(DeleteState {
879            version: delete_result.new_dataset.version().version,
880            row_count: self.count_rows(&delete_result.new_dataset, None).await? as u64,
881            deleted_rows: delete_result.num_deleted_rows as usize,
882            version_metadata: self
883                .dataset_version_metadata(dataset_uri, &delete_result.new_dataset)?,
884        })
885    }
886
887    // ─── Staged-write API ────────────────────────────────────────────────────
888    //
889    // These primitives wrap Lance's distributed-write API: each call writes
890    // fragment files to object storage but does NOT advance the dataset's
891    // HEAD or commit a manifest entry. The returned `Transaction` is held by
892    // the caller (typically `MutationStaging` or the loader's accumulator)
893    // and committed at end-of-query via `commit_staged`. On failure the
894    // fragments remain unreferenced and are reclaimed by `cleanup_old_versions`.
895    //
896    // The extracted `Vec<Fragment>` is for read-your-writes within the same
897    // query: subsequent ops construct a `Scanner` and call
898    // `scanner.with_fragments(staged.clone())` to see staged data alongside
899    // the committed snapshot. Lance's filter pushdown, vector search, and
900    // FTS all respect the supplied fragment list.
901
902    /// Stage an append: write fragment files for `batch`, return the
903    /// uncommitted Lance transaction plus the new fragments for
904    /// read-your-writes.
905    ///
906    /// `prior_stages` is the slice of staged writes already accumulated
907    /// against the **same dataset** in the same query. Pass `&[]` for the
908    /// first call; pass the accumulated stages for subsequent calls. The
909    /// primitive uses this to offset row-ID assignment so chained
910    /// `stage_append` calls don't produce overlapping `_rowid` ranges.
911    /// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same
912    /// slice gets passed to both.
913    ///
914    /// On stable-row-id datasets we manually populate `row_id_meta` on
915    /// the cloned `new_fragments` we expose for `scan_with_staged`.
916    /// Lance's `InsertBuilder::execute_uncommitted` produces fragments
917    /// with `row_id_meta = None`; row IDs are normally assigned by
918    /// `Transaction::assign_row_ids` during commit. Because
919    /// `scan_with_staged` reads the staged fragments *before* commit,
920    /// the scanner trips on a stable-row-id dataset
921    /// (`Error::internal("Missing row id meta")` from
922    /// `dataset/rowids.rs:22`). The transaction's internal fragment copy
923    /// stays untouched — Lance assigns IDs there independently at commit
924    /// time, and the two ID assignments don't have to agree because no
925    /// caller threads `_rowid` from the staged scan into the commit
926    /// path.
927    ///
928    /// **Contract: `prior_stages` must contain only previous
929    /// `stage_append` results against the same dataset.** Mixing
930    /// stage_merge_insert into `prior_stages` would over-count because
931    /// merge_insert's `new_fragments` include rewrites that don't add
932    /// rows. The engine's parse-time D₂′ check (per touched table: all
933    /// stage_append OR exactly one stage_merge_insert) guarantees this
934    /// upstream; on the primitive layer it's the caller's responsibility.
935    pub async fn stage_append(
936        &self,
937        ds: &Dataset,
938        batch: RecordBatch,
939        prior_stages: &[StagedWrite],
940    ) -> Result<StagedWrite> {
941        if batch.num_rows() == 0 {
942            return Err(OmniError::manifest_internal(
943                "stage_append called with empty batch".to_string(),
944            ));
945        }
946        let appended_rows = batch.num_rows() as u64;
947        let params = WriteParams {
948            mode: WriteMode::Append,
949            allow_external_blob_outside_bases: true,
950            auto_cleanup: None,
951            skip_auto_cleanup: true,
952            ..Default::default()
953        };
954        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
955            .with_params(&params)
956            .execute_uncommitted(vec![batch])
957            .await
958            .map_err(|e| OmniError::Lance(e.to_string()))?;
959        // Record only after the staging write succeeds, so a failed write does
960        // not inflate the probe (matches `stage_append_stream`'s ordering).
961        crate::instrumentation::record_stage_append(appended_rows);
962        let mut new_fragments = match &transaction.operation {
963            Operation::Append { fragments } => fragments.clone(),
964            Operation::Overwrite { fragments, .. } => fragments.clone(),
965            other => {
966                return Err(OmniError::manifest_internal(format!(
967                    "stage_append: unexpected Lance operation {:?}",
968                    std::mem::discriminant(other)
969                )));
970            }
971        };
972        // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted`
973        // returns fragments with `id = 0` ("Temporary ID" — see lance-6.0.1
974        // `dataset/write.rs:1044/1712`); the real assignment happens during
975        // commit via `Transaction::fragments_with_ids`. Because we expose
976        // these fragments to `scan_with_staged` *before* commit, two staged
977        // fragments (or one staged + the seed) would collide on `id = 0`,
978        // causing Lance's scanner to mishandle the combined list (silent
979        // duplicates / dropped rows). Mirror the commit-time renumbering
980        // here, using `ds.manifest.max_fragment_id() + 1` as the base and
981        // accounting for prior stages.
982        // ds.manifest.max_fragment_id is Option<u32>; cast up to u64 because
983        // Lance's Fragment::id (and the commit-time renumbering counter in
984        // Transaction::fragments_with_ids) operate on u64.
985        let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
986            + 1
987            + prior_stages_fragment_count(prior_stages);
988        assign_fragment_ids(&mut new_fragments, next_id_base);
989        if ds.manifest.uses_stable_row_ids() {
990            let prior_rows = prior_stages_row_count(prior_stages)?;
991            let start_row_id = ds.manifest.next_row_id + prior_rows;
992            assign_row_id_meta(&mut new_fragments, start_row_id)?;
993        }
994        Ok(StagedWrite {
995            transaction,
996            new_fragments,
997            // Append never supersedes existing fragments.
998            removed_fragment_ids: Vec::new(),
999        })
1000    }
1001
1002    /// Streaming variant of [`Self::stage_append`]: appends the rows of `source`
1003    /// into `ds` without materializing them in memory. It scans `source` lazily
1004    /// (`scan_stream_for_rewrite`) and hands the stream to Lance's
1005    /// `execute_uncommitted_stream`, which rolls fragments at `max_rows_per_file`
1006    /// — bounded memory, one Append transaction. This is the substrate-blessed
1007    /// bulk-append path (the same one LanceDB's `Table::add` uses). Identical
1008    /// fragment-id / stable-row-id staging as `stage_append`.
1009    ///
1010    /// TRANSITIONAL caller — its only caller is the row-level merge append
1011    /// (`publish_adopted_delta`, see `AdoptDelta`), which the fragment-adopt work
1012    /// (Lance #7263/#7185) removes: a fragment graft re-appends no rows. This
1013    /// primitive and `scan_stream_for_rewrite` are then dead unless re-adopted as
1014    /// a general bulk-append path (the `Table::add` shape makes that plausible).
1015    pub async fn stage_append_stream(
1016        &self,
1017        ds: &Dataset,
1018        source: &Dataset,
1019        prior_stages: &[StagedWrite],
1020    ) -> Result<StagedWrite> {
1021        let stream = self.scan_stream_for_rewrite(source).await?;
1022        let params = WriteParams {
1023            mode: WriteMode::Append,
1024            allow_external_blob_outside_bases: true,
1025            auto_cleanup: None,
1026            skip_auto_cleanup: true,
1027            ..Default::default()
1028        };
1029        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1030            .with_params(&params)
1031            .execute_uncommitted_stream(stream)
1032            .await
1033            .map_err(|e| OmniError::Lance(e.to_string()))?;
1034        let mut new_fragments = match &transaction.operation {
1035            Operation::Append { fragments } => fragments.clone(),
1036            Operation::Overwrite { fragments, .. } => fragments.clone(),
1037            other => {
1038                return Err(OmniError::manifest_internal(format!(
1039                    "stage_append_stream: unexpected Lance operation {:?}",
1040                    std::mem::discriminant(other)
1041                )));
1042            }
1043        };
1044        let appended_rows: u64 = new_fragments
1045            .iter()
1046            .filter_map(|f| f.physical_rows)
1047            .map(|r| r as u64)
1048            .sum();
1049        crate::instrumentation::record_stage_append(appended_rows);
1050        // Same commit-time fragment-id / row-id renumbering as `stage_append`.
1051        let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
1052            + 1
1053            + prior_stages_fragment_count(prior_stages);
1054        assign_fragment_ids(&mut new_fragments, next_id_base);
1055        if ds.manifest.uses_stable_row_ids() {
1056            let prior_rows = prior_stages_row_count(prior_stages)?;
1057            let start_row_id = ds.manifest.next_row_id + prior_rows;
1058            assign_row_id_meta(&mut new_fragments, start_row_id)?;
1059        }
1060        Ok(StagedWrite {
1061            transaction,
1062            new_fragments,
1063            removed_fragment_ids: Vec::new(),
1064        })
1065    }
1066
    /// Stage a merge_insert (upsert): write fragment files describing the
    /// merge result and return the uncommitted transaction together with the
    /// fragments needed for read-your-writes. HEAD does not advance; the
    /// transaction is committed later via `commit_staged`.
    ///
    /// When Lance produces an `Operation::Update`, the returned
    /// `StagedWrite::new_fragments` holds that operation's
    /// `updated_fragments` followed by its `new_fragments`, and
    /// `StagedWrite::removed_fragment_ids` lists the committed fragments the
    /// rewrites supersede. When Lance produces an `Operation::Append`, the
    /// appended fragments are returned and nothing is marked as removed.
    ///
    /// **Contract: do not chain `stage_merge_insert` calls on the same
    /// table within one query.** Each call's `MergeInsertBuilder` runs
    /// against the supplied dataset's committed view — it does not see
    /// fragments produced by a previous staged merge on the same table.
    /// Two chained `stage_merge_insert`s whose source rows share keys will
    /// each independently produce `Operation::Update` transactions whose
    /// `new_fragments` contain a row for the shared key. `scan_with_staged`
    /// (and `count_rows_with_staged`) will then return both — i.e.
    /// **duplicates by key**.
    ///
    /// This is intrinsic to the underlying Lance API: there is no public
    /// way to make `MergeInsertBuilder` see uncommitted fragments. The
    /// engine's `MutationStaging` accumulator works around this by
    /// concatenating per-table batches in memory and issuing exactly
    /// one `stage_merge_insert` per touched table at end-of-query (with
    /// last-write-wins dedupe by id) — see `exec/staging.rs`. Direct
    /// callers of this primitive must respect the contract themselves.
    ///
    /// Lift path: either a Lance API extension that lets
    /// `MergeInsertBuilder` accept additional staged fragments, or an
    /// in-memory pre-merge here that folds prior staged batches into the
    /// input stream. See `docs/dev/writes.md`.
    ///
    /// # Errors
    /// - `OmniError::manifest_internal` if `batch` is empty, or if Lance
    ///   returns an operation other than `Update` / `Append`.
    /// - Whatever `check_batch_unique_by_keys` returns when the source batch
    ///   is not unique by `key_columns`.
    /// - `OmniError::Lance` if building or executing the merge job fails.
    pub async fn stage_merge_insert(
        &self,
        ds: Dataset,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedWrite> {
        if batch.num_rows() == 0 {
            return Err(OmniError::manifest_internal(
                "stage_merge_insert called with empty batch".to_string(),
            ));
        }
        // Captured now because `batch` is moved into the reader below.
        let merged_rows = batch.num_rows() as u64;

        // Precondition for the FirstSeen workaround below: every call path that
        // reaches stage_merge_insert (load, MutationStaging::finalize,
        // branch_merge::publish_rewritten_merge_table) must hand in a source
        // batch that is unique by `key_columns`. Without this check,
        // `SourceDedupeBehavior::FirstSeen` would silently collapse genuine
        // duplicates instead of erroring.
        check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?;

        let ds = Arc::new(ds);
        let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        builder.when_matched(when_matched);
        builder.when_not_matched(when_not_matched);
        // Workaround for a Lance bug class where sequential merge_insert calls
        // against rows previously rewritten by merge_insert produce a spurious
        // "Ambiguous merge inserts: multiple source rows match the same target
        // row on (id = ...)" error. Lance's `processed_row_ids:
        // Mutex<HashSet<u64>>` (lance-6.0.1 `src/dataset/write/merge_insert.rs`)
        // double-processes the same source/target match against datasets
        // previously rewritten by merge_insert, and the default
        // `SourceDedupeBehavior::Fail` errors on the second insertion; FirstSeen
        // makes Lance skip the duplicate match instead. Correctness-preserving
        // because every call path pre-dedupes the source batch by id or surfaces
        // a real source dup via `check_batch_unique_by_keys` above (load:
        // `enforce_unique_constraints_intra_batch`; mutate:
        // `MutationStaging::finalize`; branch-merge: the `OrderedTableCursor`
        // walk in `exec/merge.rs`). Retire when upstream Lance fixes the bug
        // class. Tracked at MR-957; upstream: lance-format/lance#6877.
        builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
        let job = builder
            .try_build()
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        // Wrap the single batch in a reader and adapt it to the stream type
        // the merge job consumes.
        let schema = batch.schema();
        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
        let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
        let uncommitted = job
            .execute_uncommitted(stream)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        // Record only after the staging write succeeds, so a failed write does
        // not inflate the probe (matches `stage_append`/`stage_append_stream`).
        crate::instrumentation::record_stage_merge_insert(merged_rows);
        // Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } —
        // `new_fragments` are the freshly inserted rows; `updated_fragments`
        // are rewrites of existing fragments that include both retained and
        // updated rows; `removed_fragment_ids` lists the committed-manifest
        // fragments that those rewrites supersede. For read-your-writes we
        // expose `updated_fragments + new_fragments` and the
        // `removed_fragment_ids` so `scan_with_staged` can filter the
        // superseded committed fragments before combining — otherwise a
        // single merge_insert appears as duplicate rows (original committed
        // version + rewritten staged version).
        let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
            Operation::Update {
                new_fragments,
                updated_fragments,
                removed_fragment_ids,
                ..
            } => {
                // Rewrites first, then the freshly inserted fragments.
                let mut all = updated_fragments.clone();
                all.extend(new_fragments.iter().cloned());
                (all, removed_fragment_ids.clone())
            }
            // A plain append supersedes nothing.
            Operation::Append { fragments } => (fragments.clone(), Vec::new()),
            other => {
                return Err(OmniError::manifest_internal(format!(
                    "stage_merge_insert: unexpected Lance operation {:?}",
                    std::mem::discriminant(other)
                )));
            }
        };
        Ok(StagedWrite {
            transaction: uncommitted.transaction,
            new_fragments,
            removed_fragment_ids,
        })
    }
1187
1188    /// Commit a previously-staged transaction onto `ds`, returning the new
1189    /// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by
1190    /// the publisher at end-of-query to materialize all staged writes before
1191    /// the meta-manifest commit.
1192    pub async fn commit_staged(
1193        &self,
1194        ds: Arc<Dataset>,
1195        transaction: Transaction,
1196    ) -> Result<Dataset> {
1197        // Skip Lance's auto-cleanup hook on every commit. OmniGraph owns version
1198        // GC explicitly (optimize.rs::cleanup_all_tables); Lance's hook fires off
1199        // the *dataset's stored* `lance.auto_cleanup.*` config, which graphs
1200        // created before the v7 bump (6.0.1 defaulted auto_cleanup ON) still
1201        // carry — so `WriteParams::auto_cleanup = None` alone does NOT stop it on
1202        // upgraded graphs. Skipping here covers the staged write path (the main
1203        // data path) for new and legacy datasets alike, preventing Lance from
1204        // GC'ing versions the __manifest still pins for snapshots/time-travel.
1205        CommitBuilder::new(ds)
1206            .with_skip_auto_cleanup(true)
1207            .execute(transaction)
1208            .await
1209            .map_err(|e| OmniError::Lance(e.to_string()))
1210    }
1211
1212    /// Stage an overwrite (write_fragments + Operation::Overwrite { schema, fragments }).
1213    /// Returns a StagedWrite carrying the replacement fragments. HEAD does
1214    /// NOT advance.
1215    ///
1216    /// Lance shape: `InsertBuilder::with_params(WriteParams { mode: Overwrite, .. })
1217    /// .execute_uncommitted(vec![batch])` produces a `Transaction` whose
1218    /// `Operation::Overwrite` carries the new schema + fragments. The
1219    /// transaction is committed via `commit_staged` (same call as
1220    /// `stage_append`).
1221    ///
1222    /// MR-793 Phase 2: introduces this for the schema_apply rewrite path.
1223    /// Lance API verified in `.context/mr-793-design.md` Appendix A.1.
1224    pub async fn stage_overwrite(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
1225        // `enable_stable_row_ids: true` is defensive — empirically Lance 6.0.1
1226        // preserves the source dataset's flag through `Operation::Overwrite`
1227        // when WriteParams omits it (pinned by
1228        // `stage_overwrite_preserves_stable_row_ids` in tests/staged_writes.rs),
1229        // but setting it explicitly keeps the invariant documented at every Overwrite site
1230        // (see docs/storage.md "Stable row IDs"). Setting it on an existing
1231        // dataset that was created without stable row IDs is a no-op per
1232        // Lance's row-id-lineage spec, so this stays correct for legacy
1233        // datasets.
1234        let (transaction, mut new_fragments) = if batch.num_rows() == 0 {
1235            let schema = LanceSchema::try_from(batch.schema().as_ref())
1236                .map_err(|e| OmniError::Lance(e.to_string()))?;
1237            let transaction = TransactionBuilder::new(
1238                ds.manifest.version,
1239                Operation::Overwrite {
1240                    fragments: Vec::new(),
1241                    schema,
1242                    config_upsert_values: None,
1243                    initial_bases: None,
1244                },
1245            )
1246            .build();
1247            (transaction, Vec::new())
1248        } else {
1249            let params = WriteParams {
1250                mode: WriteMode::Overwrite,
1251                enable_stable_row_ids: true,
1252                allow_external_blob_outside_bases: true,
1253                auto_cleanup: None,
1254                skip_auto_cleanup: true,
1255                ..Default::default()
1256            };
1257            let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1258                .with_params(&params)
1259                .execute_uncommitted(vec![batch])
1260                .await
1261                .map_err(|e| OmniError::Lance(e.to_string()))?;
1262            let new_fragments = match &transaction.operation {
1263                Operation::Overwrite { fragments, .. } => fragments.clone(),
1264                other => {
1265                    return Err(OmniError::manifest_internal(format!(
1266                        "stage_overwrite: unexpected Lance operation {:?}",
1267                        std::mem::discriminant(other)
1268                    )));
1269                }
1270            };
1271            (transaction, new_fragments)
1272        };
1273        // Overwrite REPLACES every committed fragment, and Lance restarts
1274        // fragment-ID and row-ID counters at the post-commit version.
1275        // For our pre-commit staged view we need to:
1276        //   1) Renumber temporary fragment IDs (Lance returns them as
1277        //      `id = 0` from `execute_uncommitted` — see stage_append
1278        //      for the same fix). For Overwrite there are no committed
1279        //      fragments to collide with (they're all in
1280        //      removed_fragment_ids below), so start at 1.
1281        //   2) For stable-row-id datasets, assign row_id_meta starting
1282        //      at 0 (Overwrite is a fresh-start) so `scan_with_staged`
1283        //      doesn't hit the "Missing row id meta" panic in
1284        //      lance-6.0.1 dataset/rowids.rs:22.
1285        assign_fragment_ids(&mut new_fragments, 1);
1286        if ds.manifest.uses_stable_row_ids() {
1287            assign_row_id_meta(&mut new_fragments, 0)?;
1288        }
1289        // Overwrite REPLACES every committed fragment. For
1290        // read-your-writes via scan_with_staged, list every committed
1291        // fragment in removed_fragment_ids so the post-stage view shows
1292        // ONLY the staged fragments.
1293        let removed_fragment_ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
1294        Ok(StagedWrite {
1295            transaction,
1296            new_fragments,
1297            removed_fragment_ids,
1298        })
1299    }
1300
1301    /// Stage a BTREE scalar index build. Returns a StagedWrite whose
1302    /// transaction commits via `commit_staged`. HEAD does NOT advance.
1303    ///
1304    /// Lance shape: `CreateIndexBuilder::execute_uncommitted` returns
1305    /// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex
1306    /// { new_indices, removed_indices }` via the public `TransactionBuilder`,
1307    /// replicating the simple (non-segment-commit-path) branch of Lance's
1308    /// `CreateIndexBuilder::execute` (lance-6.0.1 `src/index/create.rs:502-512`).
1309    ///
1310    /// `removed_indices` mirrors `execute()` lines 466-476: when the
1311    /// build replaces an existing same-named index, those entries are
1312    /// listed for tombstoning by the manifest commit.
1313    ///
1314    /// MR-793 Phase 2: scalar index types (BTree, Inverted) are
1315    /// stage-able. Vector indices are NOT (segment-commit-path requires
1316    /// `build_index_metadata_from_segments` which is `pub(crate)` in
1317    /// lance-6.0.1); see `create_vector_index` and Appendix A.3.
1318    pub async fn stage_create_btree_index(
1319        &self,
1320        ds: &Dataset,
1321        columns: &[&str],
1322    ) -> Result<StagedWrite> {
1323        let params = ScalarIndexParams::default();
1324        let mut ds_clone = ds.clone();
1325        let new_idx = ds_clone
1326            .create_index_builder(columns, IndexType::BTree, &params)
1327            .replace(true)
1328            .execute_uncommitted()
1329            .await
1330            .map_err(|e| OmniError::Lance(format!("stage_create_btree_index: {}", e)))?;
1331        let removed_indices: Vec<IndexMetadata> = ds
1332            .load_indices()
1333            .await
1334            .map_err(|e| OmniError::Lance(e.to_string()))?
1335            .iter()
1336            .filter(|idx| idx.name == new_idx.name)
1337            .cloned()
1338            .collect();
1339        let transaction = TransactionBuilder::new(
1340            new_idx.dataset_version,
1341            Operation::CreateIndex {
1342                new_indices: vec![new_idx],
1343                removed_indices,
1344            },
1345        )
1346        .build();
1347        Ok(StagedWrite {
1348            transaction,
1349            new_fragments: Vec::new(),
1350            removed_fragment_ids: Vec::new(),
1351        })
1352    }
1353
1354    /// Stage an INVERTED (FTS) scalar index build. Same shape as
1355    /// `stage_create_btree_index`; see its docs for the Lance API
1356    /// citation and contract notes.
1357    pub async fn stage_create_inverted_index(
1358        &self,
1359        ds: &Dataset,
1360        column: &str,
1361    ) -> Result<StagedWrite> {
1362        let params = InvertedIndexParams::default();
1363        let mut ds_clone = ds.clone();
1364        let new_idx = ds_clone
1365            .create_index_builder(&[column], IndexType::Inverted, &params)
1366            .replace(true)
1367            .execute_uncommitted()
1368            .await
1369            .map_err(|e| OmniError::Lance(format!("stage_create_inverted_index: {}", e)))?;
1370        let removed_indices: Vec<IndexMetadata> = ds
1371            .load_indices()
1372            .await
1373            .map_err(|e| OmniError::Lance(e.to_string()))?
1374            .iter()
1375            .filter(|idx| idx.name == new_idx.name)
1376            .cloned()
1377            .collect();
1378        let transaction = TransactionBuilder::new(
1379            new_idx.dataset_version,
1380            Operation::CreateIndex {
1381                new_indices: vec![new_idx],
1382                removed_indices,
1383            },
1384        )
1385        .build();
1386        Ok(StagedWrite {
1387            transaction,
1388            new_fragments: Vec::new(),
1389            removed_fragment_ids: Vec::new(),
1390        })
1391    }
1392
1393    /// Run a scan with optional uncommitted staged writes visible
1394    /// alongside the committed snapshot. When `staged` is empty this is
1395    /// identical to `scan(...)`.
1396    ///
1397    /// Composes the visible fragment list as `committed - removed + new`:
1398    /// the committed manifest's fragments, minus any fragment IDs that
1399    /// staged `Operation::Update`s (merge_insert rewrites) have superseded,
1400    /// plus the staged new/updated fragments. Without the `removed`
1401    /// filter, a merge_insert that rewrites an existing fragment would
1402    /// surface twice — once via the original committed fragment, once via
1403    /// the rewrite in `new_fragments`.
1404    ///
1405    /// **Filter contract is incomplete on staged fragments.** When `filter`
1406    /// is `Some(...)`, Lance pushes the predicate to per-fragment scans
1407    /// with stats-based pruning. Uncommitted fragments produced by
1408    /// `write_fragments_internal` lack the per-column statistics that
1409    /// committed fragments carry; Lance's optimizer drops them from the
1410    /// filtered scan even when their data would match. Staged-fragment
1411    /// rows are silently absent from the result. `scanner.use_stats(false)`
1412    /// does not fix this in lance 6.0.1. Callers needing correct filtered
1413    /// reads against staged data should use a different strategy — the
1414    /// engine's `MutationStaging` accumulator unions in-memory pending
1415    /// batches with the committed scan via DataFusion `MemTable` (see
1416    /// `scan_with_pending`).
1417    ///
1418    /// This method remains on the surface for primitive-level testing
1419    /// (basic stage + scan correctness without filters works) and for
1420    /// callers that don't need filter pushdown.
1421    pub async fn scan_with_staged(
1422        &self,
1423        ds: &Dataset,
1424        staged: &[StagedWrite],
1425        projection: Option<&[&str]>,
1426        filter: Option<&str>,
1427    ) -> Result<Vec<RecordBatch>> {
1428        if staged.is_empty() {
1429            return self.scan(ds, projection, filter, None).await;
1430        }
1431        let mut scanner = ds.scan();
1432        if let Some(cols) = projection {
1433            let owned: Vec<String> = cols.iter().map(|s| s.to_string()).collect();
1434            scanner
1435                .project(&owned)
1436                .map_err(|e| OmniError::Lance(e.to_string()))?;
1437        }
1438        if let Some(f) = filter {
1439            scanner
1440                .filter(f)
1441                .map_err(|e| OmniError::Lance(e.to_string()))?;
1442        }
1443        scanner.with_fragments(combine_committed_with_staged(ds, staged));
1444        let stream = scanner
1445            .try_into_stream()
1446            .await
1447            .map_err(|e| OmniError::Lance(e.to_string()))?;
1448        stream
1449            .try_collect()
1450            .await
1451            .map_err(|e| OmniError::Lance(e.to_string()))
1452    }
1453
1454    /// Scan committed via Lance + apply the same filter to in-memory
1455    /// pending batches via DataFusion `MemTable`, concat the two result
1456    /// streams. The replacement for `scan_with_staged` in engine code:
1457    /// the staged-write writer accumulates input batches in memory and
1458    /// unions them with the committed snapshot at read time,
1459    /// sidestepping the `Scanner::with_fragments` filter-pushdown
1460    /// limitation documented on `scan_with_staged`.
1461    ///
1462    /// `committed_ds` should be opened at the pre-mutation
1463    /// `expected_version` (the same version captured in `MutationStaging::expected_versions`
1464    /// at first touch of the table). `pending_batches` are the per-table
1465    /// accumulator's batches in their input shape. `pending_schema` is
1466    /// the schema of the accumulated batches; passing `None` falls back
1467    /// to the schema of the first pending batch.
1468    ///
1469    /// `filter` is the Lance / DataFusion SQL predicate. It is applied
1470    /// to both sides — Lance pushes it down on the committed side; the
1471    /// pending side runs it through a fresh DataFusion `SessionContext`
1472    /// with the batches registered as a `MemTable` named `pending`.
1473    ///
1474    /// `key_column` controls how committed and pending are unioned:
1475    /// - **`None` (union semantics)**: every committed row that matches
1476    ///   the filter and every pending row that matches the filter is
1477    ///   returned. Correct when committed and pending cannot share a
1478    ///   primary key — e.g., Append-mode loads with ULID-generated ids,
1479    ///   or any read where pending hasn't been used to update committed
1480    ///   rows.
1481    /// - **`Some(col)` (merge / shadow semantics)**: committed rows whose
1482    ///   `col` value appears in any pending batch are EXCLUDED from the
1483    ///   result; only pending's view of those rows is returned. Required
1484    ///   for Merge-mode reads (e.g., `execute_update` on the engine path)
1485    ///   so a chained `update` doesn't see stale committed values that
1486    ///   a prior op already updated in pending. Without this, a predicate
1487    ///   like `where age > 30` can match a row that an earlier
1488    ///   `set age = 20` already moved out of range.
1489    ///
1490    /// When `pending_batches` is empty this delegates to the regular
1491    /// scan path.
1492    pub async fn scan_with_pending(
1493        &self,
1494        committed_ds: &Dataset,
1495        pending_batches: &[RecordBatch],
1496        pending_schema: Option<SchemaRef>,
1497        projection: Option<&[&str]>,
1498        filter: Option<&str>,
1499        key_column: Option<&str>,
1500    ) -> Result<Vec<RecordBatch>> {
1501        // Contract: when merge-shadow semantics are requested via
1502        // `key_column`, the committed-side projection MUST include that
1503        // column so we can filter committed rows whose key appears in
1504        // pending. Silently dropping the shadow when projection omits
1505        // the key would re-introduce union semantics behind the
1506        // caller's back. Reject up front with a clear error so callers
1507        // either (a) include the key in projection or (b) drop
1508        // `key_column` if union is what they wanted.
1509        if let (Some(key_col), Some(cols)) = (key_column, projection) {
1510            if !cols.iter().any(|c| *c == key_col) {
1511                return Err(OmniError::Lance(format!(
1512                    "scan_with_pending: key_column '{}' must appear in projection \
1513                     when merge-shadow semantics are requested (got projection = {:?})",
1514                    key_col, cols
1515                )));
1516            }
1517        }
1518
1519        let committed = self.scan(committed_ds, projection, filter, None).await?;
1520        if pending_batches.is_empty() {
1521            return Ok(committed);
1522        }
1523
1524        // Shadow committed rows whose key value also appears in pending.
1525        // This makes scan_with_pending implement merge semantics rather
1526        // than naive union: any row that has a pending update is
1527        // represented ONLY by its pending value, never by both its
1528        // (stale) committed value and its (current) pending value.
1529        let committed = match key_column {
1530            Some(key_col) => {
1531                let pending_keys = collect_string_column_values(pending_batches, key_col)?;
1532                if pending_keys.is_empty() {
1533                    committed
1534                } else {
1535                    filter_out_rows_where_string_in(committed, key_col, &pending_keys)?
1536                }
1537            }
1538            None => committed,
1539        };
1540
1541        let pending =
1542            scan_pending_batches(pending_batches, pending_schema, projection, filter).await?;
1543
1544        let mut out = committed;
1545        out.extend(pending);
1546        Ok(out)
1547    }
1548
1549    /// `count_rows` variant that respects staged writes. Used for
1550    /// edge-cardinality validation that needs to see staged edges before
1551    /// commit. Same `committed - removed + new` composition as
1552    /// `scan_with_staged`.
1553    pub async fn count_rows_with_staged(
1554        &self,
1555        ds: &Dataset,
1556        staged: &[StagedWrite],
1557        filter: Option<String>,
1558    ) -> Result<usize> {
1559        if staged.is_empty() {
1560            return self.count_rows(ds, filter).await;
1561        }
1562        let mut scanner = ds.scan();
1563        if let Some(f) = filter {
1564            scanner
1565                .filter(&f)
1566                .map_err(|e| OmniError::Lance(e.to_string()))?;
1567        }
1568        scanner.with_fragments(combine_committed_with_staged(ds, staged));
1569        let count = scanner
1570            .count_rows()
1571            .await
1572            .map_err(|e| OmniError::Lance(e.to_string()))?;
1573        Ok(count as usize)
1574    }
1575
1576    async fn user_indices_for_column(
1577        &self,
1578        ds: &Dataset,
1579        column: &str,
1580    ) -> Result<Vec<IndexMetadata>> {
1581        let field_id = ds
1582            .schema()
1583            .field(column)
1584            .map(|field| field.id)
1585            .ok_or_else(|| {
1586                OmniError::manifest_internal(format!(
1587                    "dataset is missing expected index column '{}'",
1588                    column
1589                ))
1590            })?;
1591        let indices = ds
1592            .load_indices()
1593            .await
1594            .map_err(|e| OmniError::Lance(e.to_string()))?;
1595        Ok(indices
1596            .iter()
1597            .filter(|index| !is_system_index(index))
1598            .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
1599            .cloned()
1600            .collect())
1601    }
1602
1603    pub async fn has_btree_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1604        let indices = self.user_indices_for_column(ds, column).await?;
1605        Ok(indices.iter().any(|index| {
1606            index
1607                .index_details
1608                .as_ref()
1609                .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
1610                .unwrap_or(false)
1611        }))
1612    }
1613
1614    pub async fn has_fts_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1615        let indices = self.user_indices_for_column(ds, column).await?;
1616        Ok(indices.iter().any(|index| {
1617            index
1618                .index_details
1619                .as_ref()
1620                .map(|details| IndexDetails(details.clone()).supports_fts())
1621                .unwrap_or(false)
1622        }))
1623    }
1624
1625    pub async fn has_vector_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1626        let indices = self.user_indices_for_column(ds, column).await?;
1627        Ok(indices.iter().any(|index| {
1628            index
1629                .index_details
1630                .as_ref()
1631                .map(|details| IndexDetails(details.clone()).is_vector())
1632                .unwrap_or(false)
1633        }))
1634    }
1635
1636    pub(crate) async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
1637        let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
1638        ds.create_index_builder(&[column], IndexType::Vector, &params)
1639            .replace(true)
1640            .await
1641            .map_err(|e| OmniError::Lance(e.to_string()))?;
1642        // Record only after the index build succeeds, so a failed build does not
1643        // inflate the probe (matches the `stage_*` probes).
1644        crate::instrumentation::record_create_vector_index();
1645        Ok(())
1646    }
1647
1648    pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result<Dataset> {
1649        let batch = RecordBatch::new_empty(schema.clone());
1650        Self::write_dataset(dataset_uri, batch).await
1651    }
1652
1653    pub async fn first_row_id_for_filter(&self, ds: &Dataset, filter: &str) -> Result<Option<u64>> {
1654        let batches = Self::scan_stream(ds, Some(&["id"]), Some(filter), None, true)
1655            .await?
1656            .try_collect::<Vec<RecordBatch>>()
1657            .await
1658            .map_err(|e| OmniError::Lance(e.to_string()))?;
1659        Ok(batches.iter().find_map(|batch| {
1660            batch
1661                .column_by_name("_rowid")
1662                .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1663                .and_then(|arr| (arr.len() > 0).then(|| arr.value(0)))
1664        }))
1665    }
1666
1667    pub async fn write_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
1668        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
1669        let params = WriteParams {
1670            mode: WriteMode::Create,
1671            enable_stable_row_ids: true,
1672            data_storage_version: Some(LanceFileVersion::V2_2),
1673            allow_external_blob_outside_bases: true,
1674            auto_cleanup: None,
1675            skip_auto_cleanup: true,
1676            ..Default::default()
1677        };
1678        Dataset::write(reader, dataset_uri, Some(params))
1679            .await
1680            .map_err(|e| OmniError::Lance(e.to_string()))
1681    }
1682}
1683
1684/// Build the `Scanner::with_fragments` argument for read-your-writes:
1685/// committed manifest fragments minus any fragment IDs superseded by the
1686/// staged writes, plus the staged `new_fragments`. Order is:
1687///   1. committed fragments whose IDs are NOT in any staged
1688///      `removed_fragment_ids` (preserves committed order),
1689///   2. all staged `new_fragments` in stage order.
1690///
1691/// Lance's `Scanner` does not require any particular ordering between
1692/// committed and staged fragments — `with_fragments` scopes the scan to
1693/// exactly the supplied list. The dedup matters because merge_insert
1694/// rewrites a fragment in place at the Lance layer: the rewritten
1695/// fragment is in `new_fragments`, the original (which it supersedes) is
1696/// in `committed` until manifest commit, and including both would yield
1697/// duplicate rows.
1698///
1699/// **Inter-stage supersession is not handled here.** Each StagedWrite's
1700/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a
1701/// later staged merge cannot know about an earlier staged merge's
1702/// fragments (Lance's `MergeInsertBuilder` runs against the committed
1703/// view). If two `stage_merge_insert`s on the same table produce rows
1704/// with the same key, the combined view returns duplicates by key. The
1705/// engine's mutation path enforces "per touched table: all stage_append
1706/// OR exactly one stage_merge_insert" at parse time (D₂′ in
1707/// `exec/mutation.rs`) so this primitive's caller never chains merges.
1708/// See `stage_merge_insert` for the full contract.
1709/// Sum `physical_rows` across all fragments in the supplied stages.
1710/// Used by `stage_append` to compute the row-ID offset for chained
1711/// `stage_append` calls against the same dataset.
1712///
1713/// Assumes `prior_stages` contains only `stage_append` results — see
1714/// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the
1715/// `new_fragments` include rewrites that don't add new rows, so this
1716/// would over-count.
1717fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
1718    prior_stages
1719        .iter()
1720        .map(|s| s.new_fragments.len() as u64)
1721        .sum()
1722}
1723
1724/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's
1725/// commit-time `Transaction::fragments_with_ids` (lance-6.0.1
1726/// `dataset/transaction.rs:1456`) — fragments produced by
1727/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary
1728/// placeholder; we renumber here so they don't collide with committed
1729/// fragments (or with each other across chained stages) when the slice is
1730/// passed to `Scanner::with_fragments`.
1731fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) {
1732    for (i, fragment) in fragments.iter_mut().enumerate() {
1733        if fragment.id == 0 {
1734            fragment.id = start_id + i as u64;
1735        }
1736    }
1737}
1738
1739fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
1740    let mut total: u64 = 0;
1741    for stage in prior_stages {
1742        for fragment in &stage.new_fragments {
1743            let physical_rows = fragment.physical_rows.ok_or_else(|| {
1744                OmniError::manifest_internal(
1745                    "prior_stages_row_count: fragment is missing physical_rows".to_string(),
1746                )
1747            })? as u64;
1748            total += physical_rows;
1749        }
1750    }
1751    Ok(total)
1752}
1753
1754/// Assign sequential row IDs to fragments that lack them, starting from
1755/// `start_row_id`. Mirrors the relevant arm of Lance's
1756/// `Transaction::assign_row_ids` (lance-6.0.1 `dataset/transaction.rs:2682`)
1757/// for the `row_id_meta = None` case — fragments produced by
1758/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset.
1759///
1760/// Used only by `stage_append` for read-your-writes — see its docstring
1761/// for why pre-commit assignment is needed and why diverging from Lance's
1762/// commit-time IDs is safe.
1763fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
1764    let mut next_row_id = start_row_id;
1765    for fragment in fragments {
1766        if fragment.row_id_meta.is_some() {
1767            continue;
1768        }
1769        let physical_rows = fragment.physical_rows.ok_or_else(|| {
1770            OmniError::manifest_internal(
1771                "stage_append: fragment is missing physical_rows".to_string(),
1772            )
1773        })? as u64;
1774        let row_ids = next_row_id..(next_row_id + physical_rows);
1775        let sequence = RowIdSequence::from(row_ids);
1776        let serialized = write_row_ids(&sequence);
1777        fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
1778        next_row_id += physical_rows;
1779    }
1780    Ok(())
1781}
1782
1783/// Collect the set of values in a Utf8 column across multiple batches.
1784/// Used by `scan_with_pending`'s merge-semantic path to identify
1785/// committed rows that are shadowed by pending writes. NULL values are
1786/// skipped.
1787fn collect_string_column_values(
1788    batches: &[RecordBatch],
1789    column: &str,
1790) -> Result<std::collections::HashSet<String>> {
1791    use arrow_array::{Array, StringArray};
1792    let mut out = std::collections::HashSet::new();
1793    for batch in batches {
1794        let Some(col) = batch.column_by_name(column) else {
1795            return Err(OmniError::Lance(format!(
1796                "scan_with_pending: pending batch missing key column '{}'",
1797                column
1798            )));
1799        };
1800        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1801            OmniError::Lance(format!(
1802                "scan_with_pending: key column '{}' is not Utf8",
1803                column
1804            ))
1805        })?;
1806        for i in 0..arr.len() {
1807            if arr.is_valid(i) {
1808                out.insert(arr.value(i).to_string());
1809            }
1810        }
1811    }
1812    Ok(out)
1813}
1814
1815/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`.
1816/// Used by `scan_with_pending`'s merge-semantic path to shadow committed
1817/// rows that pending has already updated. Returns the surviving rows.
1818///
1819/// `scan_with_pending` validates up front that the projection contains
1820/// `column`, so a missing column here is a programmer error — error
1821/// loudly instead of silently passing batches through (which would
1822/// re-introduce the union semantics the caller asked us to avoid).
1823fn filter_out_rows_where_string_in(
1824    batches: Vec<RecordBatch>,
1825    column: &str,
1826    excluded: &std::collections::HashSet<String>,
1827) -> Result<Vec<RecordBatch>> {
1828    use arrow_array::{Array, BooleanArray, StringArray};
1829    let mut out = Vec::with_capacity(batches.len());
1830    for batch in batches {
1831        if batch.num_rows() == 0 {
1832            out.push(batch);
1833            continue;
1834        }
1835        let col = batch.column_by_name(column).ok_or_else(|| {
1836            OmniError::manifest_internal(format!(
1837                "scan_with_pending: committed batch missing key column '{}' \
1838                 (the up-front projection check should have rejected this)",
1839                column
1840            ))
1841        })?;
1842        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1843            OmniError::Lance(format!(
1844                "scan_with_pending: committed column '{}' is not Utf8",
1845                column
1846            ))
1847        })?;
1848        let mask: BooleanArray = (0..arr.len())
1849            .map(|i| {
1850                if arr.is_valid(i) {
1851                    Some(!excluded.contains(arr.value(i)))
1852                } else {
1853                    Some(true)
1854                }
1855            })
1856            .collect();
1857        let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
1858            .map_err(|e| OmniError::Lance(e.to_string()))?;
1859        out.push(filtered);
1860    }
1861    Ok(out)
1862}
1863
1864/// Apply `projection` and `filter` to in-memory pending batches via a
1865/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for
1866/// the read-your-writes side of the in-memory staging accumulator.
1867///
1868/// `pending_batches` must be non-empty (the caller short-circuits on
1869/// empty).
1870///
1871/// **SQL dialect contract.** `filter` is also passed to Lance's scanner
1872/// on the committed side. Lance and DataFusion both accept standard
1873/// SQL comparison predicates (`col op literal`) and OmniGraph's
1874/// `predicate_to_sql` only emits those shapes today (`=`, `!=`, `>`,
1875/// `<`, `>=`, `<=`). If a future caller introduces a Lance-specific
1876/// scanner extension (vector search, FTS, `_rowid` references) into
1877/// the filter, this function will need explicit translation — DataFusion
1878/// won't recognize those operators against the in-memory `MemTable`.
1879async fn scan_pending_batches(
1880    pending_batches: &[RecordBatch],
1881    pending_schema: Option<SchemaRef>,
1882    projection: Option<&[&str]>,
1883    filter: Option<&str>,
1884) -> Result<Vec<RecordBatch>> {
1885    let schema = pending_schema.unwrap_or_else(|| pending_batches[0].schema());
1886    // #283: disable SQL identifier normalization so an unquoted camelCase
1887    // column in `filter` (e.g. `repoName = 'acme'`, emitted unquoted by
1888    // `predicate_to_sql` because the committed Lance scan needs it unquoted)
1889    // is matched case-preserving against the case-sensitive MemTable schema.
1890    // Without this, DataFusion lowercases `repoName` → `reponame` and fails to
1891    // resolve. Quoted identifiers (the projection list below) are unaffected.
1892    let mut config = datafusion::execution::context::SessionConfig::new();
1893    config.options_mut().sql_parser.enable_ident_normalization = false;
1894    let ctx = datafusion::execution::context::SessionContext::new_with_config(config);
1895    let mem = datafusion::datasource::MemTable::try_new(schema, vec![pending_batches.to_vec()])
1896        .map_err(|e| OmniError::Lance(e.to_string()))?;
1897    ctx.register_table("pending", Arc::new(mem))
1898        .map_err(|e| OmniError::Lance(e.to_string()))?;
1899
1900    let proj = projection
1901        .map(|cols| {
1902            cols.iter()
1903                .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
1904                .collect::<Vec<_>>()
1905                .join(", ")
1906        })
1907        .unwrap_or_else(|| "*".to_string());
1908    let where_clause = filter.map(|f| format!("WHERE {f}")).unwrap_or_default();
1909    let sql = format!("SELECT {proj} FROM pending {where_clause}");
1910    let df = ctx
1911        .sql(&sql)
1912        .await
1913        .map_err(|e| OmniError::Lance(e.to_string()))?;
1914    df.collect()
1915        .await
1916        .map_err(|e| OmniError::Lance(e.to_string()))
1917}
1918
1919fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
1920    let removed: std::collections::HashSet<u64> = staged
1921        .iter()
1922        .flat_map(|w| w.removed_fragment_ids.iter().copied())
1923        .collect();
1924    let mut combined: Vec<Fragment> = ds
1925        .manifest
1926        .fragments
1927        .iter()
1928        .filter(|f| !removed.contains(&f.id))
1929        .cloned()
1930        .collect();
1931    for write in staged {
1932        combined.extend(write.new_fragments.iter().cloned());
1933    }
1934    combined
1935}
1936
1937/// Precondition guard for `stage_merge_insert`.
1938/// Both opt into `SourceDedupeBehavior::FirstSeen` to suppress the Lance
1939/// `processed_row_ids` bug (MR-957). FirstSeen would *also* silently
1940/// collapse genuine duplicate source keys; this check restores fail-fast
1941/// behavior on real dups by erroring before the builder gets a chance to
1942/// silently skip them.
1943///
1944/// Today only single-column string keys are used at the call sites
1945/// (`vec!["id".to_string()]`). The check restricts itself to that shape
1946/// and surfaces an internal error if a future caller passes anything
1947/// else — keeping the assumption explicit instead of silently degrading.
1948fn check_batch_unique_by_keys(
1949    batch: &RecordBatch,
1950    key_columns: &[String],
1951    context: &'static str,
1952) -> Result<()> {
1953    if key_columns.len() != 1 {
1954        return Err(OmniError::manifest_internal(format!(
1955            "{}: check_batch_unique_by_keys currently supports single-column keys only, got {:?}",
1956            context, key_columns
1957        )));
1958    }
1959    let key_col_name = &key_columns[0];
1960    let column = batch.column_by_name(key_col_name).ok_or_else(|| {
1961        OmniError::manifest_internal(format!(
1962            "{}: source batch missing key column '{}'",
1963            context, key_col_name
1964        ))
1965    })?;
1966    let strs = column
1967        .as_any()
1968        .downcast_ref::<StringArray>()
1969        .ok_or_else(|| {
1970            OmniError::manifest_internal(format!(
1971                "{}: key column '{}' is not a StringArray (got {:?})",
1972                context,
1973                key_col_name,
1974                column.data_type()
1975            ))
1976        })?;
1977
1978    let mut seen: std::collections::HashSet<&str> =
1979        std::collections::HashSet::with_capacity(batch.num_rows());
1980    for i in 0..strs.len() {
1981        if !strs.is_valid(i) {
1982            continue;
1983        }
1984        let v = strs.value(i);
1985        if !seen.insert(v) {
1986            return Err(OmniError::manifest(format!(
1987                "{}: duplicate source row for key '{}' (column '{}'); \
1988                 callers must hand in a batch unique by `key_columns` \
1989                 — see MR-957",
1990                context, v, key_col_name
1991            )));
1992        }
1993    }
1994    Ok(())
1995}
1996
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};

    /// Build a batch with a single non-nullable Utf8 column named `id`
    /// holding `ids`.
    fn batch_with_ids(ids: &[&str]) -> RecordBatch {
        let id_field = Field::new("id", DataType::Utf8, false);
        let values: ArrayRef = Arc::new(StringArray::from(ids.to_vec()));
        RecordBatch::try_new(Arc::new(Schema::new(vec![id_field])), vec![values]).unwrap()
    }

    /// Distinct keys must pass the guard.
    #[test]
    fn check_batch_unique_by_keys_passes_when_all_unique() {
        let keys = vec!["id".to_string()];
        let batch = batch_with_ids(&["a", "b", "c"]);
        check_batch_unique_by_keys(&batch, &keys, "test").unwrap();
    }

    /// A repeated key must be rejected, naming the key and MR-957.
    #[test]
    fn check_batch_unique_by_keys_errors_on_duplicate_id() {
        let keys = vec!["id".to_string()];
        let batch = batch_with_ids(&["a", "b", "a"]);
        let msg = check_batch_unique_by_keys(&batch, &keys, "test")
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("duplicate source row for key 'a'"),
            "unexpected error: {msg}"
        );
        assert!(
            msg.contains("MR-957"),
            "error should reference MR-957: {msg}"
        );
    }

    /// More than one key column is outside the supported shape.
    #[test]
    fn check_batch_unique_by_keys_rejects_multi_column_keys() {
        let keys = vec!["id".to_string(), "other".to_string()];
        let batch = batch_with_ids(&["a"]);
        let msg = check_batch_unique_by_keys(&batch, &keys, "test")
            .unwrap_err()
            .to_string();
        assert!(msg.contains("single-column keys only"));
    }
}