1use arrow_array::{
2 Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
3};
4use arrow_schema::SchemaRef;
5use arrow_select::concat::concat_batches;
6use futures::TryStreamExt;
7use lance::Dataset;
8use lance::blob::BlobArrayBuilder;
9use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
10use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{
13 CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
14 WriteParams,
15};
16use lance::datatypes::BlobKind;
17use lance::index::DatasetIndexExt;
18use lance::index::scalar::IndexDetails;
19use lance_file::version::LanceFileVersion;
20use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
21use lance_index::{IndexType, is_system_index};
22use lance_linalg::distance::MetricType;
23use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
24use lance_table::rowids::{RowIdSequence, write_row_ids};
25use std::sync::Arc;
26
27use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
28use crate::db::{Snapshot, SubTableEntry};
29use crate::error::{OmniError, Result};
30
/// Snapshot of a table after a read or write, as produced by `TableStore::table_state`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableState {
    /// Lance dataset version number (`ds.version().version`).
    pub version: u64,
    /// Row count from an unfiltered `count_rows` on the dataset.
    pub row_count: u64,
    /// Version metadata derived from the dataset's root-relative table path.
    pub(crate) version_metadata: TableVersionMetadata,
}
37
/// Result of `TableStore::delete_where`: the post-delete table state plus the
/// number of rows the delete removed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteState {
    /// Lance dataset version produced by the delete.
    pub version: u64,
    /// Unfiltered row count of the post-delete dataset.
    pub row_count: u64,
    /// Number of rows Lance reported as deleted (`num_deleted_rows`).
    pub deleted_rows: usize,
    /// Version metadata for the post-delete dataset.
    pub(crate) version_metadata: TableVersionMetadata,
}
45
/// An uncommitted write produced by one of the `TableStore::stage_*` methods.
///
/// The transaction is committed later (see `TableStore::commit_staged`); until
/// then, the fragment lists let readers overlay the staged data on the committed
/// dataset.
#[derive(Debug, Clone)]
pub struct StagedWrite {
    /// The Lance transaction to commit.
    pub transaction: Transaction,
    /// Fragments this write adds or rewrites. For merge-insert updates this holds
    /// the updated fragments followed by brand-new ones; empty for index builds.
    pub new_fragments: Vec<Fragment>,
    /// IDs of committed fragments this write supersedes (e.g. every fragment for
    /// an overwrite).
    pub removed_fragment_ids: Vec<u64>,
}
85
/// Entry point for reading, writing, and staging changes to the Lance datasets
/// that live under a single root URI.
#[derive(Debug, Clone)]
pub struct TableStore {
    // Root URI with any trailing '/' removed (normalized in `TableStore::new`).
    root_uri: String,
}
90
91impl TableStore {
92 pub fn new(root_uri: &str) -> Self {
93 Self {
94 root_uri: root_uri.trim_end_matches('/').to_string(),
95 }
96 }
97
98 pub fn root_uri(&self) -> &str {
99 &self.root_uri
100 }
101
102 pub fn dataset_uri(&self, table_path: &str) -> String {
103 format!("{}/{}", self.root_uri, table_path)
104 }
105
106 fn table_path_from_dataset_uri(&self, dataset_uri: &str) -> Result<String> {
107 let prefix = format!("{}/", self.root_uri.trim_end_matches('/'));
108 let table_path = dataset_uri
109 .strip_prefix(&prefix)
110 .map(|path| path.to_string())
111 .ok_or_else(|| {
112 OmniError::manifest_internal(format!(
113 "dataset uri '{}' is not under root '{}'",
114 dataset_uri, self.root_uri
115 ))
116 })?;
117 Ok(table_path
118 .split_once("/tree/")
119 .map(|(path, _)| path.to_string())
120 .unwrap_or(table_path))
121 }
122
123 fn dataset_version_metadata(
124 &self,
125 dataset_uri: &str,
126 ds: &Dataset,
127 ) -> Result<TableVersionMetadata> {
128 let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
129 TableVersionMetadata::from_dataset(&self.root_uri, &table_path, ds)
130 }
131
132 pub async fn open_snapshot_table(
133 &self,
134 snapshot: &Snapshot,
135 table_key: &str,
136 ) -> Result<Dataset> {
137 snapshot.open(table_key).await
138 }
139
140 pub async fn open_at_entry(&self, entry: &SubTableEntry) -> Result<Dataset> {
141 entry.open(&self.root_uri).await
142 }
143
144 pub async fn open_dataset_head(
145 &self,
146 dataset_uri: &str,
147 branch: Option<&str>,
148 ) -> Result<Dataset> {
149 let ds = Dataset::open(dataset_uri)
150 .await
151 .map_err(|e| OmniError::Lance(e.to_string()))?;
152 match branch {
153 Some(branch) if branch != "main" => ds
154 .checkout_branch(branch)
155 .await
156 .map_err(|e| OmniError::Lance(e.to_string())),
157 _ => Ok(ds),
158 }
159 }
160
161 pub async fn open_dataset_head_for_write(
162 &self,
163 table_key: &str,
164 dataset_uri: &str,
165 branch: Option<&str>,
166 ) -> Result<Dataset> {
167 let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
168 open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
169 }
170
171 pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
172 let mut ds = Dataset::open(dataset_uri)
173 .await
174 .map_err(|e| OmniError::Lance(e.to_string()))?;
175 ds.delete_branch(branch)
176 .await
177 .map_err(|e| OmniError::Lance(e.to_string()))
178 }
179
180 pub async fn open_dataset_at_state(
181 &self,
182 table_path: &str,
183 branch: Option<&str>,
184 version: u64,
185 ) -> Result<Dataset> {
186 let ds = self
187 .open_dataset_head(&self.dataset_uri(table_path), branch)
188 .await?;
189 ds.checkout_version(version)
190 .await
191 .map_err(|e| OmniError::Lance(e.to_string()))
192 }
193
194 pub fn ensure_expected_version(
195 &self,
196 ds: &Dataset,
197 table_key: &str,
198 expected_version: u64,
199 ) -> Result<()> {
200 let actual = ds.version().version;
201 if actual != expected_version {
202 return Err(OmniError::manifest_expected_version_mismatch(
208 table_key,
209 expected_version,
210 actual,
211 ));
212 }
213 Ok(())
214 }
215
216 pub async fn reopen_for_mutation(
217 &self,
218 dataset_uri: &str,
219 branch: Option<&str>,
220 table_key: &str,
221 expected_version: u64,
222 ) -> Result<Dataset> {
223 let ds = self
224 .open_dataset_head_for_write(table_key, dataset_uri, branch)
225 .await?;
226 self.ensure_expected_version(&ds, table_key, expected_version)?;
227 Ok(ds)
228 }
229
230 pub async fn fork_branch_from_state(
231 &self,
232 dataset_uri: &str,
233 source_branch: Option<&str>,
234 table_key: &str,
235 source_version: u64,
236 target_branch: &str,
237 ) -> Result<Dataset> {
238 let mut source_ds = self
239 .open_dataset_head(dataset_uri, source_branch)
240 .await?
241 .checkout_version(source_version)
242 .await
243 .map_err(|e| OmniError::Lance(e.to_string()))?;
244 self.ensure_expected_version(&source_ds, table_key, source_version)?;
245
246 match source_ds
247 .create_branch(target_branch, source_version, None)
248 .await
249 {
250 Ok(_) => {}
251 Err(create_err) => match self
252 .open_dataset_head(dataset_uri, Some(target_branch))
253 .await
254 {
255 Ok(ds) => {
256 self.ensure_expected_version(&ds, table_key, source_version)?;
257 return Ok(ds);
258 }
259 Err(_) => return Err(OmniError::Lance(create_err.to_string())),
260 },
261 }
262
263 let ds = self
264 .open_dataset_head(dataset_uri, Some(target_branch))
265 .await?;
266 self.ensure_expected_version(&ds, table_key, source_version)?;
267 Ok(ds)
268 }
269
270 pub async fn scan_batches(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
271 self.scan(ds, None, None, None).await
272 }
273
274 pub async fn scan_batches_for_rewrite(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
275 let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
276 if !has_blob_columns {
277 return self.scan_batches(ds).await;
278 }
279
280 let batches = Self::scan_stream(ds, None, None, None, true)
281 .await?
282 .try_collect::<Vec<RecordBatch>>()
283 .await
284 .map_err(|e| OmniError::Lance(e.to_string()))?;
285 let mut materialized = Vec::with_capacity(batches.len());
286 for batch in batches {
287 materialized.push(Self::materialize_blob_batch(ds, batch).await?);
288 }
289 Ok(materialized)
290 }
291
292 pub(crate) async fn materialize_blob_batch(
293 ds: &Dataset,
294 batch: RecordBatch,
295 ) -> Result<RecordBatch> {
296 let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
297 if !has_blob_columns {
298 return Ok(batch);
299 }
300
301 let row_ids = batch
302 .column_by_name("_rowid")
303 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
304 .ok_or_else(|| {
305 OmniError::Lance("expected _rowid column when materializing blobs".to_string())
306 })?
307 .values()
308 .iter()
309 .copied()
310 .collect::<Vec<_>>();
311
312 let schema: SchemaRef = Arc::new(ds.schema().into());
313 let mut columns = Vec::with_capacity(schema.fields().len());
314 for field in schema.fields() {
315 let lance_field = lance::datatypes::Field::try_from(field.as_ref())
316 .map_err(|e| OmniError::Lance(e.to_string()))?;
317 let column = batch.column_by_name(field.name()).ok_or_else(|| {
318 OmniError::Lance(format!("batch missing column '{}'", field.name()))
319 })?;
320 if lance_field.is_blob() {
321 let descriptions =
322 column
323 .as_any()
324 .downcast_ref::<StructArray>()
325 .ok_or_else(|| {
326 OmniError::Lance(format!(
327 "expected blob descriptions for '{}'",
328 field.name()
329 ))
330 })?;
331 columns.push(
332 Self::rebuild_blob_column(ds, field.name(), descriptions, &row_ids).await?,
333 );
334 } else {
335 columns.push(column.clone());
336 }
337 }
338
339 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
340 }
341
342 async fn rebuild_blob_column(
343 ds: &Dataset,
344 column_name: &str,
345 descriptions: &StructArray,
346 row_ids: &[u64],
347 ) -> Result<ArrayRef> {
348 let mut builder = BlobArrayBuilder::new(row_ids.len());
349 let mut non_null_row_ids = Vec::new();
350 let mut row_has_blob = Vec::with_capacity(row_ids.len());
351
352 for row in 0..row_ids.len() {
353 let is_null = Self::blob_description_is_null(descriptions, row)?;
354 row_has_blob.push(!is_null);
355 if !is_null {
356 non_null_row_ids.push(row_ids[row]);
357 }
358 }
359
360 let blob_files = if non_null_row_ids.is_empty() {
361 Vec::new()
362 } else {
363 Arc::new(ds.clone())
364 .take_blobs(&non_null_row_ids, column_name)
365 .await
366 .map_err(|e| OmniError::Lance(e.to_string()))?
367 };
368
369 let mut files = blob_files.into_iter();
370 for has_blob in row_has_blob {
371 if !has_blob {
372 builder
373 .push_null()
374 .map_err(|e| OmniError::Lance(e.to_string()))?;
375 continue;
376 }
377
378 let blob = files.next().ok_or_else(|| {
379 OmniError::Lance(format!(
380 "blob rewrite for '{}' lost alignment with source rows",
381 column_name
382 ))
383 })?;
384 builder
385 .push_bytes(
386 blob.read()
387 .await
388 .map_err(|e| OmniError::Lance(e.to_string()))?,
389 )
390 .map_err(|e| OmniError::Lance(e.to_string()))?;
391 }
392
393 if files.next().is_some() {
394 return Err(OmniError::Lance(format!(
395 "blob rewrite for '{}' produced extra source blobs",
396 column_name
397 )));
398 }
399
400 builder
401 .finish()
402 .map_err(|e| OmniError::Lance(e.to_string()))
403 }
404
405 fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
406 if descriptions.is_null(row) {
407 return Ok(true);
408 }
409
410 let position = descriptions
411 .column_by_name("position")
412 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
413 .ok_or_else(|| {
414 OmniError::Lance(format!(
415 "unrecognized blob description schema {:?}: missing UInt64 position field",
416 descriptions.fields()
417 ))
418 })?;
419 let size = descriptions
420 .column_by_name("size")
421 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
422 .ok_or_else(|| {
423 OmniError::Lance(format!(
424 "unrecognized blob description schema {:?}: missing UInt64 size field",
425 descriptions.fields()
426 ))
427 })?;
428
429 let Some(kind_column) = descriptions.column_by_name("kind") else {
430 return Ok(position.is_null(row) || size.is_null(row));
431 };
432 let kind = if let Some(kind) = kind_column.as_any().downcast_ref::<UInt8Array>() {
433 if kind.is_null(row) {
434 return Ok(true);
435 }
436 kind.value(row)
437 } else if let Some(kind) = kind_column.as_any().downcast_ref::<UInt32Array>() {
438 if kind.is_null(row) {
439 return Ok(true);
440 }
441 kind.value(row) as u8
442 } else {
443 return Err(OmniError::Lance(format!(
444 "unrecognized blob description schema {:?}: kind field must be UInt8 or UInt32",
445 descriptions.fields()
446 )));
447 };
448
449 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
450 if kind != BlobKind::Inline {
451 return Ok(false);
452 }
453 let blob_uri = descriptions
454 .column_by_name("blob_uri")
455 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
456 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
457
458 Ok((position.is_null(row) || position.value(row) == 0)
459 && (size.is_null(row) || size.value(row) == 0)
460 && blob_uri.unwrap_or("").is_empty())
461 }
462
463 pub async fn scan_stream(
464 ds: &Dataset,
465 projection: Option<&[&str]>,
466 filter: Option<&str>,
467 order_by: Option<Vec<ColumnOrdering>>,
468 with_row_id: bool,
469 ) -> Result<DatasetRecordBatchStream> {
470 Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, |_| Ok(())).await
471 }
472
473 pub async fn scan_stream_with<F>(
474 ds: &Dataset,
475 projection: Option<&[&str]>,
476 filter: Option<&str>,
477 order_by: Option<Vec<ColumnOrdering>>,
478 with_row_id: bool,
479 configure: F,
480 ) -> Result<DatasetRecordBatchStream>
481 where
482 F: FnOnce(&mut Scanner) -> Result<()>,
483 {
484 let mut scanner = ds.scan();
485 if with_row_id {
486 scanner.with_row_id();
487 }
488 if let Some(columns) = projection {
489 scanner
490 .project(columns)
491 .map_err(|e| OmniError::Lance(e.to_string()))?;
492 }
493 if let Some(filter_sql) = filter {
494 scanner
495 .filter(filter_sql)
496 .map_err(|e| OmniError::Lance(e.to_string()))?;
497 }
498 if let Some(ordering) = order_by {
499 scanner
500 .order_by(Some(ordering))
501 .map_err(|e| OmniError::Lance(e.to_string()))?;
502 }
503 configure(&mut scanner)?;
504 scanner
505 .try_into_stream()
506 .await
507 .map_err(|e| OmniError::Lance(e.to_string()))
508 }
509
510 pub async fn scan(
511 &self,
512 ds: &Dataset,
513 projection: Option<&[&str]>,
514 filter: Option<&str>,
515 order_by: Option<Vec<ColumnOrdering>>,
516 ) -> Result<Vec<RecordBatch>> {
517 Self::scan_stream(ds, projection, filter, order_by, false)
518 .await?
519 .try_collect()
520 .await
521 .map_err(|e| OmniError::Lance(e.to_string()))
522 }
523
524 pub async fn scan_with<F>(
525 &self,
526 ds: &Dataset,
527 projection: Option<&[&str]>,
528 filter: Option<&str>,
529 order_by: Option<Vec<ColumnOrdering>>,
530 with_row_id: bool,
531 configure: F,
532 ) -> Result<Vec<RecordBatch>>
533 where
534 F: FnOnce(&mut Scanner) -> Result<()>,
535 {
536 Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, configure)
537 .await?
538 .try_collect()
539 .await
540 .map_err(|e| OmniError::Lance(e.to_string()))
541 }
542
543 pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
544 ds.count_rows(filter)
545 .await
546 .map(|count| count as usize)
547 .map_err(|e| OmniError::Lance(e.to_string()))
548 }
549
550 pub fn dataset_version(&self, ds: &Dataset) -> u64 {
551 ds.version().version
552 }
553
554 pub async fn table_state(&self, dataset_uri: &str, ds: &Dataset) -> Result<TableState> {
555 Ok(TableState {
556 version: self.dataset_version(ds),
557 row_count: self.count_rows(ds, None).await? as u64,
558 version_metadata: self.dataset_version_metadata(dataset_uri, ds)?,
559 })
560 }
561
562 pub async fn append_batch(
563 &self,
564 dataset_uri: &str,
565 ds: &mut Dataset,
566 batch: RecordBatch,
567 ) -> Result<TableState> {
568 if batch.num_rows() == 0 {
569 return self.table_state(dataset_uri, ds).await;
570 }
571 let schema = batch.schema();
572 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
573 let params = WriteParams {
574 mode: WriteMode::Append,
575 allow_external_blob_outside_bases: true,
576 ..Default::default()
577 };
578 ds.append(reader, Some(params))
579 .await
580 .map_err(|e| OmniError::Lance(e.to_string()))?;
581 self.table_state(dataset_uri, ds).await
582 }
583
584 pub async fn append_or_create_batch(
585 dataset_uri: &str,
586 dataset: Option<Dataset>,
587 batch: RecordBatch,
588 ) -> Result<Dataset> {
589 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
590 match dataset {
591 Some(mut ds) => {
592 let params = WriteParams {
593 mode: WriteMode::Append,
594 allow_external_blob_outside_bases: true,
595 ..Default::default()
596 };
597 ds.append(reader, Some(params))
598 .await
599 .map_err(|e| OmniError::Lance(e.to_string()))?;
600 Ok(ds)
601 }
602 None => {
603 let params = WriteParams {
604 mode: WriteMode::Create,
605 enable_stable_row_ids: true,
606 data_storage_version: Some(LanceFileVersion::V2_2),
607 allow_external_blob_outside_bases: true,
608 ..Default::default()
609 };
610 Dataset::write(reader, dataset_uri, Some(params))
611 .await
612 .map_err(|e| OmniError::Lance(e.to_string()))
613 }
614 }
615 }
616
617 pub async fn overwrite_batch(
618 &self,
619 dataset_uri: &str,
620 ds: &mut Dataset,
621 batch: RecordBatch,
622 ) -> Result<TableState> {
623 ds.truncate_table()
624 .await
625 .map_err(|e| OmniError::Lance(e.to_string()))?;
626 self.append_batch(dataset_uri, ds, batch).await
627 }
628
629 pub async fn overwrite_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
630 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
631 let params = WriteParams {
632 mode: WriteMode::Overwrite,
633 enable_stable_row_ids: true,
634 data_storage_version: Some(LanceFileVersion::V2_2),
635 allow_external_blob_outside_bases: true,
636 ..Default::default()
637 };
638 Dataset::write(reader, dataset_uri, Some(params))
639 .await
640 .map_err(|e| OmniError::Lance(e.to_string()))
641 }
642
643 pub async fn merge_insert_batch(
644 &self,
645 dataset_uri: &str,
646 ds: Dataset,
647 batch: RecordBatch,
648 key_columns: Vec<String>,
649 when_matched: WhenMatched,
650 when_not_matched: WhenNotMatched,
651 ) -> Result<TableState> {
652 if batch.num_rows() == 0 {
653 return self.table_state(dataset_uri, &ds).await;
654 }
655
656 check_batch_unique_by_keys(&batch, &key_columns, "merge_insert_batch")?;
661
662 let ds = Arc::new(ds);
667 let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
668 .map_err(|e| OmniError::Lance(e.to_string()))?;
669 builder.when_matched(when_matched);
670 builder.when_not_matched(when_not_matched);
671 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
707 let job = builder
708 .try_build()
709 .map_err(|e| OmniError::Lance(e.to_string()))?;
710
711 let schema = batch.schema();
712 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
713 let (new_ds, _stats) = job
714 .execute(lance_datafusion::utils::reader_to_stream(Box::new(reader)))
715 .await
716 .map_err(|e| OmniError::Lance(e.to_string()))?;
717 self.table_state(dataset_uri, &new_ds).await
718 }
719
720 pub async fn merge_insert_batches(
721 &self,
722 dataset_uri: &str,
723 ds: Dataset,
724 batches: Vec<RecordBatch>,
725 key_columns: Vec<String>,
726 when_matched: WhenMatched,
727 when_not_matched: WhenNotMatched,
728 ) -> Result<TableState> {
729 if batches.is_empty() {
730 return self.table_state(dataset_uri, &ds).await;
731 }
732 let batch = if batches.len() == 1 {
733 batches.into_iter().next().unwrap()
734 } else {
735 let schema = batches[0].schema();
736 concat_batches(&schema, &batches).map_err(|e| OmniError::Lance(e.to_string()))?
737 };
738 self.merge_insert_batch(
739 dataset_uri,
740 ds,
741 batch,
742 key_columns,
743 when_matched,
744 when_not_matched,
745 )
746 .await
747 }
748
749 pub async fn delete_where(
750 &self,
751 dataset_uri: &str,
752 ds: &mut Dataset,
753 filter: &str,
754 ) -> Result<DeleteState> {
755 let delete_result = ds
756 .delete(filter)
757 .await
758 .map_err(|e| OmniError::Lance(e.to_string()))?;
759 Ok(DeleteState {
760 version: delete_result.new_dataset.version().version,
761 row_count: self.count_rows(&delete_result.new_dataset, None).await? as u64,
762 deleted_rows: delete_result.num_deleted_rows as usize,
763 version_metadata: self
764 .dataset_version_metadata(dataset_uri, &delete_result.new_dataset)?,
765 })
766 }
767
768 pub async fn stage_append(
817 &self,
818 ds: &Dataset,
819 batch: RecordBatch,
820 prior_stages: &[StagedWrite],
821 ) -> Result<StagedWrite> {
822 if batch.num_rows() == 0 {
823 return Err(OmniError::manifest_internal(
824 "stage_append called with empty batch".to_string(),
825 ));
826 }
827 let params = WriteParams {
828 mode: WriteMode::Append,
829 allow_external_blob_outside_bases: true,
830 ..Default::default()
831 };
832 let transaction = InsertBuilder::new(Arc::new(ds.clone()))
833 .with_params(¶ms)
834 .execute_uncommitted(vec![batch])
835 .await
836 .map_err(|e| OmniError::Lance(e.to_string()))?;
837 let mut new_fragments = match &transaction.operation {
838 Operation::Append { fragments } => fragments.clone(),
839 Operation::Overwrite { fragments, .. } => fragments.clone(),
840 other => {
841 return Err(OmniError::manifest_internal(format!(
842 "stage_append: unexpected Lance operation {:?}",
843 std::mem::discriminant(other)
844 )));
845 }
846 };
847 let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
861 + 1
862 + prior_stages_fragment_count(prior_stages);
863 assign_fragment_ids(&mut new_fragments, next_id_base);
864 if ds.manifest.uses_stable_row_ids() {
865 let prior_rows = prior_stages_row_count(prior_stages)?;
866 let start_row_id = ds.manifest.next_row_id + prior_rows;
867 assign_row_id_meta(&mut new_fragments, start_row_id)?;
868 }
869 Ok(StagedWrite {
870 transaction,
871 new_fragments,
872 removed_fragment_ids: Vec::new(),
874 })
875 }
876
877 pub async fn stage_merge_insert(
906 &self,
907 ds: Dataset,
908 batch: RecordBatch,
909 key_columns: Vec<String>,
910 when_matched: WhenMatched,
911 when_not_matched: WhenNotMatched,
912 ) -> Result<StagedWrite> {
913 if batch.num_rows() == 0 {
914 return Err(OmniError::manifest_internal(
915 "stage_merge_insert called with empty batch".to_string(),
916 ));
917 }
918
919 check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?;
925
926 let ds = Arc::new(ds);
927 let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
928 .map_err(|e| OmniError::Lance(e.to_string()))?;
929 builder.when_matched(when_matched);
930 builder.when_not_matched(when_not_matched);
931 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
937 let job = builder
938 .try_build()
939 .map_err(|e| OmniError::Lance(e.to_string()))?;
940 let schema = batch.schema();
941 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
942 let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
943 let uncommitted = job
944 .execute_uncommitted(stream)
945 .await
946 .map_err(|e| OmniError::Lance(e.to_string()))?;
947 let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
958 Operation::Update {
959 new_fragments,
960 updated_fragments,
961 removed_fragment_ids,
962 ..
963 } => {
964 let mut all = updated_fragments.clone();
965 all.extend(new_fragments.iter().cloned());
966 (all, removed_fragment_ids.clone())
967 }
968 Operation::Append { fragments } => (fragments.clone(), Vec::new()),
969 other => {
970 return Err(OmniError::manifest_internal(format!(
971 "stage_merge_insert: unexpected Lance operation {:?}",
972 std::mem::discriminant(other)
973 )));
974 }
975 };
976 Ok(StagedWrite {
977 transaction: uncommitted.transaction,
978 new_fragments,
979 removed_fragment_ids,
980 })
981 }
982
983 pub async fn commit_staged(
988 &self,
989 ds: Arc<Dataset>,
990 transaction: Transaction,
991 ) -> Result<Dataset> {
992 CommitBuilder::new(ds)
993 .execute(transaction)
994 .await
995 .map_err(|e| OmniError::Lance(e.to_string()))
996 }
997
998 pub async fn stage_overwrite(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
1011 if batch.num_rows() == 0 {
1012 return Err(OmniError::manifest_internal(
1013 "stage_overwrite called with empty batch".to_string(),
1014 ));
1015 }
1016 let params = WriteParams {
1027 mode: WriteMode::Overwrite,
1028 enable_stable_row_ids: true,
1029 allow_external_blob_outside_bases: true,
1030 ..Default::default()
1031 };
1032 let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1033 .with_params(¶ms)
1034 .execute_uncommitted(vec![batch])
1035 .await
1036 .map_err(|e| OmniError::Lance(e.to_string()))?;
1037 let mut new_fragments = match &transaction.operation {
1038 Operation::Overwrite { fragments, .. } => fragments.clone(),
1039 other => {
1040 return Err(OmniError::manifest_internal(format!(
1041 "stage_overwrite: unexpected Lance operation {:?}",
1042 std::mem::discriminant(other)
1043 )));
1044 }
1045 };
1046 assign_fragment_ids(&mut new_fragments, 1);
1059 if ds.manifest.uses_stable_row_ids() {
1060 assign_row_id_meta(&mut new_fragments, 0)?;
1061 }
1062 let removed_fragment_ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
1067 Ok(StagedWrite {
1068 transaction,
1069 new_fragments,
1070 removed_fragment_ids,
1071 })
1072 }
1073
1074 pub async fn stage_create_btree_index(
1092 &self,
1093 ds: &Dataset,
1094 columns: &[&str],
1095 ) -> Result<StagedWrite> {
1096 let params = ScalarIndexParams::default();
1097 let mut ds_clone = ds.clone();
1098 let new_idx = ds_clone
1099 .create_index_builder(columns, IndexType::BTree, ¶ms)
1100 .replace(true)
1101 .execute_uncommitted()
1102 .await
1103 .map_err(|e| OmniError::Lance(format!("stage_create_btree_index: {}", e)))?;
1104 let removed_indices: Vec<IndexMetadata> = ds
1105 .load_indices()
1106 .await
1107 .map_err(|e| OmniError::Lance(e.to_string()))?
1108 .iter()
1109 .filter(|idx| idx.name == new_idx.name)
1110 .cloned()
1111 .collect();
1112 let transaction = TransactionBuilder::new(
1113 new_idx.dataset_version,
1114 Operation::CreateIndex {
1115 new_indices: vec![new_idx],
1116 removed_indices,
1117 },
1118 )
1119 .build();
1120 Ok(StagedWrite {
1121 transaction,
1122 new_fragments: Vec::new(),
1123 removed_fragment_ids: Vec::new(),
1124 })
1125 }
1126
1127 pub async fn stage_create_inverted_index(
1131 &self,
1132 ds: &Dataset,
1133 column: &str,
1134 ) -> Result<StagedWrite> {
1135 let params = InvertedIndexParams::default();
1136 let mut ds_clone = ds.clone();
1137 let new_idx = ds_clone
1138 .create_index_builder(&[column], IndexType::Inverted, ¶ms)
1139 .replace(true)
1140 .execute_uncommitted()
1141 .await
1142 .map_err(|e| OmniError::Lance(format!("stage_create_inverted_index: {}", e)))?;
1143 let removed_indices: Vec<IndexMetadata> = ds
1144 .load_indices()
1145 .await
1146 .map_err(|e| OmniError::Lance(e.to_string()))?
1147 .iter()
1148 .filter(|idx| idx.name == new_idx.name)
1149 .cloned()
1150 .collect();
1151 let transaction = TransactionBuilder::new(
1152 new_idx.dataset_version,
1153 Operation::CreateIndex {
1154 new_indices: vec![new_idx],
1155 removed_indices,
1156 },
1157 )
1158 .build();
1159 Ok(StagedWrite {
1160 transaction,
1161 new_fragments: Vec::new(),
1162 removed_fragment_ids: Vec::new(),
1163 })
1164 }
1165
1166 pub async fn scan_with_staged(
1195 &self,
1196 ds: &Dataset,
1197 staged: &[StagedWrite],
1198 projection: Option<&[&str]>,
1199 filter: Option<&str>,
1200 ) -> Result<Vec<RecordBatch>> {
1201 if staged.is_empty() {
1202 return self.scan(ds, projection, filter, None).await;
1203 }
1204 let mut scanner = ds.scan();
1205 if let Some(cols) = projection {
1206 let owned: Vec<String> = cols.iter().map(|s| s.to_string()).collect();
1207 scanner
1208 .project(&owned)
1209 .map_err(|e| OmniError::Lance(e.to_string()))?;
1210 }
1211 if let Some(f) = filter {
1212 scanner
1213 .filter(f)
1214 .map_err(|e| OmniError::Lance(e.to_string()))?;
1215 }
1216 scanner.with_fragments(combine_committed_with_staged(ds, staged));
1217 let stream = scanner
1218 .try_into_stream()
1219 .await
1220 .map_err(|e| OmniError::Lance(e.to_string()))?;
1221 stream
1222 .try_collect()
1223 .await
1224 .map_err(|e| OmniError::Lance(e.to_string()))
1225 }
1226
1227 pub async fn scan_with_pending(
1266 &self,
1267 committed_ds: &Dataset,
1268 pending_batches: &[RecordBatch],
1269 pending_schema: Option<SchemaRef>,
1270 projection: Option<&[&str]>,
1271 filter: Option<&str>,
1272 key_column: Option<&str>,
1273 ) -> Result<Vec<RecordBatch>> {
1274 if let (Some(key_col), Some(cols)) = (key_column, projection) {
1283 if !cols.iter().any(|c| *c == key_col) {
1284 return Err(OmniError::Lance(format!(
1285 "scan_with_pending: key_column '{}' must appear in projection \
1286 when merge-shadow semantics are requested (got projection = {:?})",
1287 key_col, cols
1288 )));
1289 }
1290 }
1291
1292 let committed = self.scan(committed_ds, projection, filter, None).await?;
1293 if pending_batches.is_empty() {
1294 return Ok(committed);
1295 }
1296
1297 let committed = match key_column {
1303 Some(key_col) => {
1304 let pending_keys = collect_string_column_values(pending_batches, key_col)?;
1305 if pending_keys.is_empty() {
1306 committed
1307 } else {
1308 filter_out_rows_where_string_in(committed, key_col, &pending_keys)?
1309 }
1310 }
1311 None => committed,
1312 };
1313
1314 let pending =
1315 scan_pending_batches(pending_batches, pending_schema, projection, filter).await?;
1316
1317 let mut out = committed;
1318 out.extend(pending);
1319 Ok(out)
1320 }
1321
1322 pub async fn count_rows_with_staged(
1327 &self,
1328 ds: &Dataset,
1329 staged: &[StagedWrite],
1330 filter: Option<String>,
1331 ) -> Result<usize> {
1332 if staged.is_empty() {
1333 return self.count_rows(ds, filter).await;
1334 }
1335 let mut scanner = ds.scan();
1336 if let Some(f) = filter {
1337 scanner
1338 .filter(&f)
1339 .map_err(|e| OmniError::Lance(e.to_string()))?;
1340 }
1341 scanner.with_fragments(combine_committed_with_staged(ds, staged));
1342 let count = scanner
1343 .count_rows()
1344 .await
1345 .map_err(|e| OmniError::Lance(e.to_string()))?;
1346 Ok(count as usize)
1347 }
1348
1349 async fn user_indices_for_column(
1350 &self,
1351 ds: &Dataset,
1352 column: &str,
1353 ) -> Result<Vec<IndexMetadata>> {
1354 let field_id = ds
1355 .schema()
1356 .field(column)
1357 .map(|field| field.id)
1358 .ok_or_else(|| {
1359 OmniError::manifest_internal(format!(
1360 "dataset is missing expected index column '{}'",
1361 column
1362 ))
1363 })?;
1364 let indices = ds
1365 .load_indices()
1366 .await
1367 .map_err(|e| OmniError::Lance(e.to_string()))?;
1368 Ok(indices
1369 .iter()
1370 .filter(|index| !is_system_index(index))
1371 .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
1372 .cloned()
1373 .collect())
1374 }
1375
1376 pub async fn has_btree_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1377 let indices = self.user_indices_for_column(ds, column).await?;
1378 Ok(indices.iter().any(|index| {
1379 index
1380 .index_details
1381 .as_ref()
1382 .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
1383 .unwrap_or(false)
1384 }))
1385 }
1386
1387 pub async fn has_fts_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1388 let indices = self.user_indices_for_column(ds, column).await?;
1389 Ok(indices.iter().any(|index| {
1390 index
1391 .index_details
1392 .as_ref()
1393 .map(|details| IndexDetails(details.clone()).supports_fts())
1394 .unwrap_or(false)
1395 }))
1396 }
1397
1398 pub async fn has_vector_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1399 let indices = self.user_indices_for_column(ds, column).await?;
1400 Ok(indices.iter().any(|index| {
1401 index
1402 .index_details
1403 .as_ref()
1404 .map(|details| IndexDetails(details.clone()).is_vector())
1405 .unwrap_or(false)
1406 }))
1407 }
1408
1409 pub async fn create_btree_index(&self, ds: &mut Dataset, columns: &[&str]) -> Result<()> {
1410 let params = ScalarIndexParams::default();
1411 ds.create_index_builder(columns, IndexType::BTree, ¶ms)
1412 .replace(true)
1413 .await
1414 .map(|_| ())
1415 .map_err(|e| OmniError::Lance(e.to_string()))
1416 }
1417
1418 pub async fn create_inverted_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
1419 let params = InvertedIndexParams::default();
1420 ds.create_index_builder(&[column], IndexType::Inverted, ¶ms)
1421 .replace(true)
1422 .await
1423 .map(|_| ())
1424 .map_err(|e| OmniError::Lance(e.to_string()))
1425 }
1426
1427 pub async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
1428 let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
1429 ds.create_index_builder(&[column], IndexType::Vector, ¶ms)
1430 .replace(true)
1431 .await
1432 .map(|_| ())
1433 .map_err(|e| OmniError::Lance(e.to_string()))
1434 }
1435
1436 pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result<Dataset> {
1437 let batch = RecordBatch::new_empty(schema.clone());
1438 Self::write_dataset(dataset_uri, batch).await
1439 }
1440
1441 pub async fn first_row_id_for_filter(&self, ds: &Dataset, filter: &str) -> Result<Option<u64>> {
1442 let batches = Self::scan_stream(ds, Some(&["id"]), Some(filter), None, true)
1443 .await?
1444 .try_collect::<Vec<RecordBatch>>()
1445 .await
1446 .map_err(|e| OmniError::Lance(e.to_string()))?;
1447 Ok(batches.iter().find_map(|batch| {
1448 batch
1449 .column_by_name("_rowid")
1450 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1451 .and_then(|arr| (arr.len() > 0).then(|| arr.value(0)))
1452 }))
1453 }
1454
1455 pub async fn write_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
1456 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
1457 let params = WriteParams {
1458 mode: WriteMode::Create,
1459 enable_stable_row_ids: true,
1460 data_storage_version: Some(LanceFileVersion::V2_2),
1461 allow_external_blob_outside_bases: true,
1462 ..Default::default()
1463 };
1464 Dataset::write(reader, dataset_uri, Some(params))
1465 .await
1466 .map_err(|e| OmniError::Lance(e.to_string()))
1467 }
1468}
1469
1470fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
1504 prior_stages
1505 .iter()
1506 .map(|s| s.new_fragments.len() as u64)
1507 .sum()
1508}
1509
1510fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) {
1518 for (i, fragment) in fragments.iter_mut().enumerate() {
1519 if fragment.id == 0 {
1520 fragment.id = start_id + i as u64;
1521 }
1522 }
1523}
1524
1525fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
1526 let mut total: u64 = 0;
1527 for stage in prior_stages {
1528 for fragment in &stage.new_fragments {
1529 let physical_rows = fragment.physical_rows.ok_or_else(|| {
1530 OmniError::manifest_internal(
1531 "prior_stages_row_count: fragment is missing physical_rows".to_string(),
1532 )
1533 })? as u64;
1534 total += physical_rows;
1535 }
1536 }
1537 Ok(total)
1538}
1539
1540fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
1550 let mut next_row_id = start_row_id;
1551 for fragment in fragments {
1552 if fragment.row_id_meta.is_some() {
1553 continue;
1554 }
1555 let physical_rows = fragment.physical_rows.ok_or_else(|| {
1556 OmniError::manifest_internal(
1557 "stage_append: fragment is missing physical_rows".to_string(),
1558 )
1559 })? as u64;
1560 let row_ids = next_row_id..(next_row_id + physical_rows);
1561 let sequence = RowIdSequence::from(row_ids);
1562 let serialized = write_row_ids(&sequence);
1563 fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
1564 next_row_id += physical_rows;
1565 }
1566 Ok(())
1567}
1568
1569fn collect_string_column_values(
1574 batches: &[RecordBatch],
1575 column: &str,
1576) -> Result<std::collections::HashSet<String>> {
1577 use arrow_array::{Array, StringArray};
1578 let mut out = std::collections::HashSet::new();
1579 for batch in batches {
1580 let Some(col) = batch.column_by_name(column) else {
1581 return Err(OmniError::Lance(format!(
1582 "scan_with_pending: pending batch missing key column '{}'",
1583 column
1584 )));
1585 };
1586 let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1587 OmniError::Lance(format!(
1588 "scan_with_pending: key column '{}' is not Utf8",
1589 column
1590 ))
1591 })?;
1592 for i in 0..arr.len() {
1593 if arr.is_valid(i) {
1594 out.insert(arr.value(i).to_string());
1595 }
1596 }
1597 }
1598 Ok(out)
1599}
1600
1601fn filter_out_rows_where_string_in(
1610 batches: Vec<RecordBatch>,
1611 column: &str,
1612 excluded: &std::collections::HashSet<String>,
1613) -> Result<Vec<RecordBatch>> {
1614 use arrow_array::{Array, BooleanArray, StringArray};
1615 let mut out = Vec::with_capacity(batches.len());
1616 for batch in batches {
1617 if batch.num_rows() == 0 {
1618 out.push(batch);
1619 continue;
1620 }
1621 let col = batch.column_by_name(column).ok_or_else(|| {
1622 OmniError::manifest_internal(format!(
1623 "scan_with_pending: committed batch missing key column '{}' \
1624 (the up-front projection check should have rejected this)",
1625 column
1626 ))
1627 })?;
1628 let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1629 OmniError::Lance(format!(
1630 "scan_with_pending: committed column '{}' is not Utf8",
1631 column
1632 ))
1633 })?;
1634 let mask: BooleanArray = (0..arr.len())
1635 .map(|i| {
1636 if arr.is_valid(i) {
1637 Some(!excluded.contains(arr.value(i)))
1638 } else {
1639 Some(true)
1640 }
1641 })
1642 .collect();
1643 let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
1644 .map_err(|e| OmniError::Lance(e.to_string()))?;
1645 out.push(filtered);
1646 }
1647 Ok(out)
1648}
1649
1650async fn scan_pending_batches(
1666 pending_batches: &[RecordBatch],
1667 pending_schema: Option<SchemaRef>,
1668 projection: Option<&[&str]>,
1669 filter: Option<&str>,
1670) -> Result<Vec<RecordBatch>> {
1671 let schema = pending_schema.unwrap_or_else(|| pending_batches[0].schema());
1672 let ctx = datafusion::execution::context::SessionContext::new();
1673 let mem = datafusion::datasource::MemTable::try_new(schema, vec![pending_batches.to_vec()])
1674 .map_err(|e| OmniError::Lance(e.to_string()))?;
1675 ctx.register_table("pending", Arc::new(mem))
1676 .map_err(|e| OmniError::Lance(e.to_string()))?;
1677
1678 let proj = projection
1679 .map(|cols| {
1680 cols.iter()
1681 .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
1682 .collect::<Vec<_>>()
1683 .join(", ")
1684 })
1685 .unwrap_or_else(|| "*".to_string());
1686 let where_clause = filter.map(|f| format!("WHERE {f}")).unwrap_or_default();
1687 let sql = format!("SELECT {proj} FROM pending {where_clause}");
1688 let df = ctx
1689 .sql(&sql)
1690 .await
1691 .map_err(|e| OmniError::Lance(e.to_string()))?;
1692 df.collect()
1693 .await
1694 .map_err(|e| OmniError::Lance(e.to_string()))
1695}
1696
1697fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
1698 let removed: std::collections::HashSet<u64> = staged
1699 .iter()
1700 .flat_map(|w| w.removed_fragment_ids.iter().copied())
1701 .collect();
1702 let mut combined: Vec<Fragment> = ds
1703 .manifest
1704 .fragments
1705 .iter()
1706 .filter(|f| !removed.contains(&f.id))
1707 .cloned()
1708 .collect();
1709 for write in staged {
1710 combined.extend(write.new_fragments.iter().cloned());
1711 }
1712 combined
1713}
1714
1715fn check_batch_unique_by_keys(
1727 batch: &RecordBatch,
1728 key_columns: &[String],
1729 context: &'static str,
1730) -> Result<()> {
1731 if key_columns.len() != 1 {
1732 return Err(OmniError::manifest_internal(format!(
1733 "{}: check_batch_unique_by_keys currently supports single-column keys only, got {:?}",
1734 context, key_columns
1735 )));
1736 }
1737 let key_col_name = &key_columns[0];
1738 let column = batch.column_by_name(key_col_name).ok_or_else(|| {
1739 OmniError::manifest_internal(format!(
1740 "{}: source batch missing key column '{}'",
1741 context, key_col_name
1742 ))
1743 })?;
1744 let strs = column
1745 .as_any()
1746 .downcast_ref::<StringArray>()
1747 .ok_or_else(|| {
1748 OmniError::manifest_internal(format!(
1749 "{}: key column '{}' is not a StringArray (got {:?})",
1750 context,
1751 key_col_name,
1752 column.data_type()
1753 ))
1754 })?;
1755
1756 let mut seen: std::collections::HashSet<&str> =
1757 std::collections::HashSet::with_capacity(batch.num_rows());
1758 for i in 0..strs.len() {
1759 if !strs.is_valid(i) {
1760 continue;
1761 }
1762 let v = strs.value(i);
1763 if !seen.insert(v) {
1764 return Err(OmniError::manifest(format!(
1765 "{}: duplicate source row for key '{}' (column '{}'); \
1766 callers must hand in a batch unique by `key_columns` \
1767 — see MR-957",
1768 context, v, key_col_name
1769 )));
1770 }
1771 }
1772 Ok(())
1773}
1774
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};

    /// Single non-nullable Utf8 `id` column.
    fn batch_with_ids(ids: &[&str]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
        let col = Arc::new(StringArray::from(ids.to_vec())) as ArrayRef;
        RecordBatch::try_new(schema, vec![col]).unwrap()
    }

    /// Single nullable Utf8 `id` column, so null-key handling can be tested.
    fn batch_with_optional_ids(ids: &[Option<&str>]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, true)]));
        let col = Arc::new(StringArray::from(ids.to_vec())) as ArrayRef;
        RecordBatch::try_new(schema, vec![col]).unwrap()
    }

    #[test]
    fn check_batch_unique_by_keys_passes_when_all_unique() {
        let batch = batch_with_ids(&["a", "b", "c"]);
        check_batch_unique_by_keys(&batch, &["id".to_string()], "test").unwrap();
    }

    #[test]
    fn check_batch_unique_by_keys_errors_on_duplicate_id() {
        let batch = batch_with_ids(&["a", "b", "a"]);
        let err = check_batch_unique_by_keys(&batch, &["id".to_string()], "test").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("duplicate source row for key 'a'"),
            "unexpected error: {msg}"
        );
        assert!(
            msg.contains("MR-957"),
            "error should reference MR-957: {msg}"
        );
    }

    #[test]
    fn check_batch_unique_by_keys_rejects_multi_column_keys() {
        let batch = batch_with_ids(&["a"]);
        let err =
            check_batch_unique_by_keys(&batch, &["id".to_string(), "other".to_string()], "test")
                .unwrap_err();
        assert!(err.to_string().contains("single-column keys only"));
    }

    #[test]
    fn check_batch_unique_by_keys_ignores_repeated_null_keys() {
        let batch = batch_with_optional_ids(&[Some("a"), None, None, Some("b")]);
        check_batch_unique_by_keys(&batch, &["id".to_string()], "test").unwrap();
    }

    #[test]
    fn check_batch_unique_by_keys_errors_on_missing_key_column() {
        let batch = batch_with_ids(&["a"]);
        let err =
            check_batch_unique_by_keys(&batch, &["missing".to_string()], "test").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("source batch missing key column 'missing'"),
            "unexpected error: {msg}"
        );
    }

    #[test]
    fn check_batch_unique_by_keys_rejects_non_string_key_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)]));
        let col = Arc::new(UInt32Array::from(vec![1u32, 2])) as ArrayRef;
        let batch = RecordBatch::try_new(schema, vec![col]).unwrap();
        let err = check_batch_unique_by_keys(&batch, &["id".to_string()], "test").unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("is not a StringArray"), "unexpected error: {msg}");
    }

    #[test]
    fn collect_string_column_values_skips_nulls_and_dedupes() {
        let batches = vec![
            batch_with_optional_ids(&[Some("a"), None]),
            batch_with_ids(&["a", "b"]),
        ];
        let values = collect_string_column_values(&batches, "id").unwrap();
        let mut sorted: Vec<String> = values.into_iter().collect();
        sorted.sort();
        assert_eq!(sorted, vec!["a".to_string(), "b".to_string()]);
    }

    #[test]
    fn filter_out_rows_where_string_in_drops_excluded_and_keeps_nulls() {
        let batch = batch_with_optional_ids(&[Some("a"), None, Some("b"), Some("c")]);
        let excluded: std::collections::HashSet<String> =
            ["a", "c"].iter().map(|s| s.to_string()).collect();
        let out = filter_out_rows_where_string_in(vec![batch], "id", &excluded).unwrap();
        assert_eq!(out.len(), 1);
        let ids = out[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let got: Vec<Option<&str>> = ids.iter().collect();
        assert_eq!(got, vec![None, Some("b")]);
    }
}