Skip to main content

omnigraph/
table_store.rs

1use arrow_array::{
2    Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
3};
4use arrow_schema::SchemaRef;
5use datafusion::physical_plan::SendableRecordBatchStream;
6use futures::TryStreamExt;
7use lance::Dataset;
8use lance::blob::BlobArrayBuilder;
9use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
10use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{
13    CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
14    WriteParams,
15};
16use lance::datatypes::{BlobKind, Schema as LanceSchema};
17use lance::index::DatasetIndexExt;
18use lance::index::scalar::IndexDetails;
19use lance_file::version::LanceFileVersion;
20use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
21use lance_index::{IndexType, is_system_index};
22use lance_linalg::distance::MetricType;
23use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
24use lance_table::rowids::{RowIdSequence, write_row_ids};
25use std::sync::Arc;
26
27use crate::db::manifest::TableVersionMetadata;
28use crate::db::{Snapshot, SubTableEntry};
29use crate::error::{OmniError, Result};
30use crate::storage_layer::ForkOutcome;
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct TableState {
34    pub version: u64,
35    pub row_count: u64,
36    pub(crate) version_metadata: TableVersionMetadata,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct DeleteState {
41    pub version: u64,
42    pub row_count: u64,
43    pub deleted_rows: usize,
44    pub(crate) version_metadata: TableVersionMetadata,
45}
46
/// Outcome of asking whether a `key_col IN (...)` scan on a dataset will be
/// answered through the persisted scalar (BTREE) index or will silently
/// degrade to a full filtered scan.
///
/// This is detection only (metadata, no IO): the scan returns the correct
/// rows in both cases. The indexed traversal path surfaces it so the silent
/// performance fallback is observable, and a future cost-based planner can
/// consume it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IndexCoverage {
    /// A usable BTREE exists on the column and every fragment records
    /// `physical_rows`.
    Indexed,
    /// Lance will not use the scalar index for this scan; the result is still
    /// correct but comes from a full scan. `reason` says why.
    Degraded { reason: String },
}
59
60/// A Lance write that has produced fragment files on object storage but is
61/// not yet committed to the dataset's manifest. The staged-write primitives
62/// are consumed by `MutationStaging` (`exec/staging.rs`,
63/// `exec/mutation.rs`) and the bulk loader (`loader/mod.rs`). The
64/// intent: defer Lance commits to end-of-query so a mid-query failure
65/// leaves the touched table at the pre-mutation HEAD instead of
66/// drifting ahead. See `docs/dev/writes.md` for the publisher-CAS contract
67/// this builds on.
68///
69/// `transaction` is opaque from our side — Lance owns its semantics. We
70/// commit it via `CommitBuilder::execute(transaction)` (see
71/// `TableStore::commit_staged`).
72///
73/// For read-your-writes within the same query, `new_fragments` and
74/// `removed_fragment_ids` together describe the post-stage view delta:
75/// `scan_with_staged` (and `count_rows_with_staged`) compose
76/// `committed - removed + new` so subsequent reads see the staged result
77/// without double-counting fragments that `Operation::Update` rewrote.
78/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites
79/// existing fragments would yield duplicate rows (the original fragment
80/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
81#[derive(Debug, Clone)]
82pub struct StagedWrite {
83    pub transaction: Transaction,
84    /// Fragments to surface alongside the committed manifest in
85    /// `Scanner::with_fragments(committed - removed + new)`. For
86    /// `Operation::Append` these are the freshly-appended fragments. For
87    /// `Operation::Update` (merge_insert) these are
88    /// `updated_fragments + new_fragments` (rewrites + freshly-inserted
89    /// rows).
90    pub new_fragments: Vec<Fragment>,
91    /// Fragment IDs that this staged write supersedes. The committed
92    /// manifest must filter these out before being combined with
93    /// `new_fragments` for read-your-writes scans, otherwise rewrites
94    /// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
95    /// adds without removing anything); populated from
96    /// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
97    pub removed_fragment_ids: Vec<u64>,
98}
99
/// Handle for the tables stored beneath a single root URI.
#[derive(Clone, Debug)]
pub struct TableStore {
    /// Root URI under which table datasets live; `TableStore::new` stores it
    /// with trailing `/` characters removed.
    root_uri: String,
}
104
105impl TableStore {
106    pub fn new(root_uri: &str) -> Self {
107        Self {
108            root_uri: root_uri.trim_end_matches('/').to_string(),
109        }
110    }
111
112    pub fn root_uri(&self) -> &str {
113        &self.root_uri
114    }
115
116    pub fn dataset_uri(&self, table_path: &str) -> String {
117        format!("{}/{}", self.root_uri, table_path)
118    }
119
120    fn table_path_from_dataset_uri(&self, dataset_uri: &str) -> Result<String> {
121        let prefix = format!("{}/", self.root_uri.trim_end_matches('/'));
122        let table_path = dataset_uri
123            .strip_prefix(&prefix)
124            .map(|path| path.to_string())
125            .ok_or_else(|| {
126                OmniError::manifest_internal(format!(
127                    "dataset uri '{}' is not under root '{}'",
128                    dataset_uri, self.root_uri
129                ))
130            })?;
131        Ok(table_path
132            .split_once("/tree/")
133            .map(|(path, _)| path.to_string())
134            .unwrap_or(table_path))
135    }
136
137    fn dataset_version_metadata(
138        &self,
139        dataset_uri: &str,
140        ds: &Dataset,
141    ) -> Result<TableVersionMetadata> {
142        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
143        TableVersionMetadata::from_dataset(&self.root_uri, &table_path, ds)
144    }
145
146    pub async fn open_snapshot_table(
147        &self,
148        snapshot: &Snapshot,
149        table_key: &str,
150    ) -> Result<Dataset> {
151        snapshot.open(table_key).await
152    }
153
154    pub async fn open_at_entry(&self, entry: &SubTableEntry) -> Result<Dataset> {
155        entry.open(&self.root_uri).await
156    }
157
158    pub async fn open_dataset_head(
159        &self,
160        dataset_uri: &str,
161        branch: Option<&str>,
162    ) -> Result<Dataset> {
163        // Direct open by URI (O(1) latest-resolution). Routed through the tracked
164        // opener so a cost test counts it via the per-query `table_wrapper`
165        // (no-op in production — the task-local is unset, so this is exactly
166        // `Dataset::open(uri)`).
167        let ds = crate::instrumentation::open_dataset_tracked(
168            dataset_uri,
169            crate::instrumentation::table_wrapper(),
170        )
171        .await?;
172        match branch {
173            Some(branch) if branch != "main" => ds
174                .checkout_branch(branch)
175                .await
176                .map_err(|e| OmniError::Lance(e.to_string())),
177            _ => Ok(ds),
178        }
179    }
180
181    pub async fn open_dataset_head_for_write(
182        &self,
183        table_key: &str,
184        dataset_uri: &str,
185        branch: Option<&str>,
186    ) -> Result<Dataset> {
187        // RFC-013 step 3a: open writes via the direct opener (O(1)) instead of the
188        // lance-namespace builder, which re-resolved the table's version chain
189        // O(depth) per write. The namespace is a catalog/discovery layer, not a
190        // per-open hot-path component (RFC §2.4); the manifest already holds the
191        // location, and `ensure_expected_version` still validates head == pinned
192        // for strict ops. `table_key` retained for signature stability.
193        let _ = table_key;
194        self.open_dataset_head(dataset_uri, branch).await
195    }
196
197    pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
198        let mut ds = Dataset::open(dataset_uri)
199            .await
200            .map_err(|e| OmniError::Lance(e.to_string()))?;
201        ds.delete_branch(branch)
202            .await
203            .map_err(|e| OmniError::Lance(e.to_string()))
204    }
205
206    /// List the named Lance branches present on the dataset at `dataset_uri`.
207    /// The `cleanup` orphan reconciler diffs this against the manifest branch
208    /// set to find orphaned per-table forks. `main`/default is not a named
209    /// branch and never appears here.
210    pub async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
211        let ds = Dataset::open(dataset_uri)
212            .await
213            .map_err(|e| OmniError::Lance(e.to_string()))?;
214        let branches = ds
215            .list_branches()
216            .await
217            .map_err(|e| OmniError::Lance(e.to_string()))?;
218        Ok(branches.into_keys().collect())
219    }
220
221    /// Idempotently drop `branch` from the dataset at `dataset_uri`.
222    ///
223    /// Unlike [`delete_branch`](Self::delete_branch), this tolerates an
224    /// already-absent branch — both a missing contents ref (Lance's
225    /// `force_delete_branch` handles that) and a missing `tree/{branch}/`
226    /// directory (the local-store `NotFound` quirk pinned by
227    /// `lance_surface_guards::force_delete_branch_semantics`). Safe to call on a
228    /// possibly-orphaned or already-reclaimed fork.
229    ///
230    /// A branch that still has referencing descendants (`RefConflict`) is NOT
231    /// tolerated: that is a real ordering error and surfaces as `OmniError::Lance`.
232    /// Used by the eager best-effort reclaim in `cleanup_deleted_branch_tables`
233    /// and the `cleanup` orphan reconciler.
234    pub async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
235        let mut ds = Dataset::open(dataset_uri)
236            .await
237            .map_err(|e| OmniError::Lance(e.to_string()))?;
238        match ds.force_delete_branch(branch).await {
239            Ok(()) => Ok(()),
240            Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => Ok(()),
241            Err(e) => Err(OmniError::Lance(e.to_string())),
242        }
243    }
244
245    pub async fn open_dataset_at_state(
246        &self,
247        table_path: &str,
248        branch: Option<&str>,
249        version: u64,
250    ) -> Result<Dataset> {
251        let ds = self
252            .open_dataset_head(&self.dataset_uri(table_path), branch)
253            .await?;
254        ds.checkout_version(version)
255            .await
256            .map_err(|e| OmniError::Lance(e.to_string()))
257    }
258
259    pub fn ensure_expected_version(
260        &self,
261        ds: &Dataset,
262        table_key: &str,
263        expected_version: u64,
264    ) -> Result<()> {
265        let actual = ds.version().version;
266        if actual != expected_version {
267            // Use the structured ExpectedVersionMismatch variant so callers
268            // (and the HTTP server) can match on details rather than parsing
269            // the message. This drift is a publisher-style OCC failure: the
270            // caller's pre-write view of the table version is stale relative
271            // to the on-disk Lance head.
272            return Err(OmniError::manifest_expected_version_mismatch(
273                table_key,
274                expected_version,
275                actual,
276            ));
277        }
278        Ok(())
279    }
280
281    pub async fn reopen_for_mutation(
282        &self,
283        dataset_uri: &str,
284        branch: Option<&str>,
285        table_key: &str,
286        expected_version: u64,
287    ) -> Result<Dataset> {
288        let ds = self
289            .open_dataset_head_for_write(table_key, dataset_uri, branch)
290            .await?;
291        self.ensure_expected_version(&ds, table_key, expected_version)?;
292        Ok(ds)
293    }
294
295    pub async fn fork_branch_from_state(
296        &self,
297        dataset_uri: &str,
298        source_branch: Option<&str>,
299        table_key: &str,
300        source_version: u64,
301        target_branch: &str,
302    ) -> Result<ForkOutcome<Dataset>> {
303        let mut source_ds = self
304            .open_dataset_head(dataset_uri, source_branch)
305            .await?
306            .checkout_version(source_version)
307            .await
308            .map_err(|e| OmniError::Lance(e.to_string()))?;
309        self.ensure_expected_version(&source_ds, table_key, source_version)?;
310
311        if let Err(create_err) = source_ds
312            .create_branch(target_branch, source_version, None)
313            .await
314        {
315            // Disambiguate the failure: only a genuinely pre-existing ref is a
316            // reclaim candidate. Mapping EVERY create_branch failure to
317            // `RefAlreadyExists` would route a transient I/O / version / Lance
318            // internal error into the destructive reclaim path. So check whether
319            // the ref actually exists; if not, the failure is real — propagate
320            // it (preserving error fidelity) rather than force-deleting.
321            //
322            // `list_branches` reads `_refs/branches/` from the store, so it sees
323            // a fully-formed manifest-unreferenced fork (our common case — a
324            // create_branch that completed but whose manifest publish did not).
325            // It does NOT see a phase-1-only Lance "zombie" (tree dir written,
326            // no BranchContents) — but neither does `cleanup`'s reconciler, also
327            // list_branches-based. A zombie only forms if create_branch is
328            // interrupted *between its two internal phases* (a far narrower
329            // window than the manifest-publish gap), and it surfaces here as the
330            // propagated create error requiring manual reclaim. We deliberately
331            // do NOT force-delete on a not-found-ref failure: it is
332            // indistinguishable from a transient error on a fresh create, and
333            // force-deleting there is the destructive overreach this guard
334            // removes. The caller holds the per-(table, branch) write queue, so
335            // no in-process writer races this fork; a cross-process create
336            // between our check and now is the documented one-winner-CAS gap and
337            // propagates as a retryable error.
338            let ref_exists = source_ds
339                .list_branches()
340                .await
341                .map(|b| b.contains_key(target_branch))
342                .unwrap_or(false);
343            if ref_exists {
344                return Ok(ForkOutcome::RefAlreadyExists);
345            }
346            return Err(OmniError::Lance(create_err.to_string()));
347        }
348
349        let ds = self
350            .open_dataset_head(dataset_uri, Some(target_branch))
351            .await?;
352        self.ensure_expected_version(&ds, table_key, source_version)?;
353        Ok(ForkOutcome::Created(ds))
354    }
355
356    pub async fn scan_batches(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
357        self.scan(ds, None, None, None).await
358    }
359
360    pub async fn scan_batches_for_rewrite(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
361        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
362        if !has_blob_columns {
363            return self.scan_batches(ds).await;
364        }
365
366        let batches = Self::scan_stream(ds, None, None, None, true)
367            .await?
368            .try_collect::<Vec<RecordBatch>>()
369            .await
370            .map_err(|e| OmniError::Lance(e.to_string()))?;
371        let mut materialized = Vec::with_capacity(batches.len());
372        for batch in batches {
373            materialized.push(Self::materialize_blob_batch(ds, batch).await?);
374        }
375        Ok(materialized)
376    }
377
378    /// Streaming, blob-aware sibling of [`Self::scan_batches_for_rewrite`].
379    /// Yields the dataset's rows lazily as a `SendableRecordBatchStream` so a
380    /// downstream writer (`stage_append_stream`) never materializes the whole
381    /// table in memory. Blob columns still need per-row rebuild, so those tables
382    /// fall back to the materialized path and are re-streamed from the `Vec`
383    /// (rare — only tables with a `Blob` property; bounded-memory blob streaming
384    /// is a follow-up). The non-blob path is a true lazy scan.
385    pub async fn scan_stream_for_rewrite(&self, ds: &Dataset) -> Result<SendableRecordBatchStream> {
386        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
387        if has_blob_columns {
388            let arrow_schema: SchemaRef = Arc::new(ds.schema().into());
389            let batches = self.scan_batches_for_rewrite(ds).await?;
390            let reader = arrow_array::RecordBatchIterator::new(
391                batches.into_iter().map(Ok),
392                arrow_schema,
393            );
394            return Ok(lance_datafusion::utils::reader_to_stream(Box::new(reader)));
395        }
396        // Non-blob: a true lazy scan. `DatasetRecordBatchStream` converts to the
397        // `SendableRecordBatchStream` that `execute_uncommitted_stream` consumes.
398        Ok(Self::scan_stream(ds, None, None, None, false).await?.into())
399    }
400
401    pub(crate) async fn materialize_blob_batch(
402        ds: &Dataset,
403        batch: RecordBatch,
404    ) -> Result<RecordBatch> {
405        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
406        if !has_blob_columns {
407            return Ok(batch);
408        }
409
410        let row_ids = batch
411            .column_by_name("_rowid")
412            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
413            .ok_or_else(|| {
414                OmniError::Lance("expected _rowid column when materializing blobs".to_string())
415            })?
416            .values()
417            .iter()
418            .copied()
419            .collect::<Vec<_>>();
420
421        let schema: SchemaRef = Arc::new(ds.schema().into());
422        let mut columns = Vec::with_capacity(schema.fields().len());
423        for field in schema.fields() {
424            let lance_field = lance::datatypes::Field::try_from(field.as_ref())
425                .map_err(|e| OmniError::Lance(e.to_string()))?;
426            let column = batch.column_by_name(field.name()).ok_or_else(|| {
427                OmniError::Lance(format!("batch missing column '{}'", field.name()))
428            })?;
429            if lance_field.is_blob() {
430                let descriptions =
431                    column
432                        .as_any()
433                        .downcast_ref::<StructArray>()
434                        .ok_or_else(|| {
435                            OmniError::Lance(format!(
436                                "expected blob descriptions for '{}'",
437                                field.name()
438                            ))
439                        })?;
440                columns.push(
441                    Self::rebuild_blob_column(ds, field.name(), descriptions, &row_ids).await?,
442                );
443            } else {
444                columns.push(column.clone());
445            }
446        }
447
448        RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
449    }
450
451    async fn rebuild_blob_column(
452        ds: &Dataset,
453        column_name: &str,
454        descriptions: &StructArray,
455        row_ids: &[u64],
456    ) -> Result<ArrayRef> {
457        let mut builder = BlobArrayBuilder::new(row_ids.len());
458        let mut non_null_row_ids = Vec::new();
459        let mut row_has_blob = Vec::with_capacity(row_ids.len());
460
461        for row in 0..row_ids.len() {
462            let is_null = Self::blob_description_is_null(descriptions, row)?;
463            row_has_blob.push(!is_null);
464            if !is_null {
465                non_null_row_ids.push(row_ids[row]);
466            }
467        }
468
469        let blob_files = if non_null_row_ids.is_empty() {
470            Vec::new()
471        } else {
472            Arc::new(ds.clone())
473                .take_blobs(&non_null_row_ids, column_name)
474                .await
475                .map_err(|e| OmniError::Lance(e.to_string()))?
476        };
477
478        let mut files = blob_files.into_iter();
479        for has_blob in row_has_blob {
480            if !has_blob {
481                builder
482                    .push_null()
483                    .map_err(|e| OmniError::Lance(e.to_string()))?;
484                continue;
485            }
486
487            let blob = files.next().ok_or_else(|| {
488                OmniError::Lance(format!(
489                    "blob rewrite for '{}' lost alignment with source rows",
490                    column_name
491                ))
492            })?;
493            builder
494                .push_bytes(
495                    blob.read()
496                        .await
497                        .map_err(|e| OmniError::Lance(e.to_string()))?,
498                )
499                .map_err(|e| OmniError::Lance(e.to_string()))?;
500        }
501
502        if files.next().is_some() {
503            return Err(OmniError::Lance(format!(
504                "blob rewrite for '{}' produced extra source blobs",
505                column_name
506            )));
507        }
508
509        builder
510            .finish()
511            .map_err(|e| OmniError::Lance(e.to_string()))
512    }
513
514    fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
515        if descriptions.is_null(row) {
516            return Ok(true);
517        }
518
519        let position = descriptions
520            .column_by_name("position")
521            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
522            .ok_or_else(|| {
523                OmniError::Lance(format!(
524                    "unrecognized blob description schema {:?}: missing UInt64 position field",
525                    descriptions.fields()
526                ))
527            })?;
528        let size = descriptions
529            .column_by_name("size")
530            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
531            .ok_or_else(|| {
532                OmniError::Lance(format!(
533                    "unrecognized blob description schema {:?}: missing UInt64 size field",
534                    descriptions.fields()
535                ))
536            })?;
537
538        let Some(kind_column) = descriptions.column_by_name("kind") else {
539            return Ok(position.is_null(row) || size.is_null(row));
540        };
541        let kind = if let Some(kind) = kind_column.as_any().downcast_ref::<UInt8Array>() {
542            if kind.is_null(row) {
543                return Ok(true);
544            }
545            kind.value(row)
546        } else if let Some(kind) = kind_column.as_any().downcast_ref::<UInt32Array>() {
547            if kind.is_null(row) {
548                return Ok(true);
549            }
550            kind.value(row) as u8
551        } else {
552            return Err(OmniError::Lance(format!(
553                "unrecognized blob description schema {:?}: kind field must be UInt8 or UInt32",
554                descriptions.fields()
555            )));
556        };
557
558        let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
559        if kind != BlobKind::Inline {
560            return Ok(false);
561        }
562        let blob_uri = descriptions
563            .column_by_name("blob_uri")
564            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
565            .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
566
567        Ok((position.is_null(row) || position.value(row) == 0)
568            && (size.is_null(row) || size.value(row) == 0)
569            && blob_uri.unwrap_or("").is_empty())
570    }
571
572    pub async fn scan_stream(
573        ds: &Dataset,
574        projection: Option<&[&str]>,
575        filter: Option<&str>,
576        order_by: Option<Vec<ColumnOrdering>>,
577        with_row_id: bool,
578    ) -> Result<DatasetRecordBatchStream> {
579        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, |_| Ok(())).await
580    }
581
582    pub async fn scan_stream_with<F>(
583        ds: &Dataset,
584        projection: Option<&[&str]>,
585        filter: Option<&str>,
586        order_by: Option<Vec<ColumnOrdering>>,
587        with_row_id: bool,
588        configure: F,
589    ) -> Result<DatasetRecordBatchStream>
590    where
591        F: FnOnce(&mut Scanner) -> Result<()>,
592    {
593        let mut scanner = ds.scan();
594        if with_row_id {
595            scanner.with_row_id();
596        }
597        if let Some(columns) = projection {
598            scanner
599                .project(columns)
600                .map_err(|e| OmniError::Lance(e.to_string()))?;
601        }
602        if let Some(filter_sql) = filter {
603            scanner
604                .filter(filter_sql)
605                .map_err(|e| OmniError::Lance(e.to_string()))?;
606        }
607        if let Some(ordering) = order_by {
608            scanner
609                .order_by(Some(ordering))
610                .map_err(|e| OmniError::Lance(e.to_string()))?;
611        }
612        configure(&mut scanner)?;
613        scanner
614            .try_into_stream()
615            .await
616            .map_err(|e| OmniError::Lance(e.to_string()))
617    }
618
619    pub async fn scan(
620        &self,
621        ds: &Dataset,
622        projection: Option<&[&str]>,
623        filter: Option<&str>,
624        order_by: Option<Vec<ColumnOrdering>>,
625    ) -> Result<Vec<RecordBatch>> {
626        Self::scan_stream(ds, projection, filter, order_by, false)
627            .await?
628            .try_collect()
629            .await
630            .map_err(|e| OmniError::Lance(e.to_string()))
631    }
632
633    pub async fn scan_with<F>(
634        &self,
635        ds: &Dataset,
636        projection: Option<&[&str]>,
637        filter: Option<&str>,
638        order_by: Option<Vec<ColumnOrdering>>,
639        with_row_id: bool,
640        configure: F,
641    ) -> Result<Vec<RecordBatch>>
642    where
643        F: FnOnce(&mut Scanner) -> Result<()>,
644    {
645        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, configure)
646            .await?
647            .try_collect()
648            .await
649            .map_err(|e| OmniError::Lance(e.to_string()))
650    }
651
652    /// Indexed neighbor lookup for graph traversal. Given an edge dataset and a
653    /// set of endpoint keys on `key_col` (`"src"` for out-traversal, `"dst"` for
654    /// in-traversal), return the matching edge rows projected to
655    /// `[key_col, opposite_col]`.
656    ///
657    /// The `key_col IN (keys)` predicate is built as a structured DataFusion
658    /// `Expr` and applied via `Scanner::filter_expr`, so Lance routes it through
659    /// the persisted BTREE on `key_col` (index-search → take). Cost scales with
660    /// the frontier size, not |E| — the basis for serving selective traversals
661    /// without building the whole in-memory CSR. Empty `keys` returns empty
662    /// without scanning.
663    ///
664    /// Note: like any indexed scan, this observes only fragments the BTREE
665    /// covers plus an unindexed-fragment scan fallback; it reads the committed
666    /// snapshot `ds` was opened at.
667    pub async fn scan_edges_by_endpoint(
668        ds: &Dataset,
669        key_col: &str,
670        opposite_col: &str,
671        keys: &[String],
672    ) -> Result<Vec<RecordBatch>> {
673        use datafusion::prelude::{col, lit};
674
675        if keys.is_empty() {
676            return Ok(Vec::new());
677        }
678        let key_list: Vec<datafusion::prelude::Expr> =
679            keys.iter().map(|k| lit(k.clone())).collect();
680        let filter_expr = col(key_col).in_list(key_list, false);
681        Self::scan_stream_with(
682            ds,
683            Some(&[key_col, opposite_col]),
684            None,
685            None,
686            false,
687            |scanner| {
688                scanner.filter_expr(filter_expr);
689                Ok(())
690            },
691        )
692        .await?
693        .try_collect()
694        .await
695        .map_err(|e| OmniError::Lance(e.to_string()))
696    }
697
698    /// Metadata-only check (no IO) of whether `scan_edges_by_endpoint` — a
699    /// `key_col IN (...)` filter — on `ds` will be served by the persisted BTREE
700    /// on `column`, or silently fall back to a full filtered scan. Mirrors
701    /// Lance's own decision: scalar indices are disabled for the whole scan if
702    /// ANY fragment lacks `physical_rows` (lance `dataset/scanner.rs`
703    /// `create_filter_plan`), and are obviously unused if no BTREE on the
704    /// column exists. The scan is correct (returns all rows) either way — this
705    /// only surfaces the perf cliff so the indexed traversal can warn on it.
706    pub async fn key_column_index_coverage(ds: &Dataset, column: &str) -> Result<IndexCoverage> {
707        let Some(field_id) = ds.schema().field(column).map(|field| field.id) else {
708            return Ok(IndexCoverage::Degraded {
709                reason: format!("column '{}' not in schema", column),
710            });
711        };
712        let indices = ds
713            .load_indices()
714            .await
715            .map_err(|e| OmniError::Lance(e.to_string()))?;
716        let btree = indices
717            .iter()
718            .filter(|index| !is_system_index(index))
719            .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
720            .find(|index| {
721                index
722                    .index_details
723                    .as_ref()
724                    .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
725                    .unwrap_or(false)
726            });
727        let Some(btree) = btree else {
728            return Ok(IndexCoverage::Degraded {
729                reason: format!("no BTREE index on '{}'", column),
730            });
731        };
732        // Same check Lance runs: a fragment missing physical_rows disables
733        // scalar indices for the entire scan (all-or-nothing).
734        if ds.fragments().iter().any(|f| f.physical_rows.is_none()) {
735            return Ok(IndexCoverage::Degraded {
736                reason: "a fragment is missing physical_rows".to_string(),
737            });
738        }
739        // An index only covers the fragments it was built over; fragments
740        // appended afterward (edge-index creation is skipped once a BTREE exists)
741        // are scanned unindexed. If any CURRENT fragment is absent from the
742        // index's `fragment_bitmap`, the scan is partly a full scan — so the
743        // chooser must not price it as fully indexed. A `None` bitmap means Lance
744        // can't report coverage; don't over-degrade in that case.
745        if let Some(bitmap) = btree.fragment_bitmap.as_ref() {
746            let uncovered = ds
747                .fragments()
748                .iter()
749                .filter(|f| !bitmap.contains(f.id as u32))
750                .count();
751            if uncovered > 0 {
752                return Ok(IndexCoverage::Degraded {
753                    reason: format!(
754                        "{} fragment(s) not covered by the index on '{}'",
755                        uncovered, column
756                    ),
757                });
758            }
759        }
760        Ok(IndexCoverage::Indexed)
761    }
762
763    /// True if any non-system index on `ds` leaves at least one current
764    /// fragment uncovered, i.e. rows that the index does not yet account for
765    /// (appended after the index was built, or rewritten by compaction). Such
766    /// fragments are scanned unindexed until a reindex (`optimize_indices`)
767    /// folds them in. Returns false when every index covers every fragment, or
768    /// when the table has no (non-system) indices to optimize. A `None`
769    /// `fragment_bitmap` means Lance cannot report coverage for that index, so
770    /// we do not treat it as uncovered (mirrors `key_column_index_coverage`).
771    ///
772    /// Used by `optimize` to decide whether an otherwise-already-compacted
773    /// table still has index work to do.
774    pub async fn has_unindexed_fragments(ds: &Dataset) -> Result<bool> {
775        let indices = ds
776            .load_indices()
777            .await
778            .map_err(|e| OmniError::Lance(e.to_string()))?;
779        let frag_ids: Vec<u32> = ds.fragments().iter().map(|f| f.id as u32).collect();
780        for index in indices.iter() {
781            if is_system_index(index) {
782                continue;
783            }
784            if let Some(bitmap) = index.fragment_bitmap.as_ref() {
785                if frag_ids.iter().any(|id| !bitmap.contains(*id)) {
786                    return Ok(true);
787                }
788            }
789        }
790        Ok(false)
791    }
792
793    pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
794        ds.count_rows(filter)
795            .await
796            .map(|count| count as usize)
797            .map_err(|e| OmniError::Lance(e.to_string()))
798    }
799
800    pub fn dataset_version(&self, ds: &Dataset) -> u64 {
801        ds.version().version
802    }
803
804    pub async fn table_state(&self, dataset_uri: &str, ds: &Dataset) -> Result<TableState> {
805        Ok(TableState {
806            version: self.dataset_version(ds),
807            row_count: self.count_rows(ds, None).await? as u64,
808            version_metadata: self.dataset_version_metadata(dataset_uri, ds)?,
809        })
810    }
811
812    /// Legacy inline-commit append: writes fragments AND commits in one
813    /// call, advancing Lance HEAD as a side effect. Not on the
814    /// `TableStorage` trait surface — the staged primitive `stage_append`
815    /// + `commit_staged` is the engine write path. This inherent method
816    /// survives only for in-source recovery test setup, so it is
817    /// `#[cfg(test)]`-gated: engine code physically cannot call it (which
818    /// enforces "no new call sites" by construction and silences the
819    /// dead-code warning the non-test lib build would otherwise emit).
820    #[cfg(test)]
821    pub(crate) async fn append_batch(
822        &self,
823        dataset_uri: &str,
824        ds: &mut Dataset,
825        batch: RecordBatch,
826    ) -> Result<TableState> {
827        if batch.num_rows() == 0 {
828            return self.table_state(dataset_uri, ds).await;
829        }
830        let schema = batch.schema();
831        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
832        let params = WriteParams {
833            mode: WriteMode::Append,
834            allow_external_blob_outside_bases: true,
835            auto_cleanup: None,
836            skip_auto_cleanup: true,
837            ..Default::default()
838        };
839        ds.append(reader, Some(params))
840            .await
841            .map_err(|e| OmniError::Lance(e.to_string()))?;
842        self.table_state(dataset_uri, ds).await
843    }
844
845    pub async fn append_or_create_batch(
846        dataset_uri: &str,
847        dataset: Option<Dataset>,
848        batch: RecordBatch,
849    ) -> Result<Dataset> {
850        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
851        match dataset {
852            Some(mut ds) => {
853                let params = WriteParams {
854                    mode: WriteMode::Append,
855                    allow_external_blob_outside_bases: true,
856                    auto_cleanup: None,
857                    skip_auto_cleanup: true,
858                    ..Default::default()
859                };
860                ds.append(reader, Some(params))
861                    .await
862                    .map_err(|e| OmniError::Lance(e.to_string()))?;
863                Ok(ds)
864            }
865            None => {
866                let params = WriteParams {
867                    mode: WriteMode::Create,
868                    enable_stable_row_ids: true,
869                    data_storage_version: Some(LanceFileVersion::V2_2),
870                    allow_external_blob_outside_bases: true,
871                    auto_cleanup: None,
872                    skip_auto_cleanup: true,
873                    ..Default::default()
874                };
875                Dataset::write(reader, dataset_uri, Some(params))
876                    .await
877                    .map_err(|e| OmniError::Lance(e.to_string()))
878            }
879        }
880    }
881
882    pub(crate) async fn delete_where(
883        &self,
884        dataset_uri: &str,
885        ds: &mut Dataset,
886        filter: &str,
887    ) -> Result<DeleteState> {
888        let delete_result = ds
889            .delete(filter)
890            .await
891            .map_err(|e| OmniError::Lance(e.to_string()))?;
892        Ok(DeleteState {
893            version: delete_result.new_dataset.version().version,
894            row_count: self.count_rows(&delete_result.new_dataset, None).await? as u64,
895            deleted_rows: delete_result.num_deleted_rows as usize,
896            version_metadata: self
897                .dataset_version_metadata(dataset_uri, &delete_result.new_dataset)?,
898        })
899    }
900
901    // ─── Staged-write API ────────────────────────────────────────────────────
902    //
903    // These primitives wrap Lance's distributed-write API: each call writes
904    // fragment files to object storage but does NOT advance the dataset's
905    // HEAD or commit a manifest entry. The returned `Transaction` is held by
906    // the caller (typically `MutationStaging` or the loader's accumulator)
907    // and committed at end-of-query via `commit_staged`. On failure the
908    // fragments remain unreferenced and are reclaimed by `cleanup_old_versions`.
909    //
910    // The extracted `Vec<Fragment>` is for read-your-writes within the same
911    // query: subsequent ops construct a `Scanner` and call
912    // `scanner.with_fragments(staged.clone())` to see staged data alongside
913    // the committed snapshot. Lance's filter pushdown, vector search, and
914    // FTS all respect the supplied fragment list.
915
916    /// Stage an append: write fragment files for `batch`, return the
917    /// uncommitted Lance transaction plus the new fragments for
918    /// read-your-writes.
919    ///
920    /// `prior_stages` is the slice of staged writes already accumulated
921    /// against the **same dataset** in the same query. Pass `&[]` for the
922    /// first call; pass the accumulated stages for subsequent calls. The
923    /// primitive uses this to offset row-ID assignment so chained
924    /// `stage_append` calls don't produce overlapping `_rowid` ranges.
925    /// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same
926    /// slice gets passed to both.
927    ///
928    /// On stable-row-id datasets we manually populate `row_id_meta` on
929    /// the cloned `new_fragments` we expose for `scan_with_staged`.
930    /// Lance's `InsertBuilder::execute_uncommitted` produces fragments
931    /// with `row_id_meta = None`; row IDs are normally assigned by
932    /// `Transaction::assign_row_ids` during commit. Because
933    /// `scan_with_staged` reads the staged fragments *before* commit,
934    /// the scanner trips on a stable-row-id dataset
935    /// (`Error::internal("Missing row id meta")` from
936    /// `dataset/rowids.rs:22`). The transaction's internal fragment copy
937    /// stays untouched — Lance assigns IDs there independently at commit
938    /// time, and the two ID assignments don't have to agree because no
939    /// caller threads `_rowid` from the staged scan into the commit
940    /// path.
941    ///
942    /// **Contract: `prior_stages` must contain only previous
943    /// `stage_append` results against the same dataset.** Mixing
944    /// stage_merge_insert into `prior_stages` would over-count because
945    /// merge_insert's `new_fragments` include rewrites that don't add
946    /// rows. The engine's parse-time D₂′ check (per touched table: all
947    /// stage_append OR exactly one stage_merge_insert) guarantees this
948    /// upstream; on the primitive layer it's the caller's responsibility.
949    pub async fn stage_append(
950        &self,
951        ds: &Dataset,
952        batch: RecordBatch,
953        prior_stages: &[StagedWrite],
954    ) -> Result<StagedWrite> {
955        if batch.num_rows() == 0 {
956            return Err(OmniError::manifest_internal(
957                "stage_append called with empty batch".to_string(),
958            ));
959        }
960        let appended_rows = batch.num_rows() as u64;
961        let params = WriteParams {
962            mode: WriteMode::Append,
963            allow_external_blob_outside_bases: true,
964            auto_cleanup: None,
965            skip_auto_cleanup: true,
966            ..Default::default()
967        };
968        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
969            .with_params(&params)
970            .execute_uncommitted(vec![batch])
971            .await
972            .map_err(|e| OmniError::Lance(e.to_string()))?;
973        // Record only after the staging write succeeds, so a failed write does
974        // not inflate the probe (matches `stage_append_stream`'s ordering).
975        crate::instrumentation::record_stage_append(appended_rows);
976        let mut new_fragments = match &transaction.operation {
977            Operation::Append { fragments } => fragments.clone(),
978            Operation::Overwrite { fragments, .. } => fragments.clone(),
979            other => {
980                return Err(OmniError::manifest_internal(format!(
981                    "stage_append: unexpected Lance operation {:?}",
982                    std::mem::discriminant(other)
983                )));
984            }
985        };
986        // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted`
987        // returns fragments with `id = 0` ("Temporary ID" — see lance-6.0.1
988        // `dataset/write.rs:1044/1712`); the real assignment happens during
989        // commit via `Transaction::fragments_with_ids`. Because we expose
990        // these fragments to `scan_with_staged` *before* commit, two staged
991        // fragments (or one staged + the seed) would collide on `id = 0`,
992        // causing Lance's scanner to mishandle the combined list (silent
993        // duplicates / dropped rows). Mirror the commit-time renumbering
994        // here, using `ds.manifest.max_fragment_id() + 1` as the base and
995        // accounting for prior stages.
996        // ds.manifest.max_fragment_id is Option<u32>; cast up to u64 because
997        // Lance's Fragment::id (and the commit-time renumbering counter in
998        // Transaction::fragments_with_ids) operate on u64.
999        let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
1000            + 1
1001            + prior_stages_fragment_count(prior_stages);
1002        assign_fragment_ids(&mut new_fragments, next_id_base);
1003        if ds.manifest.uses_stable_row_ids() {
1004            let prior_rows = prior_stages_row_count(prior_stages)?;
1005            let start_row_id = ds.manifest.next_row_id + prior_rows;
1006            assign_row_id_meta(&mut new_fragments, start_row_id)?;
1007        }
1008        Ok(StagedWrite {
1009            transaction,
1010            new_fragments,
1011            // Append never supersedes existing fragments.
1012            removed_fragment_ids: Vec::new(),
1013        })
1014    }
1015
1016    /// Streaming variant of [`Self::stage_append`]: appends the rows of `source`
1017    /// into `ds` without materializing them in memory. It scans `source` lazily
1018    /// (`scan_stream_for_rewrite`) and hands the stream to Lance's
1019    /// `execute_uncommitted_stream`, which rolls fragments at `max_rows_per_file`
1020    /// — bounded memory, one Append transaction. This is the substrate-blessed
1021    /// bulk-append path (the same one LanceDB's `Table::add` uses). Identical
1022    /// fragment-id / stable-row-id staging as `stage_append`.
1023    ///
1024    /// TRANSITIONAL caller — its only caller is the row-level merge append
1025    /// (`publish_adopted_delta`, see `AdoptDelta`), which the fragment-adopt work
1026    /// (Lance #7263/#7185) removes: a fragment graft re-appends no rows. This
1027    /// primitive and `scan_stream_for_rewrite` are then dead unless re-adopted as
1028    /// a general bulk-append path (the `Table::add` shape makes that plausible).
1029    pub async fn stage_append_stream(
1030        &self,
1031        ds: &Dataset,
1032        source: &Dataset,
1033        prior_stages: &[StagedWrite],
1034    ) -> Result<StagedWrite> {
1035        let stream = self.scan_stream_for_rewrite(source).await?;
1036        let params = WriteParams {
1037            mode: WriteMode::Append,
1038            allow_external_blob_outside_bases: true,
1039            auto_cleanup: None,
1040            skip_auto_cleanup: true,
1041            ..Default::default()
1042        };
1043        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1044            .with_params(&params)
1045            .execute_uncommitted_stream(stream)
1046            .await
1047            .map_err(|e| OmniError::Lance(e.to_string()))?;
1048        let mut new_fragments = match &transaction.operation {
1049            Operation::Append { fragments } => fragments.clone(),
1050            Operation::Overwrite { fragments, .. } => fragments.clone(),
1051            other => {
1052                return Err(OmniError::manifest_internal(format!(
1053                    "stage_append_stream: unexpected Lance operation {:?}",
1054                    std::mem::discriminant(other)
1055                )));
1056            }
1057        };
1058        let appended_rows: u64 = new_fragments
1059            .iter()
1060            .filter_map(|f| f.physical_rows)
1061            .map(|r| r as u64)
1062            .sum();
1063        crate::instrumentation::record_stage_append(appended_rows);
1064        // Same commit-time fragment-id / row-id renumbering as `stage_append`.
1065        let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
1066            + 1
1067            + prior_stages_fragment_count(prior_stages);
1068        assign_fragment_ids(&mut new_fragments, next_id_base);
1069        if ds.manifest.uses_stable_row_ids() {
1070            let prior_rows = prior_stages_row_count(prior_stages)?;
1071            let start_row_id = ds.manifest.next_row_id + prior_rows;
1072            assign_row_id_meta(&mut new_fragments, start_row_id)?;
1073        }
1074        Ok(StagedWrite {
1075            transaction,
1076            new_fragments,
1077            removed_fragment_ids: Vec::new(),
1078        })
1079    }
1080
1081    /// Stage a merge_insert (upsert): write fragment files describing the
1082    /// merge result, return the uncommitted transaction plus the new
1083    /// fragments. The transaction's `Operation::Update` carries the
1084    /// fragments-to-remove and fragments-to-add; for read-your-writes we
1085    /// expose `new_fragments` (rows that will be visible after commit).
1086    ///
1087    /// **Contract: do not chain `stage_merge_insert` calls on the same
1088    /// table within one query.** Each call's `MergeInsertBuilder` runs
1089    /// against the supplied dataset's committed view — it does not see
1090    /// fragments produced by a previous staged merge on the same table.
1091    /// Two chained `stage_merge_insert`s whose source rows share keys will
1092    /// each independently produce `Operation::Update` transactions whose
1093    /// `new_fragments` contain a row for the shared key. `scan_with_staged`
1094    /// (and `count_rows_with_staged`) will then return both — i.e.
1095    /// **duplicates by key**.
1096    ///
1097    /// This is intrinsic to the underlying Lance API: there is no public
1098    /// way to make `MergeInsertBuilder` see uncommitted fragments. The
1099    /// engine's `MutationStaging` accumulator works around this by
1100    /// concatenating per-table batches in memory and issuing exactly
1101    /// one `stage_merge_insert` per touched table at end-of-query (with
1102    /// last-write-wins dedupe by id) — see `exec/staging.rs`. Direct
1103    /// callers of this primitive must respect the contract themselves.
1104    ///
1105    /// Lift path: either a Lance API extension that lets
1106    /// `MergeInsertBuilder` accept additional staged fragments, or an
1107    /// in-memory pre-merge here that folds prior staged batches into the
1108    /// input stream. See `docs/dev/writes.md`.
1109    pub async fn stage_merge_insert(
1110        &self,
1111        ds: Dataset,
1112        batch: RecordBatch,
1113        key_columns: Vec<String>,
1114        when_matched: WhenMatched,
1115        when_not_matched: WhenNotMatched,
1116    ) -> Result<StagedWrite> {
1117        if batch.num_rows() == 0 {
1118            return Err(OmniError::manifest_internal(
1119                "stage_merge_insert called with empty batch".to_string(),
1120            ));
1121        }
1122        let merged_rows = batch.num_rows() as u64;
1123
1124        // Precondition for the FirstSeen workaround below: every call path that
1125        // reaches stage_merge_insert (load, MutationStaging::finalize,
1126        // branch_merge::publish_rewritten_merge_table) must hand in a source
1127        // batch that is unique by `key_columns`. Without this check,
1128        // `SourceDedupeBehavior::FirstSeen` would silently collapse genuine
1129        // duplicates instead of erroring.
1130        check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?;
1131
1132        let ds = Arc::new(ds);
1133        let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
1134            .map_err(|e| OmniError::Lance(e.to_string()))?;
1135        builder.when_matched(when_matched);
1136        builder.when_not_matched(when_not_matched);
1137        // Workaround for a Lance bug class where sequential merge_insert calls
1138        // against rows previously rewritten by merge_insert produce a spurious
1139        // "Ambiguous merge inserts: multiple source rows match the same target
1140        // row on (id = ...)" error. Lance's `processed_row_ids:
1141        // Mutex<HashSet<u64>>` (lance-6.0.1 `src/dataset/write/merge_insert.rs`)
1142        // double-processes the same source/target match against datasets
1143        // previously rewritten by merge_insert, and the default
1144        // `SourceDedupeBehavior::Fail` errors on the second insertion; FirstSeen
1145        // makes Lance skip the duplicate match instead. Correctness-preserving
1146        // because every call path pre-dedupes the source batch by id or surfaces
1147        // a real source dup via `check_batch_unique_by_keys` above (load:
1148        // `enforce_unique_constraints_intra_batch`; mutate:
1149        // `MutationStaging::finalize`; branch-merge: the `OrderedTableCursor`
1150        // walk in `exec/merge.rs`). Retire when upstream Lance fixes the bug
1151        // class. Tracked at MR-957; upstream: lance-format/lance#6877.
1152        builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
1153        let job = builder
1154            .try_build()
1155            .map_err(|e| OmniError::Lance(e.to_string()))?;
1156        let schema = batch.schema();
1157        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
1158        let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
1159        let uncommitted = job
1160            .execute_uncommitted(stream)
1161            .await
1162            .map_err(|e| OmniError::Lance(e.to_string()))?;
1163        // Record only after the staging write succeeds, so a failed write does
1164        // not inflate the probe (matches `stage_append`/`stage_append_stream`).
1165        crate::instrumentation::record_stage_merge_insert(merged_rows);
1166        // Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } —
1167        // `new_fragments` are the freshly inserted rows; `updated_fragments`
1168        // are rewrites of existing fragments that include both retained and
1169        // updated rows; `removed_fragment_ids` lists the committed-manifest
1170        // fragments that those rewrites supersede. For read-your-writes we
1171        // expose `updated_fragments + new_fragments` and the
1172        // `removed_fragment_ids` so `scan_with_staged` can filter the
1173        // superseded committed fragments before combining — otherwise a
1174        // single merge_insert appears as duplicate rows (original committed
1175        // version + rewritten staged version).
1176        let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
1177            Operation::Update {
1178                new_fragments,
1179                updated_fragments,
1180                removed_fragment_ids,
1181                ..
1182            } => {
1183                let mut all = updated_fragments.clone();
1184                all.extend(new_fragments.iter().cloned());
1185                (all, removed_fragment_ids.clone())
1186            }
1187            Operation::Append { fragments } => (fragments.clone(), Vec::new()),
1188            other => {
1189                return Err(OmniError::manifest_internal(format!(
1190                    "stage_merge_insert: unexpected Lance operation {:?}",
1191                    std::mem::discriminant(other)
1192                )));
1193            }
1194        };
1195        Ok(StagedWrite {
1196            transaction: uncommitted.transaction,
1197            new_fragments,
1198            removed_fragment_ids,
1199        })
1200    }
1201
1202    /// Commit a previously-staged transaction onto `ds`, returning the new
1203    /// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by
1204    /// the publisher at end-of-query to materialize all staged writes before
1205    /// the meta-manifest commit.
1206    pub async fn commit_staged(
1207        &self,
1208        ds: Arc<Dataset>,
1209        transaction: Transaction,
1210    ) -> Result<Dataset> {
1211        // Skip Lance's auto-cleanup hook on every commit. OmniGraph owns version
1212        // GC explicitly (optimize.rs::cleanup_all_tables); Lance's hook fires off
1213        // the *dataset's stored* `lance.auto_cleanup.*` config, which graphs
1214        // created before the v7 bump (6.0.1 defaulted auto_cleanup ON) still
1215        // carry — so `WriteParams::auto_cleanup = None` alone does NOT stop it on
1216        // upgraded graphs. Skipping here covers the staged write path (the main
1217        // data path) for new and legacy datasets alike, preventing Lance from
1218        // GC'ing versions the __manifest still pins for snapshots/time-travel.
1219        CommitBuilder::new(ds)
1220            .with_skip_auto_cleanup(true)
1221            .execute(transaction)
1222            .await
1223            .map_err(|e| OmniError::Lance(e.to_string()))
1224    }
1225
1226    /// Stage an overwrite (write_fragments + Operation::Overwrite { schema, fragments }).
1227    /// Returns a StagedWrite carrying the replacement fragments. HEAD does
1228    /// NOT advance.
1229    ///
1230    /// Lance shape: `InsertBuilder::with_params(WriteParams { mode: Overwrite, .. })
1231    /// .execute_uncommitted(vec![batch])` produces a `Transaction` whose
1232    /// `Operation::Overwrite` carries the new schema + fragments. The
1233    /// transaction is committed via `commit_staged` (same call as
1234    /// `stage_append`).
1235    ///
1236    /// MR-793 Phase 2: introduces this for the schema_apply rewrite path.
1237    /// Lance API verified in `.context/mr-793-design.md` Appendix A.1.
1238    pub async fn stage_overwrite(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
1239        // `enable_stable_row_ids: true` is defensive — empirically Lance 6.0.1
1240        // preserves the source dataset's flag through `Operation::Overwrite`
1241        // when WriteParams omits it (pinned by
1242        // `stage_overwrite_preserves_stable_row_ids` in tests/staged_writes.rs),
1243        // but setting it explicitly keeps the invariant documented at every Overwrite site
1244        // (see docs/storage.md "Stable row IDs"). Setting it on an existing
1245        // dataset that was created without stable row IDs is a no-op per
1246        // Lance's row-id-lineage spec, so this stays correct for legacy
1247        // datasets.
1248        let (transaction, mut new_fragments) = if batch.num_rows() == 0 {
1249            let schema = LanceSchema::try_from(batch.schema().as_ref())
1250                .map_err(|e| OmniError::Lance(e.to_string()))?;
1251            let transaction = TransactionBuilder::new(
1252                ds.manifest.version,
1253                Operation::Overwrite {
1254                    fragments: Vec::new(),
1255                    schema,
1256                    config_upsert_values: None,
1257                    initial_bases: None,
1258                },
1259            )
1260            .build();
1261            (transaction, Vec::new())
1262        } else {
1263            let params = WriteParams {
1264                mode: WriteMode::Overwrite,
1265                enable_stable_row_ids: true,
1266                allow_external_blob_outside_bases: true,
1267                auto_cleanup: None,
1268                skip_auto_cleanup: true,
1269                ..Default::default()
1270            };
1271            let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1272                .with_params(&params)
1273                .execute_uncommitted(vec![batch])
1274                .await
1275                .map_err(|e| OmniError::Lance(e.to_string()))?;
1276            let new_fragments = match &transaction.operation {
1277                Operation::Overwrite { fragments, .. } => fragments.clone(),
1278                other => {
1279                    return Err(OmniError::manifest_internal(format!(
1280                        "stage_overwrite: unexpected Lance operation {:?}",
1281                        std::mem::discriminant(other)
1282                    )));
1283                }
1284            };
1285            (transaction, new_fragments)
1286        };
1287        // Overwrite REPLACES every committed fragment, and Lance restarts
1288        // fragment-ID and row-ID counters at the post-commit version.
1289        // For our pre-commit staged view we need to:
1290        //   1) Renumber temporary fragment IDs (Lance returns them as
1291        //      `id = 0` from `execute_uncommitted` — see stage_append
1292        //      for the same fix). For Overwrite there are no committed
1293        //      fragments to collide with (they're all in
1294        //      removed_fragment_ids below), so start at 1.
1295        //   2) For stable-row-id datasets, assign row_id_meta starting
1296        //      at 0 (Overwrite is a fresh-start) so `scan_with_staged`
1297        //      doesn't hit the "Missing row id meta" panic in
1298        //      lance-6.0.1 dataset/rowids.rs:22.
1299        assign_fragment_ids(&mut new_fragments, 1);
1300        if ds.manifest.uses_stable_row_ids() {
1301            assign_row_id_meta(&mut new_fragments, 0)?;
1302        }
1303        // Overwrite REPLACES every committed fragment. For
1304        // read-your-writes via scan_with_staged, list every committed
1305        // fragment in removed_fragment_ids so the post-stage view shows
1306        // ONLY the staged fragments.
1307        let removed_fragment_ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
1308        Ok(StagedWrite {
1309            transaction,
1310            new_fragments,
1311            removed_fragment_ids,
1312        })
1313    }
1314
1315    /// Stage a BTREE scalar index build. Returns a StagedWrite whose
1316    /// transaction commits via `commit_staged`. HEAD does NOT advance.
1317    ///
1318    /// Lance shape: `CreateIndexBuilder::execute_uncommitted` returns
1319    /// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex
1320    /// { new_indices, removed_indices }` via the public `TransactionBuilder`,
1321    /// replicating the simple (non-segment-commit-path) branch of Lance's
1322    /// `CreateIndexBuilder::execute` (lance-6.0.1 `src/index/create.rs:502-512`).
1323    ///
1324    /// `removed_indices` mirrors `execute()` lines 466-476: when the
1325    /// build replaces an existing same-named index, those entries are
1326    /// listed for tombstoning by the manifest commit.
1327    ///
1328    /// MR-793 Phase 2: scalar index types (BTree, Inverted) are
1329    /// stage-able. Vector indices are NOT (segment-commit-path requires
1330    /// `build_index_metadata_from_segments` which is `pub(crate)` in
1331    /// lance-6.0.1); see `create_vector_index` and Appendix A.3.
1332    pub async fn stage_create_btree_index(
1333        &self,
1334        ds: &Dataset,
1335        columns: &[&str],
1336    ) -> Result<StagedWrite> {
1337        let params = ScalarIndexParams::default();
1338        let mut ds_clone = ds.clone();
1339        let new_idx = ds_clone
1340            .create_index_builder(columns, IndexType::BTree, &params)
1341            .replace(true)
1342            .execute_uncommitted()
1343            .await
1344            .map_err(|e| OmniError::Lance(format!("stage_create_btree_index: {}", e)))?;
1345        let removed_indices: Vec<IndexMetadata> = ds
1346            .load_indices()
1347            .await
1348            .map_err(|e| OmniError::Lance(e.to_string()))?
1349            .iter()
1350            .filter(|idx| idx.name == new_idx.name)
1351            .cloned()
1352            .collect();
1353        let transaction = TransactionBuilder::new(
1354            new_idx.dataset_version,
1355            Operation::CreateIndex {
1356                new_indices: vec![new_idx],
1357                removed_indices,
1358            },
1359        )
1360        .build();
1361        Ok(StagedWrite {
1362            transaction,
1363            new_fragments: Vec::new(),
1364            removed_fragment_ids: Vec::new(),
1365        })
1366    }
1367
1368    /// Stage an INVERTED (FTS) scalar index build. Same shape as
1369    /// `stage_create_btree_index`; see its docs for the Lance API
1370    /// citation and contract notes.
1371    pub async fn stage_create_inverted_index(
1372        &self,
1373        ds: &Dataset,
1374        column: &str,
1375    ) -> Result<StagedWrite> {
1376        let params = InvertedIndexParams::default();
1377        let mut ds_clone = ds.clone();
1378        let new_idx = ds_clone
1379            .create_index_builder(&[column], IndexType::Inverted, &params)
1380            .replace(true)
1381            .execute_uncommitted()
1382            .await
1383            .map_err(|e| OmniError::Lance(format!("stage_create_inverted_index: {}", e)))?;
1384        let removed_indices: Vec<IndexMetadata> = ds
1385            .load_indices()
1386            .await
1387            .map_err(|e| OmniError::Lance(e.to_string()))?
1388            .iter()
1389            .filter(|idx| idx.name == new_idx.name)
1390            .cloned()
1391            .collect();
1392        let transaction = TransactionBuilder::new(
1393            new_idx.dataset_version,
1394            Operation::CreateIndex {
1395                new_indices: vec![new_idx],
1396                removed_indices,
1397            },
1398        )
1399        .build();
1400        Ok(StagedWrite {
1401            transaction,
1402            new_fragments: Vec::new(),
1403            removed_fragment_ids: Vec::new(),
1404        })
1405    }
1406
1407    /// Run a scan with optional uncommitted staged writes visible
1408    /// alongside the committed snapshot. When `staged` is empty this is
1409    /// identical to `scan(...)`.
1410    ///
1411    /// Composes the visible fragment list as `committed - removed + new`:
1412    /// the committed manifest's fragments, minus any fragment IDs that
1413    /// staged `Operation::Update`s (merge_insert rewrites) have superseded,
1414    /// plus the staged new/updated fragments. Without the `removed`
1415    /// filter, a merge_insert that rewrites an existing fragment would
1416    /// surface twice — once via the original committed fragment, once via
1417    /// the rewrite in `new_fragments`.
1418    ///
1419    /// **Filter contract is incomplete on staged fragments.** When `filter`
1420    /// is `Some(...)`, Lance pushes the predicate to per-fragment scans
1421    /// with stats-based pruning. Uncommitted fragments produced by
1422    /// `write_fragments_internal` lack the per-column statistics that
1423    /// committed fragments carry; Lance's optimizer drops them from the
1424    /// filtered scan even when their data would match. Staged-fragment
1425    /// rows are silently absent from the result. `scanner.use_stats(false)`
1426    /// does not fix this in lance 6.0.1. Callers needing correct filtered
1427    /// reads against staged data should use a different strategy — the
1428    /// engine's `MutationStaging` accumulator unions in-memory pending
1429    /// batches with the committed scan via DataFusion `MemTable` (see
1430    /// `scan_with_pending`).
1431    ///
1432    /// This method remains on the surface for primitive-level testing
1433    /// (basic stage + scan correctness without filters works) and for
1434    /// callers that don't need filter pushdown.
1435    pub async fn scan_with_staged(
1436        &self,
1437        ds: &Dataset,
1438        staged: &[StagedWrite],
1439        projection: Option<&[&str]>,
1440        filter: Option<&str>,
1441    ) -> Result<Vec<RecordBatch>> {
1442        if staged.is_empty() {
1443            return self.scan(ds, projection, filter, None).await;
1444        }
1445        let mut scanner = ds.scan();
1446        if let Some(cols) = projection {
1447            let owned: Vec<String> = cols.iter().map(|s| s.to_string()).collect();
1448            scanner
1449                .project(&owned)
1450                .map_err(|e| OmniError::Lance(e.to_string()))?;
1451        }
1452        if let Some(f) = filter {
1453            scanner
1454                .filter(f)
1455                .map_err(|e| OmniError::Lance(e.to_string()))?;
1456        }
1457        scanner.with_fragments(combine_committed_with_staged(ds, staged));
1458        let stream = scanner
1459            .try_into_stream()
1460            .await
1461            .map_err(|e| OmniError::Lance(e.to_string()))?;
1462        stream
1463            .try_collect()
1464            .await
1465            .map_err(|e| OmniError::Lance(e.to_string()))
1466    }
1467
1468    /// Scan committed via Lance + apply the same filter to in-memory
1469    /// pending batches via DataFusion `MemTable`, concat the two result
1470    /// streams. The replacement for `scan_with_staged` in engine code:
1471    /// the staged-write writer accumulates input batches in memory and
1472    /// unions them with the committed snapshot at read time,
1473    /// sidestepping the `Scanner::with_fragments` filter-pushdown
1474    /// limitation documented on `scan_with_staged`.
1475    ///
1476    /// `committed_ds` should be opened at the pre-mutation
1477    /// `expected_version` (the same version captured in `MutationStaging::expected_versions`
1478    /// at first touch of the table). `pending_batches` are the per-table
1479    /// accumulator's batches in their input shape. `pending_schema` is
1480    /// the schema of the accumulated batches; passing `None` falls back
1481    /// to the schema of the first pending batch.
1482    ///
1483    /// `filter` is the Lance / DataFusion SQL predicate. It is applied
1484    /// to both sides — Lance pushes it down on the committed side; the
1485    /// pending side runs it through a fresh DataFusion `SessionContext`
1486    /// with the batches registered as a `MemTable` named `pending`.
1487    ///
1488    /// `key_column` controls how committed and pending are unioned:
1489    /// - **`None` (union semantics)**: every committed row that matches
1490    ///   the filter and every pending row that matches the filter is
1491    ///   returned. Correct when committed and pending cannot share a
1492    ///   primary key — e.g., Append-mode loads with ULID-generated ids,
1493    ///   or any read where pending hasn't been used to update committed
1494    ///   rows.
1495    /// - **`Some(col)` (merge / shadow semantics)**: committed rows whose
1496    ///   `col` value appears in any pending batch are EXCLUDED from the
1497    ///   result; only pending's view of those rows is returned. Required
1498    ///   for Merge-mode reads (e.g., `execute_update` on the engine path)
1499    ///   so a chained `update` doesn't see stale committed values that
1500    ///   a prior op already updated in pending. Without this, a predicate
1501    ///   like `where age > 30` can match a row that an earlier
1502    ///   `set age = 20` already moved out of range.
1503    ///
1504    /// When `pending_batches` is empty this delegates to the regular
1505    /// scan path.
1506    pub async fn scan_with_pending(
1507        &self,
1508        committed_ds: &Dataset,
1509        pending_batches: &[RecordBatch],
1510        pending_schema: Option<SchemaRef>,
1511        projection: Option<&[&str]>,
1512        filter: Option<&str>,
1513        key_column: Option<&str>,
1514    ) -> Result<Vec<RecordBatch>> {
1515        // Contract: when merge-shadow semantics are requested via
1516        // `key_column`, the committed-side projection MUST include that
1517        // column so we can filter committed rows whose key appears in
1518        // pending. Silently dropping the shadow when projection omits
1519        // the key would re-introduce union semantics behind the
1520        // caller's back. Reject up front with a clear error so callers
1521        // either (a) include the key in projection or (b) drop
1522        // `key_column` if union is what they wanted.
1523        if let (Some(key_col), Some(cols)) = (key_column, projection) {
1524            if !cols.iter().any(|c| *c == key_col) {
1525                return Err(OmniError::Lance(format!(
1526                    "scan_with_pending: key_column '{}' must appear in projection \
1527                     when merge-shadow semantics are requested (got projection = {:?})",
1528                    key_col, cols
1529                )));
1530            }
1531        }
1532
1533        let committed = self.scan(committed_ds, projection, filter, None).await?;
1534        if pending_batches.is_empty() {
1535            return Ok(committed);
1536        }
1537
1538        // Shadow committed rows whose key value also appears in pending.
1539        // This makes scan_with_pending implement merge semantics rather
1540        // than naive union: any row that has a pending update is
1541        // represented ONLY by its pending value, never by both its
1542        // (stale) committed value and its (current) pending value.
1543        let committed = match key_column {
1544            Some(key_col) => {
1545                let pending_keys = collect_string_column_values(pending_batches, key_col)?;
1546                if pending_keys.is_empty() {
1547                    committed
1548                } else {
1549                    filter_out_rows_where_string_in(committed, key_col, &pending_keys)?
1550                }
1551            }
1552            None => committed,
1553        };
1554
1555        let pending =
1556            scan_pending_batches(pending_batches, pending_schema, projection, filter).await?;
1557
1558        let mut out = committed;
1559        out.extend(pending);
1560        Ok(out)
1561    }
1562
1563    /// `count_rows` variant that respects staged writes. Used for
1564    /// edge-cardinality validation that needs to see staged edges before
1565    /// commit. Same `committed - removed + new` composition as
1566    /// `scan_with_staged`.
1567    pub async fn count_rows_with_staged(
1568        &self,
1569        ds: &Dataset,
1570        staged: &[StagedWrite],
1571        filter: Option<String>,
1572    ) -> Result<usize> {
1573        if staged.is_empty() {
1574            return self.count_rows(ds, filter).await;
1575        }
1576        let mut scanner = ds.scan();
1577        if let Some(f) = filter {
1578            scanner
1579                .filter(&f)
1580                .map_err(|e| OmniError::Lance(e.to_string()))?;
1581        }
1582        scanner.with_fragments(combine_committed_with_staged(ds, staged));
1583        let count = scanner
1584            .count_rows()
1585            .await
1586            .map_err(|e| OmniError::Lance(e.to_string()))?;
1587        Ok(count as usize)
1588    }
1589
1590    async fn user_indices_for_column(
1591        &self,
1592        ds: &Dataset,
1593        column: &str,
1594    ) -> Result<Vec<IndexMetadata>> {
1595        let field_id = ds
1596            .schema()
1597            .field(column)
1598            .map(|field| field.id)
1599            .ok_or_else(|| {
1600                OmniError::manifest_internal(format!(
1601                    "dataset is missing expected index column '{}'",
1602                    column
1603                ))
1604            })?;
1605        let indices = ds
1606            .load_indices()
1607            .await
1608            .map_err(|e| OmniError::Lance(e.to_string()))?;
1609        Ok(indices
1610            .iter()
1611            .filter(|index| !is_system_index(index))
1612            .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
1613            .cloned()
1614            .collect())
1615    }
1616
1617    pub async fn has_btree_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1618        let indices = self.user_indices_for_column(ds, column).await?;
1619        Ok(indices.iter().any(|index| {
1620            index
1621                .index_details
1622                .as_ref()
1623                .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
1624                .unwrap_or(false)
1625        }))
1626    }
1627
1628    pub async fn has_fts_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1629        let indices = self.user_indices_for_column(ds, column).await?;
1630        Ok(indices.iter().any(|index| {
1631            index
1632                .index_details
1633                .as_ref()
1634                .map(|details| IndexDetails(details.clone()).supports_fts())
1635                .unwrap_or(false)
1636        }))
1637    }
1638
1639    pub async fn has_vector_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1640        let indices = self.user_indices_for_column(ds, column).await?;
1641        Ok(indices.iter().any(|index| {
1642            index
1643                .index_details
1644                .as_ref()
1645                .map(|details| IndexDetails(details.clone()).is_vector())
1646                .unwrap_or(false)
1647        }))
1648    }
1649
1650    pub(crate) async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
1651        let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
1652        ds.create_index_builder(&[column], IndexType::Vector, &params)
1653            .replace(true)
1654            .await
1655            .map_err(|e| OmniError::Lance(e.to_string()))?;
1656        // Record only after the index build succeeds, so a failed build does not
1657        // inflate the probe (matches the `stage_*` probes).
1658        crate::instrumentation::record_create_vector_index();
1659        Ok(())
1660    }
1661
1662    pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result<Dataset> {
1663        let batch = RecordBatch::new_empty(schema.clone());
1664        Self::write_dataset(dataset_uri, batch).await
1665    }
1666
1667    pub async fn first_row_id_for_filter(&self, ds: &Dataset, filter: &str) -> Result<Option<u64>> {
1668        let batches = Self::scan_stream(ds, Some(&["id"]), Some(filter), None, true)
1669            .await?
1670            .try_collect::<Vec<RecordBatch>>()
1671            .await
1672            .map_err(|e| OmniError::Lance(e.to_string()))?;
1673        Ok(batches.iter().find_map(|batch| {
1674            batch
1675                .column_by_name("_rowid")
1676                .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1677                .and_then(|arr| (arr.len() > 0).then(|| arr.value(0)))
1678        }))
1679    }
1680
1681    pub async fn write_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
1682        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
1683        let params = WriteParams {
1684            mode: WriteMode::Create,
1685            enable_stable_row_ids: true,
1686            data_storage_version: Some(LanceFileVersion::V2_2),
1687            allow_external_blob_outside_bases: true,
1688            auto_cleanup: None,
1689            skip_auto_cleanup: true,
1690            ..Default::default()
1691        };
1692        Dataset::write(reader, dataset_uri, Some(params))
1693            .await
1694            .map_err(|e| OmniError::Lance(e.to_string()))
1695    }
1696}
1697
1698/// Build the `Scanner::with_fragments` argument for read-your-writes:
1699/// committed manifest fragments minus any fragment IDs superseded by the
1700/// staged writes, plus the staged `new_fragments`. Order is:
1701///   1. committed fragments whose IDs are NOT in any staged
1702///      `removed_fragment_ids` (preserves committed order),
1703///   2. all staged `new_fragments` in stage order.
1704///
1705/// Lance's `Scanner` does not require any particular ordering between
1706/// committed and staged fragments — `with_fragments` scopes the scan to
1707/// exactly the supplied list. The dedup matters because merge_insert
1708/// rewrites a fragment in place at the Lance layer: the rewritten
1709/// fragment is in `new_fragments`, the original (which it supersedes) is
1710/// in `committed` until manifest commit, and including both would yield
1711/// duplicate rows.
1712///
1713/// **Inter-stage supersession is not handled here.** Each StagedWrite's
1714/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a
1715/// later staged merge cannot know about an earlier staged merge's
1716/// fragments (Lance's `MergeInsertBuilder` runs against the committed
1717/// view). If two `stage_merge_insert`s on the same table produce rows
1718/// with the same key, the combined view returns duplicates by key. The
1719/// engine's mutation path enforces "per touched table: all stage_append
1720/// OR exactly one stage_merge_insert" at parse time (D₂′ in
1721/// `exec/mutation.rs`) so this primitive's caller never chains merges.
1722/// See `stage_merge_insert` for the full contract.
1723/// Sum `physical_rows` across all fragments in the supplied stages.
1724/// Used by `stage_append` to compute the row-ID offset for chained
1725/// `stage_append` calls against the same dataset.
1726///
1727/// Assumes `prior_stages` contains only `stage_append` results — see
1728/// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the
1729/// `new_fragments` include rewrites that don't add new rows, so this
1730/// would over-count.
1731fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
1732    prior_stages
1733        .iter()
1734        .map(|s| s.new_fragments.len() as u64)
1735        .sum()
1736}
1737
1738/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's
1739/// commit-time `Transaction::fragments_with_ids` (lance-6.0.1
1740/// `dataset/transaction.rs:1456`) — fragments produced by
1741/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary
1742/// placeholder; we renumber here so they don't collide with committed
1743/// fragments (or with each other across chained stages) when the slice is
1744/// passed to `Scanner::with_fragments`.
1745fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) {
1746    for (i, fragment) in fragments.iter_mut().enumerate() {
1747        if fragment.id == 0 {
1748            fragment.id = start_id + i as u64;
1749        }
1750    }
1751}
1752
1753fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
1754    let mut total: u64 = 0;
1755    for stage in prior_stages {
1756        for fragment in &stage.new_fragments {
1757            let physical_rows = fragment.physical_rows.ok_or_else(|| {
1758                OmniError::manifest_internal(
1759                    "prior_stages_row_count: fragment is missing physical_rows".to_string(),
1760                )
1761            })? as u64;
1762            total += physical_rows;
1763        }
1764    }
1765    Ok(total)
1766}
1767
1768/// Assign sequential row IDs to fragments that lack them, starting from
1769/// `start_row_id`. Mirrors the relevant arm of Lance's
1770/// `Transaction::assign_row_ids` (lance-6.0.1 `dataset/transaction.rs:2682`)
1771/// for the `row_id_meta = None` case — fragments produced by
1772/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset.
1773///
1774/// Used only by `stage_append` for read-your-writes — see its docstring
1775/// for why pre-commit assignment is needed and why diverging from Lance's
1776/// commit-time IDs is safe.
1777fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
1778    let mut next_row_id = start_row_id;
1779    for fragment in fragments {
1780        if fragment.row_id_meta.is_some() {
1781            continue;
1782        }
1783        let physical_rows = fragment.physical_rows.ok_or_else(|| {
1784            OmniError::manifest_internal(
1785                "stage_append: fragment is missing physical_rows".to_string(),
1786            )
1787        })? as u64;
1788        let row_ids = next_row_id..(next_row_id + physical_rows);
1789        let sequence = RowIdSequence::from(row_ids);
1790        let serialized = write_row_ids(&sequence);
1791        fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
1792        next_row_id += physical_rows;
1793    }
1794    Ok(())
1795}
1796
1797/// Collect the set of values in a Utf8 column across multiple batches.
1798/// Used by `scan_with_pending`'s merge-semantic path to identify
1799/// committed rows that are shadowed by pending writes. NULL values are
1800/// skipped.
1801fn collect_string_column_values(
1802    batches: &[RecordBatch],
1803    column: &str,
1804) -> Result<std::collections::HashSet<String>> {
1805    use arrow_array::{Array, StringArray};
1806    let mut out = std::collections::HashSet::new();
1807    for batch in batches {
1808        let Some(col) = batch.column_by_name(column) else {
1809            return Err(OmniError::Lance(format!(
1810                "scan_with_pending: pending batch missing key column '{}'",
1811                column
1812            )));
1813        };
1814        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1815            OmniError::Lance(format!(
1816                "scan_with_pending: key column '{}' is not Utf8",
1817                column
1818            ))
1819        })?;
1820        for i in 0..arr.len() {
1821            if arr.is_valid(i) {
1822                out.insert(arr.value(i).to_string());
1823            }
1824        }
1825    }
1826    Ok(out)
1827}
1828
1829/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`.
1830/// Used by `scan_with_pending`'s merge-semantic path to shadow committed
1831/// rows that pending has already updated. Returns the surviving rows.
1832///
1833/// `scan_with_pending` validates up front that the projection contains
1834/// `column`, so a missing column here is a programmer error — error
1835/// loudly instead of silently passing batches through (which would
1836/// re-introduce the union semantics the caller asked us to avoid).
1837fn filter_out_rows_where_string_in(
1838    batches: Vec<RecordBatch>,
1839    column: &str,
1840    excluded: &std::collections::HashSet<String>,
1841) -> Result<Vec<RecordBatch>> {
1842    use arrow_array::{Array, BooleanArray, StringArray};
1843    let mut out = Vec::with_capacity(batches.len());
1844    for batch in batches {
1845        if batch.num_rows() == 0 {
1846            out.push(batch);
1847            continue;
1848        }
1849        let col = batch.column_by_name(column).ok_or_else(|| {
1850            OmniError::manifest_internal(format!(
1851                "scan_with_pending: committed batch missing key column '{}' \
1852                 (the up-front projection check should have rejected this)",
1853                column
1854            ))
1855        })?;
1856        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1857            OmniError::Lance(format!(
1858                "scan_with_pending: committed column '{}' is not Utf8",
1859                column
1860            ))
1861        })?;
1862        let mask: BooleanArray = (0..arr.len())
1863            .map(|i| {
1864                if arr.is_valid(i) {
1865                    Some(!excluded.contains(arr.value(i)))
1866                } else {
1867                    Some(true)
1868                }
1869            })
1870            .collect();
1871        let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
1872            .map_err(|e| OmniError::Lance(e.to_string()))?;
1873        out.push(filtered);
1874    }
1875    Ok(out)
1876}
1877
1878/// Apply `projection` and `filter` to in-memory pending batches via a
1879/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for
1880/// the read-your-writes side of the in-memory staging accumulator.
1881///
1882/// `pending_batches` must be non-empty (the caller short-circuits on
1883/// empty).
1884///
1885/// **SQL dialect contract.** `filter` is also passed to Lance's scanner
1886/// on the committed side. Lance and DataFusion both accept standard
1887/// SQL comparison predicates (`col op literal`) and OmniGraph's
1888/// `predicate_to_sql` only emits those shapes today (`=`, `!=`, `>`,
1889/// `<`, `>=`, `<=`). If a future caller introduces a Lance-specific
1890/// scanner extension (vector search, FTS, `_rowid` references) into
1891/// the filter, this function will need explicit translation — DataFusion
1892/// won't recognize those operators against the in-memory `MemTable`.
1893async fn scan_pending_batches(
1894    pending_batches: &[RecordBatch],
1895    pending_schema: Option<SchemaRef>,
1896    projection: Option<&[&str]>,
1897    filter: Option<&str>,
1898) -> Result<Vec<RecordBatch>> {
1899    let schema = pending_schema.unwrap_or_else(|| pending_batches[0].schema());
1900    // #283: disable SQL identifier normalization so an unquoted camelCase
1901    // column in `filter` (e.g. `repoName = 'acme'`, emitted unquoted by
1902    // `predicate_to_sql` because the committed Lance scan needs it unquoted)
1903    // is matched case-preserving against the case-sensitive MemTable schema.
1904    // Without this, DataFusion lowercases `repoName` → `reponame` and fails to
1905    // resolve. Quoted identifiers (the projection list below) are unaffected.
1906    let mut config = datafusion::execution::context::SessionConfig::new();
1907    config.options_mut().sql_parser.enable_ident_normalization = false;
1908    let ctx = datafusion::execution::context::SessionContext::new_with_config(config);
1909    let mem = datafusion::datasource::MemTable::try_new(schema, vec![pending_batches.to_vec()])
1910        .map_err(|e| OmniError::Lance(e.to_string()))?;
1911    ctx.register_table("pending", Arc::new(mem))
1912        .map_err(|e| OmniError::Lance(e.to_string()))?;
1913
1914    let proj = projection
1915        .map(|cols| {
1916            cols.iter()
1917                .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
1918                .collect::<Vec<_>>()
1919                .join(", ")
1920        })
1921        .unwrap_or_else(|| "*".to_string());
1922    let where_clause = filter.map(|f| format!("WHERE {f}")).unwrap_or_default();
1923    let sql = format!("SELECT {proj} FROM pending {where_clause}");
1924    let df = ctx
1925        .sql(&sql)
1926        .await
1927        .map_err(|e| OmniError::Lance(e.to_string()))?;
1928    df.collect()
1929        .await
1930        .map_err(|e| OmniError::Lance(e.to_string()))
1931}
1932
1933fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
1934    let removed: std::collections::HashSet<u64> = staged
1935        .iter()
1936        .flat_map(|w| w.removed_fragment_ids.iter().copied())
1937        .collect();
1938    let mut combined: Vec<Fragment> = ds
1939        .manifest
1940        .fragments
1941        .iter()
1942        .filter(|f| !removed.contains(&f.id))
1943        .cloned()
1944        .collect();
1945    for write in staged {
1946        combined.extend(write.new_fragments.iter().cloned());
1947    }
1948    combined
1949}
1950
1951/// Precondition guard for `stage_merge_insert`.
1952/// Both opt into `SourceDedupeBehavior::FirstSeen` to suppress the Lance
1953/// `processed_row_ids` bug (MR-957). FirstSeen would *also* silently
1954/// collapse genuine duplicate source keys; this check restores fail-fast
1955/// behavior on real dups by erroring before the builder gets a chance to
1956/// silently skip them.
1957///
1958/// Today only single-column string keys are used at the call sites
1959/// (`vec!["id".to_string()]`). The check restricts itself to that shape
1960/// and surfaces an internal error if a future caller passes anything
1961/// else — keeping the assumption explicit instead of silently degrading.
1962fn check_batch_unique_by_keys(
1963    batch: &RecordBatch,
1964    key_columns: &[String],
1965    context: &'static str,
1966) -> Result<()> {
1967    if key_columns.len() != 1 {
1968        return Err(OmniError::manifest_internal(format!(
1969            "{}: check_batch_unique_by_keys currently supports single-column keys only, got {:?}",
1970            context, key_columns
1971        )));
1972    }
1973    let key_col_name = &key_columns[0];
1974    let column = batch.column_by_name(key_col_name).ok_or_else(|| {
1975        OmniError::manifest_internal(format!(
1976            "{}: source batch missing key column '{}'",
1977            context, key_col_name
1978        ))
1979    })?;
1980    let strs = column
1981        .as_any()
1982        .downcast_ref::<StringArray>()
1983        .ok_or_else(|| {
1984            OmniError::manifest_internal(format!(
1985                "{}: key column '{}' is not a StringArray (got {:?})",
1986                context,
1987                key_col_name,
1988                column.data_type()
1989            ))
1990        })?;
1991
1992    let mut seen: std::collections::HashSet<&str> =
1993        std::collections::HashSet::with_capacity(batch.num_rows());
1994    for i in 0..strs.len() {
1995        if !strs.is_valid(i) {
1996            continue;
1997        }
1998        let v = strs.value(i);
1999        if !seen.insert(v) {
2000            return Err(OmniError::manifest(format!(
2001                "{}: duplicate source row for key '{}' (column '{}'); \
2002                 callers must hand in a batch unique by `key_columns` \
2003                 — see MR-957",
2004                context, v, key_col_name
2005            )));
2006        }
2007    }
2008    Ok(())
2009}
2010
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};

    /// Build a one-column batch (`id`: non-nullable Utf8) holding `ids` in
    /// the given order.
    fn batch_with_ids(ids: &[&str]) -> RecordBatch {
        let id_field = Field::new("id", DataType::Utf8, false);
        let id_values: ArrayRef = Arc::new(StringArray::from(ids.to_vec()));
        RecordBatch::try_new(Arc::new(Schema::new(vec![id_field])), vec![id_values]).unwrap()
    }

    /// The single-column `["id"]` key list the tests pass to the guard.
    fn id_key() -> Vec<String> {
        vec!["id".to_string()]
    }

    // Distinct keys must be accepted.
    #[test]
    fn check_batch_unique_by_keys_passes_when_all_unique() {
        let batch = batch_with_ids(&["a", "b", "c"]);
        check_batch_unique_by_keys(&batch, &id_key(), "test")
            .expect("a batch with distinct ids must pass the uniqueness guard");
    }

    // A repeated key must be rejected, naming the key and the tracking issue.
    #[test]
    fn check_batch_unique_by_keys_errors_on_duplicate_id() {
        let batch = batch_with_ids(&["a", "b", "a"]);
        let msg = check_batch_unique_by_keys(&batch, &id_key(), "test")
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("duplicate source row for key 'a'"),
            "unexpected error: {msg}"
        );
        assert!(
            msg.contains("MR-957"),
            "error should reference MR-957: {msg}"
        );
    }

    // Composite keys are unsupported and must be refused explicitly.
    #[test]
    fn check_batch_unique_by_keys_rejects_multi_column_keys() {
        let batch = batch_with_ids(&["a"]);
        let composite_key = ["id".to_string(), "other".to_string()];
        let msg = check_batch_unique_by_keys(&batch, &composite_key, "test")
            .unwrap_err()
            .to_string();
        assert!(msg.contains("single-column keys only"));
    }
}