Skip to main content

omnigraph/
table_store.rs

1use arrow_array::{
2    Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
3};
4use arrow_schema::SchemaRef;
5use arrow_select::concat::concat_batches;
6use futures::TryStreamExt;
7use lance::Dataset;
8use lance::blob::BlobArrayBuilder;
9use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
10use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{
13    CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
14    WriteParams,
15};
16use lance::datatypes::BlobKind;
17use lance::index::DatasetIndexExt;
18use lance::index::scalar::IndexDetails;
19use lance_file::version::LanceFileVersion;
20use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
21use lance_index::{IndexType, is_system_index};
22use lance_linalg::distance::MetricType;
23use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
24use lance_table::rowids::{RowIdSequence, write_row_ids};
25use std::sync::Arc;
26
27use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
28use crate::db::{Snapshot, SubTableEntry};
29use crate::error::{OmniError, Result};
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct TableState {
33    pub version: u64,
34    pub row_count: u64,
35    pub(crate) version_metadata: TableVersionMetadata,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct DeleteState {
40    pub version: u64,
41    pub row_count: u64,
42    pub deleted_rows: usize,
43    pub(crate) version_metadata: TableVersionMetadata,
44}
45
46/// A Lance write that has produced fragment files on object storage but is
47/// not yet committed to the dataset's manifest. The staged-write primitives
48/// are consumed by `MutationStaging` (`exec/staging.rs`,
49/// `exec/mutation.rs`) and the bulk loader (`loader/mod.rs`). The
50/// intent: defer Lance commits to end-of-query so a mid-query failure
51/// leaves the touched table at the pre-mutation HEAD instead of
52/// drifting ahead. See `docs/runs.md` for the publisher-CAS contract
53/// this builds on.
54///
55/// `transaction` is opaque from our side — Lance owns its semantics. We
56/// commit it via `CommitBuilder::execute(transaction)` (see
57/// `TableStore::commit_staged`).
58///
59/// For read-your-writes within the same query, `new_fragments` and
60/// `removed_fragment_ids` together describe the post-stage view delta:
61/// `scan_with_staged` (and `count_rows_with_staged`) compose
62/// `committed - removed + new` so subsequent reads see the staged result
63/// without double-counting fragments that `Operation::Update` rewrote.
64/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites
65/// existing fragments would yield duplicate rows (the original fragment
66/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
67#[derive(Debug, Clone)]
68pub struct StagedWrite {
69    pub transaction: Transaction,
70    /// Fragments to surface alongside the committed manifest in
71    /// `Scanner::with_fragments(committed - removed + new)`. For
72    /// `Operation::Append` these are the freshly-appended fragments. For
73    /// `Operation::Update` (merge_insert) these are
74    /// `updated_fragments + new_fragments` (rewrites + freshly-inserted
75    /// rows).
76    pub new_fragments: Vec<Fragment>,
77    /// Fragment IDs that this staged write supersedes. The committed
78    /// manifest must filter these out before being combined with
79    /// `new_fragments` for read-your-writes scans, otherwise rewrites
80    /// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
81    /// adds without removing anything); populated from
82    /// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
83    pub removed_fragment_ids: Vec<u64>,
84}
85
/// Handle for reading and writing table datasets that live under a single
/// root URI.
#[derive(Clone, Debug)]
pub struct TableStore {
    /// Root URI that table paths are joined onto; stored without a trailing
    /// `/` when built through `TableStore::new`.
    root_uri: String,
}
90
91impl TableStore {
92    pub fn new(root_uri: &str) -> Self {
93        Self {
94            root_uri: root_uri.trim_end_matches('/').to_string(),
95        }
96    }
97
98    pub fn root_uri(&self) -> &str {
99        &self.root_uri
100    }
101
102    pub fn dataset_uri(&self, table_path: &str) -> String {
103        format!("{}/{}", self.root_uri, table_path)
104    }
105
106    fn table_path_from_dataset_uri(&self, dataset_uri: &str) -> Result<String> {
107        let prefix = format!("{}/", self.root_uri.trim_end_matches('/'));
108        let table_path = dataset_uri
109            .strip_prefix(&prefix)
110            .map(|path| path.to_string())
111            .ok_or_else(|| {
112                OmniError::manifest_internal(format!(
113                    "dataset uri '{}' is not under root '{}'",
114                    dataset_uri, self.root_uri
115                ))
116            })?;
117        Ok(table_path
118            .split_once("/tree/")
119            .map(|(path, _)| path.to_string())
120            .unwrap_or(table_path))
121    }
122
123    fn dataset_version_metadata(
124        &self,
125        dataset_uri: &str,
126        ds: &Dataset,
127    ) -> Result<TableVersionMetadata> {
128        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
129        TableVersionMetadata::from_dataset(&self.root_uri, &table_path, ds)
130    }
131
132    pub async fn open_snapshot_table(
133        &self,
134        snapshot: &Snapshot,
135        table_key: &str,
136    ) -> Result<Dataset> {
137        snapshot.open(table_key).await
138    }
139
140    pub async fn open_at_entry(&self, entry: &SubTableEntry) -> Result<Dataset> {
141        entry.open(&self.root_uri).await
142    }
143
144    pub async fn open_dataset_head(
145        &self,
146        dataset_uri: &str,
147        branch: Option<&str>,
148    ) -> Result<Dataset> {
149        let ds = Dataset::open(dataset_uri)
150            .await
151            .map_err(|e| OmniError::Lance(e.to_string()))?;
152        match branch {
153            Some(branch) if branch != "main" => ds
154                .checkout_branch(branch)
155                .await
156                .map_err(|e| OmniError::Lance(e.to_string())),
157            _ => Ok(ds),
158        }
159    }
160
161    pub async fn open_dataset_head_for_write(
162        &self,
163        table_key: &str,
164        dataset_uri: &str,
165        branch: Option<&str>,
166    ) -> Result<Dataset> {
167        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
168        open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
169    }
170
171    pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
172        let mut ds = Dataset::open(dataset_uri)
173            .await
174            .map_err(|e| OmniError::Lance(e.to_string()))?;
175        ds.delete_branch(branch)
176            .await
177            .map_err(|e| OmniError::Lance(e.to_string()))
178    }
179
180    pub async fn open_dataset_at_state(
181        &self,
182        table_path: &str,
183        branch: Option<&str>,
184        version: u64,
185    ) -> Result<Dataset> {
186        let ds = self
187            .open_dataset_head(&self.dataset_uri(table_path), branch)
188            .await?;
189        ds.checkout_version(version)
190            .await
191            .map_err(|e| OmniError::Lance(e.to_string()))
192    }
193
194    pub fn ensure_expected_version(
195        &self,
196        ds: &Dataset,
197        table_key: &str,
198        expected_version: u64,
199    ) -> Result<()> {
200        let actual = ds.version().version;
201        if actual != expected_version {
202            // Use the structured ExpectedVersionMismatch variant so callers
203            // (and the HTTP server) can match on details rather than parsing
204            // the message. This drift is a publisher-style OCC failure: the
205            // caller's pre-write view of the table version is stale relative
206            // to the on-disk Lance head.
207            return Err(OmniError::manifest_expected_version_mismatch(
208                table_key,
209                expected_version,
210                actual,
211            ));
212        }
213        Ok(())
214    }
215
216    pub async fn reopen_for_mutation(
217        &self,
218        dataset_uri: &str,
219        branch: Option<&str>,
220        table_key: &str,
221        expected_version: u64,
222    ) -> Result<Dataset> {
223        let ds = self
224            .open_dataset_head_for_write(table_key, dataset_uri, branch)
225            .await?;
226        self.ensure_expected_version(&ds, table_key, expected_version)?;
227        Ok(ds)
228    }
229
230    pub async fn fork_branch_from_state(
231        &self,
232        dataset_uri: &str,
233        source_branch: Option<&str>,
234        table_key: &str,
235        source_version: u64,
236        target_branch: &str,
237    ) -> Result<Dataset> {
238        let mut source_ds = self
239            .open_dataset_head(dataset_uri, source_branch)
240            .await?
241            .checkout_version(source_version)
242            .await
243            .map_err(|e| OmniError::Lance(e.to_string()))?;
244        self.ensure_expected_version(&source_ds, table_key, source_version)?;
245
246        match source_ds
247            .create_branch(target_branch, source_version, None)
248            .await
249        {
250            Ok(_) => {}
251            Err(create_err) => match self
252                .open_dataset_head(dataset_uri, Some(target_branch))
253                .await
254            {
255                Ok(ds) => {
256                    self.ensure_expected_version(&ds, table_key, source_version)?;
257                    return Ok(ds);
258                }
259                Err(_) => return Err(OmniError::Lance(create_err.to_string())),
260            },
261        }
262
263        let ds = self
264            .open_dataset_head(dataset_uri, Some(target_branch))
265            .await?;
266        self.ensure_expected_version(&ds, table_key, source_version)?;
267        Ok(ds)
268    }
269
270    pub async fn scan_batches(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
271        self.scan(ds, None, None, None).await
272    }
273
274    pub async fn scan_batches_for_rewrite(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
275        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
276        if !has_blob_columns {
277            return self.scan_batches(ds).await;
278        }
279
280        let batches = Self::scan_stream(ds, None, None, None, true)
281            .await?
282            .try_collect::<Vec<RecordBatch>>()
283            .await
284            .map_err(|e| OmniError::Lance(e.to_string()))?;
285        let mut materialized = Vec::with_capacity(batches.len());
286        for batch in batches {
287            materialized.push(Self::materialize_blob_batch(ds, batch).await?);
288        }
289        Ok(materialized)
290    }
291
292    pub(crate) async fn materialize_blob_batch(
293        ds: &Dataset,
294        batch: RecordBatch,
295    ) -> Result<RecordBatch> {
296        let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
297        if !has_blob_columns {
298            return Ok(batch);
299        }
300
301        let row_ids = batch
302            .column_by_name("_rowid")
303            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
304            .ok_or_else(|| {
305                OmniError::Lance("expected _rowid column when materializing blobs".to_string())
306            })?
307            .values()
308            .iter()
309            .copied()
310            .collect::<Vec<_>>();
311
312        let schema: SchemaRef = Arc::new(ds.schema().into());
313        let mut columns = Vec::with_capacity(schema.fields().len());
314        for field in schema.fields() {
315            let lance_field = lance::datatypes::Field::try_from(field.as_ref())
316                .map_err(|e| OmniError::Lance(e.to_string()))?;
317            let column = batch.column_by_name(field.name()).ok_or_else(|| {
318                OmniError::Lance(format!("batch missing column '{}'", field.name()))
319            })?;
320            if lance_field.is_blob() {
321                let descriptions =
322                    column
323                        .as_any()
324                        .downcast_ref::<StructArray>()
325                        .ok_or_else(|| {
326                            OmniError::Lance(format!(
327                                "expected blob descriptions for '{}'",
328                                field.name()
329                            ))
330                        })?;
331                columns.push(
332                    Self::rebuild_blob_column(ds, field.name(), descriptions, &row_ids).await?,
333                );
334            } else {
335                columns.push(column.clone());
336            }
337        }
338
339        RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
340    }
341
342    async fn rebuild_blob_column(
343        ds: &Dataset,
344        column_name: &str,
345        descriptions: &StructArray,
346        row_ids: &[u64],
347    ) -> Result<ArrayRef> {
348        let mut builder = BlobArrayBuilder::new(row_ids.len());
349        let mut non_null_row_ids = Vec::new();
350        let mut row_has_blob = Vec::with_capacity(row_ids.len());
351
352        for row in 0..row_ids.len() {
353            let is_null = Self::blob_description_is_null(descriptions, row)?;
354            row_has_blob.push(!is_null);
355            if !is_null {
356                non_null_row_ids.push(row_ids[row]);
357            }
358        }
359
360        let blob_files = if non_null_row_ids.is_empty() {
361            Vec::new()
362        } else {
363            Arc::new(ds.clone())
364                .take_blobs(&non_null_row_ids, column_name)
365                .await
366                .map_err(|e| OmniError::Lance(e.to_string()))?
367        };
368
369        let mut files = blob_files.into_iter();
370        for has_blob in row_has_blob {
371            if !has_blob {
372                builder
373                    .push_null()
374                    .map_err(|e| OmniError::Lance(e.to_string()))?;
375                continue;
376            }
377
378            let blob = files.next().ok_or_else(|| {
379                OmniError::Lance(format!(
380                    "blob rewrite for '{}' lost alignment with source rows",
381                    column_name
382                ))
383            })?;
384            builder
385                .push_bytes(
386                    blob.read()
387                        .await
388                        .map_err(|e| OmniError::Lance(e.to_string()))?,
389                )
390                .map_err(|e| OmniError::Lance(e.to_string()))?;
391        }
392
393        if files.next().is_some() {
394            return Err(OmniError::Lance(format!(
395                "blob rewrite for '{}' produced extra source blobs",
396                column_name
397            )));
398        }
399
400        builder
401            .finish()
402            .map_err(|e| OmniError::Lance(e.to_string()))
403    }
404
405    fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
406        if descriptions.is_null(row) {
407            return Ok(true);
408        }
409
410        let position = descriptions
411            .column_by_name("position")
412            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
413            .ok_or_else(|| {
414                OmniError::Lance(format!(
415                    "unrecognized blob description schema {:?}: missing UInt64 position field",
416                    descriptions.fields()
417                ))
418            })?;
419        let size = descriptions
420            .column_by_name("size")
421            .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
422            .ok_or_else(|| {
423                OmniError::Lance(format!(
424                    "unrecognized blob description schema {:?}: missing UInt64 size field",
425                    descriptions.fields()
426                ))
427            })?;
428
429        let Some(kind_column) = descriptions.column_by_name("kind") else {
430            return Ok(position.is_null(row) || size.is_null(row));
431        };
432        let kind = if let Some(kind) = kind_column.as_any().downcast_ref::<UInt8Array>() {
433            if kind.is_null(row) {
434                return Ok(true);
435            }
436            kind.value(row)
437        } else if let Some(kind) = kind_column.as_any().downcast_ref::<UInt32Array>() {
438            if kind.is_null(row) {
439                return Ok(true);
440            }
441            kind.value(row) as u8
442        } else {
443            return Err(OmniError::Lance(format!(
444                "unrecognized blob description schema {:?}: kind field must be UInt8 or UInt32",
445                descriptions.fields()
446            )));
447        };
448
449        let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
450        if kind != BlobKind::Inline {
451            return Ok(false);
452        }
453        let blob_uri = descriptions
454            .column_by_name("blob_uri")
455            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
456            .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
457
458        Ok((position.is_null(row) || position.value(row) == 0)
459            && (size.is_null(row) || size.value(row) == 0)
460            && blob_uri.unwrap_or("").is_empty())
461    }
462
463    pub async fn scan_stream(
464        ds: &Dataset,
465        projection: Option<&[&str]>,
466        filter: Option<&str>,
467        order_by: Option<Vec<ColumnOrdering>>,
468        with_row_id: bool,
469    ) -> Result<DatasetRecordBatchStream> {
470        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, |_| Ok(())).await
471    }
472
473    pub async fn scan_stream_with<F>(
474        ds: &Dataset,
475        projection: Option<&[&str]>,
476        filter: Option<&str>,
477        order_by: Option<Vec<ColumnOrdering>>,
478        with_row_id: bool,
479        configure: F,
480    ) -> Result<DatasetRecordBatchStream>
481    where
482        F: FnOnce(&mut Scanner) -> Result<()>,
483    {
484        let mut scanner = ds.scan();
485        if with_row_id {
486            scanner.with_row_id();
487        }
488        if let Some(columns) = projection {
489            scanner
490                .project(columns)
491                .map_err(|e| OmniError::Lance(e.to_string()))?;
492        }
493        if let Some(filter_sql) = filter {
494            scanner
495                .filter(filter_sql)
496                .map_err(|e| OmniError::Lance(e.to_string()))?;
497        }
498        if let Some(ordering) = order_by {
499            scanner
500                .order_by(Some(ordering))
501                .map_err(|e| OmniError::Lance(e.to_string()))?;
502        }
503        configure(&mut scanner)?;
504        scanner
505            .try_into_stream()
506            .await
507            .map_err(|e| OmniError::Lance(e.to_string()))
508    }
509
510    pub async fn scan(
511        &self,
512        ds: &Dataset,
513        projection: Option<&[&str]>,
514        filter: Option<&str>,
515        order_by: Option<Vec<ColumnOrdering>>,
516    ) -> Result<Vec<RecordBatch>> {
517        Self::scan_stream(ds, projection, filter, order_by, false)
518            .await?
519            .try_collect()
520            .await
521            .map_err(|e| OmniError::Lance(e.to_string()))
522    }
523
524    pub async fn scan_with<F>(
525        &self,
526        ds: &Dataset,
527        projection: Option<&[&str]>,
528        filter: Option<&str>,
529        order_by: Option<Vec<ColumnOrdering>>,
530        with_row_id: bool,
531        configure: F,
532    ) -> Result<Vec<RecordBatch>>
533    where
534        F: FnOnce(&mut Scanner) -> Result<()>,
535    {
536        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, configure)
537            .await?
538            .try_collect()
539            .await
540            .map_err(|e| OmniError::Lance(e.to_string()))
541    }
542
543    pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
544        ds.count_rows(filter)
545            .await
546            .map(|count| count as usize)
547            .map_err(|e| OmniError::Lance(e.to_string()))
548    }
549
550    pub fn dataset_version(&self, ds: &Dataset) -> u64 {
551        ds.version().version
552    }
553
554    pub async fn table_state(&self, dataset_uri: &str, ds: &Dataset) -> Result<TableState> {
555        Ok(TableState {
556            version: self.dataset_version(ds),
557            row_count: self.count_rows(ds, None).await? as u64,
558            version_metadata: self.dataset_version_metadata(dataset_uri, ds)?,
559        })
560    }
561
562    pub async fn append_batch(
563        &self,
564        dataset_uri: &str,
565        ds: &mut Dataset,
566        batch: RecordBatch,
567    ) -> Result<TableState> {
568        if batch.num_rows() == 0 {
569            return self.table_state(dataset_uri, ds).await;
570        }
571        let schema = batch.schema();
572        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
573        let params = WriteParams {
574            mode: WriteMode::Append,
575            allow_external_blob_outside_bases: true,
576            ..Default::default()
577        };
578        ds.append(reader, Some(params))
579            .await
580            .map_err(|e| OmniError::Lance(e.to_string()))?;
581        self.table_state(dataset_uri, ds).await
582    }
583
584    pub async fn append_or_create_batch(
585        dataset_uri: &str,
586        dataset: Option<Dataset>,
587        batch: RecordBatch,
588    ) -> Result<Dataset> {
589        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
590        match dataset {
591            Some(mut ds) => {
592                let params = WriteParams {
593                    mode: WriteMode::Append,
594                    allow_external_blob_outside_bases: true,
595                    ..Default::default()
596                };
597                ds.append(reader, Some(params))
598                    .await
599                    .map_err(|e| OmniError::Lance(e.to_string()))?;
600                Ok(ds)
601            }
602            None => {
603                let params = WriteParams {
604                    mode: WriteMode::Create,
605                    enable_stable_row_ids: true,
606                    data_storage_version: Some(LanceFileVersion::V2_2),
607                    allow_external_blob_outside_bases: true,
608                    ..Default::default()
609                };
610                Dataset::write(reader, dataset_uri, Some(params))
611                    .await
612                    .map_err(|e| OmniError::Lance(e.to_string()))
613            }
614        }
615    }
616
617    pub async fn overwrite_batch(
618        &self,
619        dataset_uri: &str,
620        ds: &mut Dataset,
621        batch: RecordBatch,
622    ) -> Result<TableState> {
623        ds.truncate_table()
624            .await
625            .map_err(|e| OmniError::Lance(e.to_string()))?;
626        self.append_batch(dataset_uri, ds, batch).await
627    }
628
629    pub async fn overwrite_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
630        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
631        let params = WriteParams {
632            mode: WriteMode::Overwrite,
633            enable_stable_row_ids: true,
634            data_storage_version: Some(LanceFileVersion::V2_2),
635            allow_external_blob_outside_bases: true,
636            ..Default::default()
637        };
638        Dataset::write(reader, dataset_uri, Some(params))
639            .await
640            .map_err(|e| OmniError::Lance(e.to_string()))
641    }
642
643    pub async fn merge_insert_batch(
644        &self,
645        dataset_uri: &str,
646        ds: Dataset,
647        batch: RecordBatch,
648        key_columns: Vec<String>,
649        when_matched: WhenMatched,
650        when_not_matched: WhenNotMatched,
651    ) -> Result<TableState> {
652        if batch.num_rows() == 0 {
653            return self.table_state(dataset_uri, &ds).await;
654        }
655
656        // Precondition for the FirstSeen workaround below: every caller of
657        // this primitive must hand in a source batch that is unique by
658        // `key_columns`. Without this check, `SourceDedupeBehavior::FirstSeen`
659        // would silently collapse genuine duplicates instead of erroring.
660        check_batch_unique_by_keys(&batch, &key_columns, "merge_insert_batch")?;
661
662        // TODO(lance-upstream): MergeInsertBuilder does not accept WriteParams,
663        // so allow_external_blob_outside_bases cannot be set here. External URI
664        // blobs via merge_insert (LoadMode::Merge, mutations) are unsupported
665        // until Lance exposes WriteParams on MergeInsertBuilder.
666        let ds = Arc::new(ds);
667        let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
668            .map_err(|e| OmniError::Lance(e.to_string()))?;
669        builder.when_matched(when_matched);
670        builder.when_not_matched(when_not_matched);
671        // Workaround for a Lance 4.0.x bug class where sequential
672        // merge_insert calls against rows previously rewritten by
673        // merge_insert produce a spurious "Ambiguous merge inserts:
674        // multiple source rows match the same target row on (id = ...)"
675        // error. Lance's `processed_row_ids: Mutex<HashSet<u64>>`
676        // (lance-4.0.0 `src/dataset/write/merge_insert.rs:2099`)
677        // double-processes the same source/target match against
678        // datasets previously rewritten by merge_insert, and the default
679        // `SourceDedupeBehavior::Fail` errors on the second insertion.
680        // `FirstSeen` makes Lance skip the duplicate match instead.
681        //
682        // Covers both observed surfaces:
683        // - PR #98 (sequential `load --mode merge` against same keys).
684        // - MR-920 (sequential `update T set {f} where x=y` on same row).
685        //
686        // Correctness-preserving for OmniGraph because every call path
687        // that reaches this primitive either pre-dedupes the source batch
688        // by id, or surfaces a real source dup via the
689        // `check_batch_unique_by_keys` precondition above (which fires
690        // before the FirstSeen setter has a chance to silently collapse
691        // anything):
692        // - Load path: `enforce_unique_constraints_intra_batch`
693        //   (`loader/mod.rs:1453`) errors on intra-batch `@key` dups.
694        // - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`)
695        //   accumulates and dedupes by `id`.
696        // - Branch-merge path: `compute_source_delta` /
697        //   `compute_three_way_delta` (`exec/merge.rs`) walk via
698        //   `OrderedTableCursor` and `push_row` each id at most once.
699        // So FirstSeen only suppresses the spurious Lance behavior, never
700        // user data. Pinned by `loader_rejects_intra_batch_duplicate_keys`
701        // in `tests/consistency.rs` plus the
702        // `check_batch_unique_by_keys` precondition.
703        //
704        // Retire when upstream Lance fixes the bug class. Tracked at
705        // MR-957; upstream: lance-format/lance#6877.
706        builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
707        let job = builder
708            .try_build()
709            .map_err(|e| OmniError::Lance(e.to_string()))?;
710
711        let schema = batch.schema();
712        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
713        let (new_ds, _stats) = job
714            .execute(lance_datafusion::utils::reader_to_stream(Box::new(reader)))
715            .await
716            .map_err(|e| OmniError::Lance(e.to_string()))?;
717        self.table_state(dataset_uri, &new_ds).await
718    }
719
720    pub async fn merge_insert_batches(
721        &self,
722        dataset_uri: &str,
723        ds: Dataset,
724        batches: Vec<RecordBatch>,
725        key_columns: Vec<String>,
726        when_matched: WhenMatched,
727        when_not_matched: WhenNotMatched,
728    ) -> Result<TableState> {
729        if batches.is_empty() {
730            return self.table_state(dataset_uri, &ds).await;
731        }
732        let batch = if batches.len() == 1 {
733            batches.into_iter().next().unwrap()
734        } else {
735            let schema = batches[0].schema();
736            concat_batches(&schema, &batches).map_err(|e| OmniError::Lance(e.to_string()))?
737        };
738        self.merge_insert_batch(
739            dataset_uri,
740            ds,
741            batch,
742            key_columns,
743            when_matched,
744            when_not_matched,
745        )
746        .await
747    }
748
749    pub async fn delete_where(
750        &self,
751        dataset_uri: &str,
752        ds: &mut Dataset,
753        filter: &str,
754    ) -> Result<DeleteState> {
755        let delete_result = ds
756            .delete(filter)
757            .await
758            .map_err(|e| OmniError::Lance(e.to_string()))?;
759        Ok(DeleteState {
760            version: delete_result.new_dataset.version().version,
761            row_count: self.count_rows(&delete_result.new_dataset, None).await? as u64,
762            deleted_rows: delete_result.num_deleted_rows as usize,
763            version_metadata: self
764                .dataset_version_metadata(dataset_uri, &delete_result.new_dataset)?,
765        })
766    }
767
768    // ─── Staged-write API ────────────────────────────────────────────────────
769    //
770    // These primitives wrap Lance's distributed-write API: each call writes
771    // fragment files to object storage but does NOT advance the dataset's
772    // HEAD or commit a manifest entry. The returned `Transaction` is held by
773    // the caller (typically `MutationStaging` or the loader's accumulator)
774    // and committed at end-of-query via `commit_staged`. On failure the
775    // fragments remain unreferenced and are reclaimed by `cleanup_old_versions`.
776    //
777    // The extracted `Vec<Fragment>` is for read-your-writes within the same
778    // query: subsequent ops construct a `Scanner` and call
779    // `scanner.with_fragments(staged.clone())` to see staged data alongside
780    // the committed snapshot. Lance's filter pushdown, vector search, and
781    // FTS all respect the supplied fragment list.
782
783    /// Stage an append: write fragment files for `batch`, return the
784    /// uncommitted Lance transaction plus the new fragments for
785    /// read-your-writes.
786    ///
787    /// `prior_stages` is the slice of staged writes already accumulated
788    /// against the **same dataset** in the same query. Pass `&[]` for the
789    /// first call; pass the accumulated stages for subsequent calls. The
790    /// primitive uses this to offset row-ID assignment so chained
791    /// `stage_append` calls don't produce overlapping `_rowid` ranges.
792    /// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same
793    /// slice gets passed to both.
794    ///
795    /// On stable-row-id datasets we manually populate `row_id_meta` on
796    /// the cloned `new_fragments` we expose for `scan_with_staged`.
797    /// Lance's `InsertBuilder::execute_uncommitted` produces fragments
798    /// with `row_id_meta = None`; row IDs are normally assigned by
799    /// `Transaction::assign_row_ids` during commit. Because
800    /// `scan_with_staged` reads the staged fragments *before* commit,
801    /// the scanner trips on a stable-row-id dataset
802    /// (`Error::internal("Missing row id meta")` from
803    /// `dataset/rowids.rs:22`). The transaction's internal fragment copy
804    /// stays untouched — Lance assigns IDs there independently at commit
805    /// time, and the two ID assignments don't have to agree because no
806    /// caller threads `_rowid` from the staged scan into the commit
807    /// path.
808    ///
809    /// **Contract: `prior_stages` must contain only previous
810    /// `stage_append` results against the same dataset.** Mixing
811    /// stage_merge_insert into `prior_stages` would over-count because
812    /// merge_insert's `new_fragments` include rewrites that don't add
813    /// rows. The engine's parse-time D₂′ check (per touched table: all
814    /// stage_append OR exactly one stage_merge_insert) guarantees this
815    /// upstream; on the primitive layer it's the caller's responsibility.
816    pub async fn stage_append(
817        &self,
818        ds: &Dataset,
819        batch: RecordBatch,
820        prior_stages: &[StagedWrite],
821    ) -> Result<StagedWrite> {
822        if batch.num_rows() == 0 {
823            return Err(OmniError::manifest_internal(
824                "stage_append called with empty batch".to_string(),
825            ));
826        }
827        let params = WriteParams {
828            mode: WriteMode::Append,
829            allow_external_blob_outside_bases: true,
830            ..Default::default()
831        };
832        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
833            .with_params(&params)
834            .execute_uncommitted(vec![batch])
835            .await
836            .map_err(|e| OmniError::Lance(e.to_string()))?;
837        let mut new_fragments = match &transaction.operation {
838            Operation::Append { fragments } => fragments.clone(),
839            Operation::Overwrite { fragments, .. } => fragments.clone(),
840            other => {
841                return Err(OmniError::manifest_internal(format!(
842                    "stage_append: unexpected Lance operation {:?}",
843                    std::mem::discriminant(other)
844                )));
845            }
846        };
847        // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted`
848        // returns fragments with `id = 0` ("Temporary ID" — see lance-4.0.0
849        // `dataset/write.rs:1044/1712`); the real assignment happens during
850        // commit via `Transaction::fragments_with_ids`. Because we expose
851        // these fragments to `scan_with_staged` *before* commit, two staged
852        // fragments (or one staged + the seed) would collide on `id = 0`,
853        // causing Lance's scanner to mishandle the combined list (silent
854        // duplicates / dropped rows). Mirror the commit-time renumbering
855        // here, using `ds.manifest.max_fragment_id() + 1` as the base and
856        // accounting for prior stages.
857        // ds.manifest.max_fragment_id is Option<u32>; cast up to u64 because
858        // Lance's Fragment::id (and the commit-time renumbering counter in
859        // Transaction::fragments_with_ids) operate on u64.
860        let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
861            + 1
862            + prior_stages_fragment_count(prior_stages);
863        assign_fragment_ids(&mut new_fragments, next_id_base);
864        if ds.manifest.uses_stable_row_ids() {
865            let prior_rows = prior_stages_row_count(prior_stages)?;
866            let start_row_id = ds.manifest.next_row_id + prior_rows;
867            assign_row_id_meta(&mut new_fragments, start_row_id)?;
868        }
869        Ok(StagedWrite {
870            transaction,
871            new_fragments,
872            // Append never supersedes existing fragments.
873            removed_fragment_ids: Vec::new(),
874        })
875    }
876
877    /// Stage a merge_insert (upsert): write fragment files describing the
878    /// merge result, return the uncommitted transaction plus the new
879    /// fragments. The transaction's `Operation::Update` carries the
880    /// fragments-to-remove and fragments-to-add; for read-your-writes we
881    /// expose `new_fragments` (rows that will be visible after commit).
882    ///
883    /// **Contract: do not chain `stage_merge_insert` calls on the same
884    /// table within one query.** Each call's `MergeInsertBuilder` runs
885    /// against the supplied dataset's committed view — it does not see
886    /// fragments produced by a previous staged merge on the same table.
887    /// Two chained `stage_merge_insert`s whose source rows share keys will
888    /// each independently produce `Operation::Update` transactions whose
889    /// `new_fragments` contain a row for the shared key. `scan_with_staged`
890    /// (and `count_rows_with_staged`) will then return both — i.e.
891    /// **duplicates by key**.
892    ///
893    /// This is intrinsic to the underlying Lance API: there is no public
894    /// way to make `MergeInsertBuilder` see uncommitted fragments. The
895    /// engine's `MutationStaging` accumulator works around this by
896    /// concatenating per-table batches in memory and issuing exactly
897    /// one `stage_merge_insert` per touched table at end-of-query (with
898    /// last-write-wins dedupe by id) — see `exec/staging.rs`. Direct
899    /// callers of this primitive must respect the contract themselves.
900    ///
901    /// Lift path: either a Lance API extension that lets
902    /// `MergeInsertBuilder` accept additional staged fragments, or an
903    /// in-memory pre-merge here that folds prior staged batches into the
904    /// input stream. See `docs/runs.md`.
905    pub async fn stage_merge_insert(
906        &self,
907        ds: Dataset,
908        batch: RecordBatch,
909        key_columns: Vec<String>,
910        when_matched: WhenMatched,
911        when_not_matched: WhenNotMatched,
912    ) -> Result<StagedWrite> {
913        if batch.num_rows() == 0 {
914            return Err(OmniError::manifest_internal(
915                "stage_merge_insert called with empty batch".to_string(),
916            ));
917        }
918
919        // Precondition for FirstSeen below. See the comment on
920        // `merge_insert_batch` for why this check is here, not on the caller:
921        // every call path that reaches stage_merge_insert (load,
922        // MutationStaging::finalize, branch_merge::publish_rewritten_merge_table)
923        // must hand in a source batch that is unique by `key_columns`.
924        check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?;
925
926        let ds = Arc::new(ds);
927        let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
928            .map_err(|e| OmniError::Lance(e.to_string()))?;
929        builder.when_matched(when_matched);
930        builder.when_not_matched(when_not_matched);
931        // See `merge_insert_batch` for the FirstSeen rationale. Workaround
932        // for the Lance 4.0.x bug class where sequential merge_insert /
933        // update against rows previously rewritten by merge_insert trips
934        // Lance's `processed_row_ids` HashSet and errors under the default
935        // `SourceDedupeBehavior::Fail`. Retire when upstream Lance is fixed.
936        builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
937        let job = builder
938            .try_build()
939            .map_err(|e| OmniError::Lance(e.to_string()))?;
940        let schema = batch.schema();
941        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
942        let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
943        let uncommitted = job
944            .execute_uncommitted(stream)
945            .await
946            .map_err(|e| OmniError::Lance(e.to_string()))?;
947        // Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } —
948        // `new_fragments` are the freshly inserted rows; `updated_fragments`
949        // are rewrites of existing fragments that include both retained and
950        // updated rows; `removed_fragment_ids` lists the committed-manifest
951        // fragments that those rewrites supersede. For read-your-writes we
952        // expose `updated_fragments + new_fragments` and the
953        // `removed_fragment_ids` so `scan_with_staged` can filter the
954        // superseded committed fragments before combining — otherwise a
955        // single merge_insert appears as duplicate rows (original committed
956        // version + rewritten staged version).
957        let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
958            Operation::Update {
959                new_fragments,
960                updated_fragments,
961                removed_fragment_ids,
962                ..
963            } => {
964                let mut all = updated_fragments.clone();
965                all.extend(new_fragments.iter().cloned());
966                (all, removed_fragment_ids.clone())
967            }
968            Operation::Append { fragments } => (fragments.clone(), Vec::new()),
969            other => {
970                return Err(OmniError::manifest_internal(format!(
971                    "stage_merge_insert: unexpected Lance operation {:?}",
972                    std::mem::discriminant(other)
973                )));
974            }
975        };
976        Ok(StagedWrite {
977            transaction: uncommitted.transaction,
978            new_fragments,
979            removed_fragment_ids,
980        })
981    }
982
983    /// Commit a previously-staged transaction onto `ds`, returning the new
984    /// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by
985    /// the publisher at end-of-query to materialize all staged writes before
986    /// the meta-manifest commit.
987    pub async fn commit_staged(
988        &self,
989        ds: Arc<Dataset>,
990        transaction: Transaction,
991    ) -> Result<Dataset> {
992        CommitBuilder::new(ds)
993            .execute(transaction)
994            .await
995            .map_err(|e| OmniError::Lance(e.to_string()))
996    }
997
998    /// Stage an overwrite (write_fragments + Operation::Overwrite { schema, fragments }).
999    /// Returns a StagedWrite carrying the replacement fragments. HEAD does
1000    /// NOT advance.
1001    ///
1002    /// Lance shape: `InsertBuilder::with_params(WriteParams { mode: Overwrite, .. })
1003    /// .execute_uncommitted(vec![batch])` produces a `Transaction` whose
1004    /// `Operation::Overwrite` carries the new schema + fragments. The
1005    /// transaction is committed via `commit_staged` (same call as
1006    /// `stage_append`).
1007    ///
1008    /// MR-793 Phase 2: introduces this for the schema_apply rewrite path.
1009    /// Lance API verified in `.context/mr-793-design.md` Appendix A.1.
1010    pub async fn stage_overwrite(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
1011        if batch.num_rows() == 0 {
1012            return Err(OmniError::manifest_internal(
1013                "stage_overwrite called with empty batch".to_string(),
1014            ));
1015        }
1016        // `enable_stable_row_ids: true` is defensive — empirically Lance 4.0.0
1017        // preserves the source dataset's flag through `Operation::Overwrite`
1018        // when WriteParams omits it (pinned by
1019        // `stage_overwrite_preserves_stable_row_ids` in tests/staged_writes.rs),
1020        // but setting it explicitly matches the public `overwrite_dataset`
1021        // path and keeps the invariant documented at every Overwrite site
1022        // (see docs/storage.md "Stable row IDs"). Setting it on an existing
1023        // dataset that was created without stable row IDs is a no-op per
1024        // Lance's row-id-lineage spec, so this stays correct for legacy
1025        // datasets.
1026        let params = WriteParams {
1027            mode: WriteMode::Overwrite,
1028            enable_stable_row_ids: true,
1029            allow_external_blob_outside_bases: true,
1030            ..Default::default()
1031        };
1032        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1033            .with_params(&params)
1034            .execute_uncommitted(vec![batch])
1035            .await
1036            .map_err(|e| OmniError::Lance(e.to_string()))?;
1037        let mut new_fragments = match &transaction.operation {
1038            Operation::Overwrite { fragments, .. } => fragments.clone(),
1039            other => {
1040                return Err(OmniError::manifest_internal(format!(
1041                    "stage_overwrite: unexpected Lance operation {:?}",
1042                    std::mem::discriminant(other)
1043                )));
1044            }
1045        };
1046        // Overwrite REPLACES every committed fragment, and Lance restarts
1047        // fragment-ID and row-ID counters at the post-commit version.
1048        // For our pre-commit staged view we need to:
1049        //   1) Renumber temporary fragment IDs (Lance returns them as
1050        //      `id = 0` from `execute_uncommitted` — see stage_append
1051        //      for the same fix). For Overwrite there are no committed
1052        //      fragments to collide with (they're all in
1053        //      removed_fragment_ids below), so start at 1.
1054        //   2) For stable-row-id datasets, assign row_id_meta starting
1055        //      at 0 (Overwrite is a fresh-start) so `scan_with_staged`
1056        //      doesn't hit the "Missing row id meta" panic in
1057        //      lance-4.0.0 dataset/rowids.rs:22.
1058        assign_fragment_ids(&mut new_fragments, 1);
1059        if ds.manifest.uses_stable_row_ids() {
1060            assign_row_id_meta(&mut new_fragments, 0)?;
1061        }
1062        // Overwrite REPLACES every committed fragment. For
1063        // read-your-writes via scan_with_staged, list every committed
1064        // fragment in removed_fragment_ids so the post-stage view shows
1065        // ONLY the staged fragments.
1066        let removed_fragment_ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
1067        Ok(StagedWrite {
1068            transaction,
1069            new_fragments,
1070            removed_fragment_ids,
1071        })
1072    }
1073
1074    /// Stage a BTREE scalar index build. Returns a StagedWrite whose
1075    /// transaction commits via `commit_staged`. HEAD does NOT advance.
1076    ///
1077    /// Lance shape: `CreateIndexBuilder::execute_uncommitted` returns
1078    /// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex
1079    /// { new_indices, removed_indices }` via the public `TransactionBuilder`,
1080    /// replicating the simple (non-segment-commit-path) branch of Lance's
1081    /// `CreateIndexBuilder::execute` (lance-4.0.0 `src/index/create.rs:502-512`).
1082    ///
1083    /// `removed_indices` mirrors `execute()` lines 466-476: when the
1084    /// build replaces an existing same-named index, those entries are
1085    /// listed for tombstoning by the manifest commit.
1086    ///
1087    /// MR-793 Phase 2: scalar index types (BTree, Inverted) are
1088    /// stage-able. Vector indices are NOT (segment-commit-path requires
1089    /// `build_index_metadata_from_segments` which is `pub(crate)` in
1090    /// lance-4.0.0); see `create_vector_index` and Appendix A.3.
1091    pub async fn stage_create_btree_index(
1092        &self,
1093        ds: &Dataset,
1094        columns: &[&str],
1095    ) -> Result<StagedWrite> {
1096        let params = ScalarIndexParams::default();
1097        let mut ds_clone = ds.clone();
1098        let new_idx = ds_clone
1099            .create_index_builder(columns, IndexType::BTree, &params)
1100            .replace(true)
1101            .execute_uncommitted()
1102            .await
1103            .map_err(|e| OmniError::Lance(format!("stage_create_btree_index: {}", e)))?;
1104        let removed_indices: Vec<IndexMetadata> = ds
1105            .load_indices()
1106            .await
1107            .map_err(|e| OmniError::Lance(e.to_string()))?
1108            .iter()
1109            .filter(|idx| idx.name == new_idx.name)
1110            .cloned()
1111            .collect();
1112        let transaction = TransactionBuilder::new(
1113            new_idx.dataset_version,
1114            Operation::CreateIndex {
1115                new_indices: vec![new_idx],
1116                removed_indices,
1117            },
1118        )
1119        .build();
1120        Ok(StagedWrite {
1121            transaction,
1122            new_fragments: Vec::new(),
1123            removed_fragment_ids: Vec::new(),
1124        })
1125    }
1126
1127    /// Stage an INVERTED (FTS) scalar index build. Same shape as
1128    /// `stage_create_btree_index`; see its docs for the Lance API
1129    /// citation and contract notes.
1130    pub async fn stage_create_inverted_index(
1131        &self,
1132        ds: &Dataset,
1133        column: &str,
1134    ) -> Result<StagedWrite> {
1135        let params = InvertedIndexParams::default();
1136        let mut ds_clone = ds.clone();
1137        let new_idx = ds_clone
1138            .create_index_builder(&[column], IndexType::Inverted, &params)
1139            .replace(true)
1140            .execute_uncommitted()
1141            .await
1142            .map_err(|e| OmniError::Lance(format!("stage_create_inverted_index: {}", e)))?;
1143        let removed_indices: Vec<IndexMetadata> = ds
1144            .load_indices()
1145            .await
1146            .map_err(|e| OmniError::Lance(e.to_string()))?
1147            .iter()
1148            .filter(|idx| idx.name == new_idx.name)
1149            .cloned()
1150            .collect();
1151        let transaction = TransactionBuilder::new(
1152            new_idx.dataset_version,
1153            Operation::CreateIndex {
1154                new_indices: vec![new_idx],
1155                removed_indices,
1156            },
1157        )
1158        .build();
1159        Ok(StagedWrite {
1160            transaction,
1161            new_fragments: Vec::new(),
1162            removed_fragment_ids: Vec::new(),
1163        })
1164    }
1165
1166    /// Run a scan with optional uncommitted staged writes visible
1167    /// alongside the committed snapshot. When `staged` is empty this is
1168    /// identical to `scan(...)`.
1169    ///
1170    /// Composes the visible fragment list as `committed - removed + new`:
1171    /// the committed manifest's fragments, minus any fragment IDs that
1172    /// staged `Operation::Update`s (merge_insert rewrites) have superseded,
1173    /// plus the staged new/updated fragments. Without the `removed`
1174    /// filter, a merge_insert that rewrites an existing fragment would
1175    /// surface twice — once via the original committed fragment, once via
1176    /// the rewrite in `new_fragments`.
1177    ///
1178    /// **Filter contract is incomplete on staged fragments.** When `filter`
1179    /// is `Some(...)`, Lance pushes the predicate to per-fragment scans
1180    /// with stats-based pruning. Uncommitted fragments produced by
1181    /// `write_fragments_internal` lack the per-column statistics that
1182    /// committed fragments carry; Lance's optimizer drops them from the
1183    /// filtered scan even when their data would match. Staged-fragment
1184    /// rows are silently absent from the result. `scanner.use_stats(false)`
1185    /// does not fix this in lance 4.0.0. Callers needing correct filtered
1186    /// reads against staged data should use a different strategy — the
1187    /// engine's `MutationStaging` accumulator unions in-memory pending
1188    /// batches with the committed scan via DataFusion `MemTable` (see
1189    /// `scan_with_pending`).
1190    ///
1191    /// This method remains on the surface for primitive-level testing
1192    /// (basic stage + scan correctness without filters works) and for
1193    /// callers that don't need filter pushdown.
1194    pub async fn scan_with_staged(
1195        &self,
1196        ds: &Dataset,
1197        staged: &[StagedWrite],
1198        projection: Option<&[&str]>,
1199        filter: Option<&str>,
1200    ) -> Result<Vec<RecordBatch>> {
1201        if staged.is_empty() {
1202            return self.scan(ds, projection, filter, None).await;
1203        }
1204        let mut scanner = ds.scan();
1205        if let Some(cols) = projection {
1206            let owned: Vec<String> = cols.iter().map(|s| s.to_string()).collect();
1207            scanner
1208                .project(&owned)
1209                .map_err(|e| OmniError::Lance(e.to_string()))?;
1210        }
1211        if let Some(f) = filter {
1212            scanner
1213                .filter(f)
1214                .map_err(|e| OmniError::Lance(e.to_string()))?;
1215        }
1216        scanner.with_fragments(combine_committed_with_staged(ds, staged));
1217        let stream = scanner
1218            .try_into_stream()
1219            .await
1220            .map_err(|e| OmniError::Lance(e.to_string()))?;
1221        stream
1222            .try_collect()
1223            .await
1224            .map_err(|e| OmniError::Lance(e.to_string()))
1225    }
1226
1227    /// Scan committed via Lance + apply the same filter to in-memory
1228    /// pending batches via DataFusion `MemTable`, concat the two result
1229    /// streams. The replacement for `scan_with_staged` in engine code:
1230    /// the staged-write writer accumulates input batches in memory and
1231    /// unions them with the committed snapshot at read time,
1232    /// sidestepping the `Scanner::with_fragments` filter-pushdown
1233    /// limitation documented on `scan_with_staged`.
1234    ///
1235    /// `committed_ds` should be opened at the pre-mutation
1236    /// `expected_version` (the same version captured in `MutationStaging::expected_versions`
1237    /// at first touch of the table). `pending_batches` are the per-table
1238    /// accumulator's batches in their input shape. `pending_schema` is
1239    /// the schema of the accumulated batches; passing `None` falls back
1240    /// to the schema of the first pending batch.
1241    ///
1242    /// `filter` is the Lance / DataFusion SQL predicate. It is applied
1243    /// to both sides — Lance pushes it down on the committed side; the
1244    /// pending side runs it through a fresh DataFusion `SessionContext`
1245    /// with the batches registered as a `MemTable` named `pending`.
1246    ///
1247    /// `key_column` controls how committed and pending are unioned:
1248    /// - **`None` (union semantics)**: every committed row that matches
1249    ///   the filter and every pending row that matches the filter is
1250    ///   returned. Correct when committed and pending cannot share a
1251    ///   primary key — e.g., Append-mode loads with ULID-generated ids,
1252    ///   or any read where pending hasn't been used to update committed
1253    ///   rows.
1254    /// - **`Some(col)` (merge / shadow semantics)**: committed rows whose
1255    ///   `col` value appears in any pending batch are EXCLUDED from the
1256    ///   result; only pending's view of those rows is returned. Required
1257    ///   for Merge-mode reads (e.g., `execute_update` on the engine path)
1258    ///   so a chained `update` doesn't see stale committed values that
1259    ///   a prior op already updated in pending. Without this, a predicate
1260    ///   like `where age > 30` can match a row that an earlier
1261    ///   `set age = 20` already moved out of range.
1262    ///
1263    /// When `pending_batches` is empty this delegates to the regular
1264    /// scan path.
1265    pub async fn scan_with_pending(
1266        &self,
1267        committed_ds: &Dataset,
1268        pending_batches: &[RecordBatch],
1269        pending_schema: Option<SchemaRef>,
1270        projection: Option<&[&str]>,
1271        filter: Option<&str>,
1272        key_column: Option<&str>,
1273    ) -> Result<Vec<RecordBatch>> {
1274        // Contract: when merge-shadow semantics are requested via
1275        // `key_column`, the committed-side projection MUST include that
1276        // column so we can filter committed rows whose key appears in
1277        // pending. Silently dropping the shadow when projection omits
1278        // the key would re-introduce union semantics behind the
1279        // caller's back. Reject up front with a clear error so callers
1280        // either (a) include the key in projection or (b) drop
1281        // `key_column` if union is what they wanted.
1282        if let (Some(key_col), Some(cols)) = (key_column, projection) {
1283            if !cols.iter().any(|c| *c == key_col) {
1284                return Err(OmniError::Lance(format!(
1285                    "scan_with_pending: key_column '{}' must appear in projection \
1286                     when merge-shadow semantics are requested (got projection = {:?})",
1287                    key_col, cols
1288                )));
1289            }
1290        }
1291
1292        let committed = self.scan(committed_ds, projection, filter, None).await?;
1293        if pending_batches.is_empty() {
1294            return Ok(committed);
1295        }
1296
1297        // Shadow committed rows whose key value also appears in pending.
1298        // This makes scan_with_pending implement merge semantics rather
1299        // than naive union: any row that has a pending update is
1300        // represented ONLY by its pending value, never by both its
1301        // (stale) committed value and its (current) pending value.
1302        let committed = match key_column {
1303            Some(key_col) => {
1304                let pending_keys = collect_string_column_values(pending_batches, key_col)?;
1305                if pending_keys.is_empty() {
1306                    committed
1307                } else {
1308                    filter_out_rows_where_string_in(committed, key_col, &pending_keys)?
1309                }
1310            }
1311            None => committed,
1312        };
1313
1314        let pending =
1315            scan_pending_batches(pending_batches, pending_schema, projection, filter).await?;
1316
1317        let mut out = committed;
1318        out.extend(pending);
1319        Ok(out)
1320    }
1321
1322    /// `count_rows` variant that respects staged writes. Used for
1323    /// edge-cardinality validation that needs to see staged edges before
1324    /// commit. Same `committed - removed + new` composition as
1325    /// `scan_with_staged`.
1326    pub async fn count_rows_with_staged(
1327        &self,
1328        ds: &Dataset,
1329        staged: &[StagedWrite],
1330        filter: Option<String>,
1331    ) -> Result<usize> {
1332        if staged.is_empty() {
1333            return self.count_rows(ds, filter).await;
1334        }
1335        let mut scanner = ds.scan();
1336        if let Some(f) = filter {
1337            scanner
1338                .filter(&f)
1339                .map_err(|e| OmniError::Lance(e.to_string()))?;
1340        }
1341        scanner.with_fragments(combine_committed_with_staged(ds, staged));
1342        let count = scanner
1343            .count_rows()
1344            .await
1345            .map_err(|e| OmniError::Lance(e.to_string()))?;
1346        Ok(count as usize)
1347    }
1348
1349    async fn user_indices_for_column(
1350        &self,
1351        ds: &Dataset,
1352        column: &str,
1353    ) -> Result<Vec<IndexMetadata>> {
1354        let field_id = ds
1355            .schema()
1356            .field(column)
1357            .map(|field| field.id)
1358            .ok_or_else(|| {
1359                OmniError::manifest_internal(format!(
1360                    "dataset is missing expected index column '{}'",
1361                    column
1362                ))
1363            })?;
1364        let indices = ds
1365            .load_indices()
1366            .await
1367            .map_err(|e| OmniError::Lance(e.to_string()))?;
1368        Ok(indices
1369            .iter()
1370            .filter(|index| !is_system_index(index))
1371            .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
1372            .cloned()
1373            .collect())
1374    }
1375
1376    pub async fn has_btree_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1377        let indices = self.user_indices_for_column(ds, column).await?;
1378        Ok(indices.iter().any(|index| {
1379            index
1380                .index_details
1381                .as_ref()
1382                .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
1383                .unwrap_or(false)
1384        }))
1385    }
1386
1387    pub async fn has_fts_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1388        let indices = self.user_indices_for_column(ds, column).await?;
1389        Ok(indices.iter().any(|index| {
1390            index
1391                .index_details
1392                .as_ref()
1393                .map(|details| IndexDetails(details.clone()).supports_fts())
1394                .unwrap_or(false)
1395        }))
1396    }
1397
1398    pub async fn has_vector_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1399        let indices = self.user_indices_for_column(ds, column).await?;
1400        Ok(indices.iter().any(|index| {
1401            index
1402                .index_details
1403                .as_ref()
1404                .map(|details| IndexDetails(details.clone()).is_vector())
1405                .unwrap_or(false)
1406        }))
1407    }
1408
1409    pub async fn create_btree_index(&self, ds: &mut Dataset, columns: &[&str]) -> Result<()> {
1410        let params = ScalarIndexParams::default();
1411        ds.create_index_builder(columns, IndexType::BTree, &params)
1412            .replace(true)
1413            .await
1414            .map(|_| ())
1415            .map_err(|e| OmniError::Lance(e.to_string()))
1416    }
1417
1418    pub async fn create_inverted_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
1419        let params = InvertedIndexParams::default();
1420        ds.create_index_builder(&[column], IndexType::Inverted, &params)
1421            .replace(true)
1422            .await
1423            .map(|_| ())
1424            .map_err(|e| OmniError::Lance(e.to_string()))
1425    }
1426
1427    pub async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
1428        let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
1429        ds.create_index_builder(&[column], IndexType::Vector, &params)
1430            .replace(true)
1431            .await
1432            .map(|_| ())
1433            .map_err(|e| OmniError::Lance(e.to_string()))
1434    }
1435
1436    pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result<Dataset> {
1437        let batch = RecordBatch::new_empty(schema.clone());
1438        Self::write_dataset(dataset_uri, batch).await
1439    }
1440
1441    pub async fn first_row_id_for_filter(&self, ds: &Dataset, filter: &str) -> Result<Option<u64>> {
1442        let batches = Self::scan_stream(ds, Some(&["id"]), Some(filter), None, true)
1443            .await?
1444            .try_collect::<Vec<RecordBatch>>()
1445            .await
1446            .map_err(|e| OmniError::Lance(e.to_string()))?;
1447        Ok(batches.iter().find_map(|batch| {
1448            batch
1449                .column_by_name("_rowid")
1450                .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1451                .and_then(|arr| (arr.len() > 0).then(|| arr.value(0)))
1452        }))
1453    }
1454
1455    pub async fn write_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
1456        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
1457        let params = WriteParams {
1458            mode: WriteMode::Create,
1459            enable_stable_row_ids: true,
1460            data_storage_version: Some(LanceFileVersion::V2_2),
1461            allow_external_blob_outside_bases: true,
1462            ..Default::default()
1463        };
1464        Dataset::write(reader, dataset_uri, Some(params))
1465            .await
1466            .map_err(|e| OmniError::Lance(e.to_string()))
1467    }
1468}
1469
1470/// Build the `Scanner::with_fragments` argument for read-your-writes:
1471/// committed manifest fragments minus any fragment IDs superseded by the
1472/// staged writes, plus the staged `new_fragments`. Order is:
1473///   1. committed fragments whose IDs are NOT in any staged
1474///      `removed_fragment_ids` (preserves committed order),
1475///   2. all staged `new_fragments` in stage order.
1476///
1477/// Lance's `Scanner` does not require any particular ordering between
1478/// committed and staged fragments — `with_fragments` scopes the scan to
1479/// exactly the supplied list. The dedup matters because merge_insert
1480/// rewrites a fragment in place at the Lance layer: the rewritten
1481/// fragment is in `new_fragments`, the original (which it supersedes) is
1482/// in `committed` until manifest commit, and including both would yield
1483/// duplicate rows.
1484///
1485/// **Inter-stage supersession is not handled here.** Each StagedWrite's
1486/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a
1487/// later staged merge cannot know about an earlier staged merge's
1488/// fragments (Lance's `MergeInsertBuilder` runs against the committed
1489/// view). If two `stage_merge_insert`s on the same table produce rows
1490/// with the same key, the combined view returns duplicates by key. The
1491/// engine's mutation path enforces "per touched table: all stage_append
1492/// OR exactly one stage_merge_insert" at parse time (D₂′ in
1493/// `exec/mutation.rs`) so this primitive's caller never chains merges.
1494/// See `stage_merge_insert` for the full contract.
1495/// Sum `physical_rows` across all fragments in the supplied stages.
1496/// Used by `stage_append` to compute the row-ID offset for chained
1497/// `stage_append` calls against the same dataset.
1498///
1499/// Assumes `prior_stages` contains only `stage_append` results — see
1500/// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the
1501/// `new_fragments` include rewrites that don't add new rows, so this
1502/// would over-count.
1503fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
1504    prior_stages
1505        .iter()
1506        .map(|s| s.new_fragments.len() as u64)
1507        .sum()
1508}
1509
1510/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's
1511/// commit-time `Transaction::fragments_with_ids` (lance-4.0.0
1512/// `dataset/transaction.rs:1456`) — fragments produced by
1513/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary
1514/// placeholder; we renumber here so they don't collide with committed
1515/// fragments (or with each other across chained stages) when the slice is
1516/// passed to `Scanner::with_fragments`.
1517fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) {
1518    for (i, fragment) in fragments.iter_mut().enumerate() {
1519        if fragment.id == 0 {
1520            fragment.id = start_id + i as u64;
1521        }
1522    }
1523}
1524
1525fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
1526    let mut total: u64 = 0;
1527    for stage in prior_stages {
1528        for fragment in &stage.new_fragments {
1529            let physical_rows = fragment.physical_rows.ok_or_else(|| {
1530                OmniError::manifest_internal(
1531                    "prior_stages_row_count: fragment is missing physical_rows".to_string(),
1532                )
1533            })? as u64;
1534            total += physical_rows;
1535        }
1536    }
1537    Ok(total)
1538}
1539
1540/// Assign sequential row IDs to fragments that lack them, starting from
1541/// `start_row_id`. Mirrors the relevant arm of Lance's
1542/// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`)
1543/// for the `row_id_meta = None` case — fragments produced by
1544/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset.
1545///
1546/// Used only by `stage_append` for read-your-writes — see its docstring
1547/// for why pre-commit assignment is needed and why diverging from Lance's
1548/// commit-time IDs is safe.
1549fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
1550    let mut next_row_id = start_row_id;
1551    for fragment in fragments {
1552        if fragment.row_id_meta.is_some() {
1553            continue;
1554        }
1555        let physical_rows = fragment.physical_rows.ok_or_else(|| {
1556            OmniError::manifest_internal(
1557                "stage_append: fragment is missing physical_rows".to_string(),
1558            )
1559        })? as u64;
1560        let row_ids = next_row_id..(next_row_id + physical_rows);
1561        let sequence = RowIdSequence::from(row_ids);
1562        let serialized = write_row_ids(&sequence);
1563        fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
1564        next_row_id += physical_rows;
1565    }
1566    Ok(())
1567}
1568
1569/// Collect the set of values in a Utf8 column across multiple batches.
1570/// Used by `scan_with_pending`'s merge-semantic path to identify
1571/// committed rows that are shadowed by pending writes. NULL values are
1572/// skipped.
1573fn collect_string_column_values(
1574    batches: &[RecordBatch],
1575    column: &str,
1576) -> Result<std::collections::HashSet<String>> {
1577    use arrow_array::{Array, StringArray};
1578    let mut out = std::collections::HashSet::new();
1579    for batch in batches {
1580        let Some(col) = batch.column_by_name(column) else {
1581            return Err(OmniError::Lance(format!(
1582                "scan_with_pending: pending batch missing key column '{}'",
1583                column
1584            )));
1585        };
1586        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1587            OmniError::Lance(format!(
1588                "scan_with_pending: key column '{}' is not Utf8",
1589                column
1590            ))
1591        })?;
1592        for i in 0..arr.len() {
1593            if arr.is_valid(i) {
1594                out.insert(arr.value(i).to_string());
1595            }
1596        }
1597    }
1598    Ok(out)
1599}
1600
1601/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`.
1602/// Used by `scan_with_pending`'s merge-semantic path to shadow committed
1603/// rows that pending has already updated. Returns the surviving rows.
1604///
1605/// `scan_with_pending` validates up front that the projection contains
1606/// `column`, so a missing column here is a programmer error — error
1607/// loudly instead of silently passing batches through (which would
1608/// re-introduce the union semantics the caller asked us to avoid).
1609fn filter_out_rows_where_string_in(
1610    batches: Vec<RecordBatch>,
1611    column: &str,
1612    excluded: &std::collections::HashSet<String>,
1613) -> Result<Vec<RecordBatch>> {
1614    use arrow_array::{Array, BooleanArray, StringArray};
1615    let mut out = Vec::with_capacity(batches.len());
1616    for batch in batches {
1617        if batch.num_rows() == 0 {
1618            out.push(batch);
1619            continue;
1620        }
1621        let col = batch.column_by_name(column).ok_or_else(|| {
1622            OmniError::manifest_internal(format!(
1623                "scan_with_pending: committed batch missing key column '{}' \
1624                 (the up-front projection check should have rejected this)",
1625                column
1626            ))
1627        })?;
1628        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1629            OmniError::Lance(format!(
1630                "scan_with_pending: committed column '{}' is not Utf8",
1631                column
1632            ))
1633        })?;
1634        let mask: BooleanArray = (0..arr.len())
1635            .map(|i| {
1636                if arr.is_valid(i) {
1637                    Some(!excluded.contains(arr.value(i)))
1638                } else {
1639                    Some(true)
1640                }
1641            })
1642            .collect();
1643        let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
1644            .map_err(|e| OmniError::Lance(e.to_string()))?;
1645        out.push(filtered);
1646    }
1647    Ok(out)
1648}
1649
1650/// Apply `projection` and `filter` to in-memory pending batches via a
1651/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for
1652/// the read-your-writes side of the in-memory staging accumulator.
1653///
1654/// `pending_batches` must be non-empty (the caller short-circuits on
1655/// empty).
1656///
1657/// **SQL dialect contract.** `filter` is also passed to Lance's scanner
1658/// on the committed side. Lance and DataFusion both accept standard
1659/// SQL comparison predicates (`col op literal`) and OmniGraph's
1660/// `predicate_to_sql` only emits those shapes today (`=`, `!=`, `>`,
1661/// `<`, `>=`, `<=`). If a future caller introduces a Lance-specific
1662/// scanner extension (vector search, FTS, `_rowid` references) into
1663/// the filter, this function will need explicit translation — DataFusion
1664/// won't recognize those operators against the in-memory `MemTable`.
1665async fn scan_pending_batches(
1666    pending_batches: &[RecordBatch],
1667    pending_schema: Option<SchemaRef>,
1668    projection: Option<&[&str]>,
1669    filter: Option<&str>,
1670) -> Result<Vec<RecordBatch>> {
1671    let schema = pending_schema.unwrap_or_else(|| pending_batches[0].schema());
1672    let ctx = datafusion::execution::context::SessionContext::new();
1673    let mem = datafusion::datasource::MemTable::try_new(schema, vec![pending_batches.to_vec()])
1674        .map_err(|e| OmniError::Lance(e.to_string()))?;
1675    ctx.register_table("pending", Arc::new(mem))
1676        .map_err(|e| OmniError::Lance(e.to_string()))?;
1677
1678    let proj = projection
1679        .map(|cols| {
1680            cols.iter()
1681                .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
1682                .collect::<Vec<_>>()
1683                .join(", ")
1684        })
1685        .unwrap_or_else(|| "*".to_string());
1686    let where_clause = filter.map(|f| format!("WHERE {f}")).unwrap_or_default();
1687    let sql = format!("SELECT {proj} FROM pending {where_clause}");
1688    let df = ctx
1689        .sql(&sql)
1690        .await
1691        .map_err(|e| OmniError::Lance(e.to_string()))?;
1692    df.collect()
1693        .await
1694        .map_err(|e| OmniError::Lance(e.to_string()))
1695}
1696
1697fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
1698    let removed: std::collections::HashSet<u64> = staged
1699        .iter()
1700        .flat_map(|w| w.removed_fragment_ids.iter().copied())
1701        .collect();
1702    let mut combined: Vec<Fragment> = ds
1703        .manifest
1704        .fragments
1705        .iter()
1706        .filter(|f| !removed.contains(&f.id))
1707        .cloned()
1708        .collect();
1709    for write in staged {
1710        combined.extend(write.new_fragments.iter().cloned());
1711    }
1712    combined
1713}
1714
1715/// Precondition guard for `merge_insert_batch` and `stage_merge_insert`.
1716/// Both opt into `SourceDedupeBehavior::FirstSeen` to suppress the Lance
1717/// `processed_row_ids` bug (MR-957). FirstSeen would *also* silently
1718/// collapse genuine duplicate source keys; this check restores fail-fast
1719/// behavior on real dups by erroring before the builder gets a chance to
1720/// silently skip them.
1721///
1722/// Today only single-column string keys are used at the call sites
1723/// (`vec!["id".to_string()]`). The check restricts itself to that shape
1724/// and surfaces an internal error if a future caller passes anything
1725/// else — keeping the assumption explicit instead of silently degrading.
1726fn check_batch_unique_by_keys(
1727    batch: &RecordBatch,
1728    key_columns: &[String],
1729    context: &'static str,
1730) -> Result<()> {
1731    if key_columns.len() != 1 {
1732        return Err(OmniError::manifest_internal(format!(
1733            "{}: check_batch_unique_by_keys currently supports single-column keys only, got {:?}",
1734            context, key_columns
1735        )));
1736    }
1737    let key_col_name = &key_columns[0];
1738    let column = batch.column_by_name(key_col_name).ok_or_else(|| {
1739        OmniError::manifest_internal(format!(
1740            "{}: source batch missing key column '{}'",
1741            context, key_col_name
1742        ))
1743    })?;
1744    let strs = column
1745        .as_any()
1746        .downcast_ref::<StringArray>()
1747        .ok_or_else(|| {
1748            OmniError::manifest_internal(format!(
1749                "{}: key column '{}' is not a StringArray (got {:?})",
1750                context,
1751                key_col_name,
1752                column.data_type()
1753            ))
1754        })?;
1755
1756    let mut seen: std::collections::HashSet<&str> =
1757        std::collections::HashSet::with_capacity(batch.num_rows());
1758    for i in 0..strs.len() {
1759        if !strs.is_valid(i) {
1760            continue;
1761        }
1762        let v = strs.value(i);
1763        if !seen.insert(v) {
1764            return Err(OmniError::manifest(format!(
1765                "{}: duplicate source row for key '{}' (column '{}'); \
1766                 callers must hand in a batch unique by `key_columns` \
1767                 — see MR-957",
1768                context, v, key_col_name
1769            )));
1770        }
1771    }
1772    Ok(())
1773}
1774
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};

    /// Build a one-column batch with a non-nullable Utf8 `id` column
    /// holding `ids` in order.
    fn batch_with_ids(ids: &[&str]) -> RecordBatch {
        let id_field = Field::new("id", DataType::Utf8, false);
        let schema = Arc::new(Schema::new(vec![id_field]));
        let id_column: ArrayRef = Arc::new(StringArray::from(ids.to_vec()));
        RecordBatch::try_new(schema, vec![id_column]).unwrap()
    }

    /// Distinct keys must be accepted.
    #[test]
    fn check_batch_unique_by_keys_passes_when_all_unique() {
        let keys = vec!["id".to_string()];
        let batch = batch_with_ids(&["a", "b", "c"]);
        check_batch_unique_by_keys(&batch, &keys, "test")
            .expect("a batch with distinct ids must pass the uniqueness check");
    }

    /// A repeated key must be rejected, naming the key and MR-957.
    #[test]
    fn check_batch_unique_by_keys_errors_on_duplicate_id() {
        let keys = vec!["id".to_string()];
        let batch = batch_with_ids(&["a", "b", "a"]);
        let msg = check_batch_unique_by_keys(&batch, &keys, "test")
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("duplicate source row for key 'a'"),
            "unexpected error: {msg}"
        );
        assert!(msg.contains("MR-957"), "error should reference MR-957: {msg}");
    }

    /// More than one key column is outside the supported shape.
    #[test]
    fn check_batch_unique_by_keys_rejects_multi_column_keys() {
        let keys = vec!["id".to_string(), "other".to_string()];
        let batch = batch_with_ids(&["a"]);
        let err = check_batch_unique_by_keys(&batch, &keys, "test").unwrap_err();
        assert!(err.to_string().contains("single-column keys only"));
    }
}