omnigraph/table_store.rs
1use arrow_array::{
2 Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
3};
4use arrow_schema::SchemaRef;
5use datafusion::physical_plan::SendableRecordBatchStream;
6use futures::TryStreamExt;
7use lance::Dataset;
8use lance::blob::BlobArrayBuilder;
9use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
10use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{
13 CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
14 WriteParams,
15};
16use lance::datatypes::{BlobKind, Schema as LanceSchema};
17use lance::index::DatasetIndexExt;
18use lance::index::scalar::IndexDetails;
19use lance_file::version::LanceFileVersion;
20use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
21use lance_index::{IndexType, is_system_index};
22use lance_linalg::distance::MetricType;
23use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
24use lance_table::rowids::{RowIdSequence, write_row_ids};
25use std::sync::Arc;
26
27use crate::db::manifest::TableVersionMetadata;
28use crate::db::{Snapshot, SubTableEntry};
29use crate::error::{OmniError, Result};
30use crate::storage_layer::ForkOutcome;
31
/// Point-in-time state of a table's Lance dataset, as produced by
/// `TableStore::table_state`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableState {
    /// Lance dataset version (`ds.version().version`).
    pub version: u64,
    /// Total row count of the dataset (unfiltered `count_rows`).
    pub row_count: u64,
    /// Metadata built by `TableVersionMetadata::from_dataset` for this version.
    pub(crate) version_metadata: TableVersionMetadata,
}
38
/// Table state after a committed delete, as produced by
/// `TableStore::delete_where`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteState {
    /// Lance dataset version created by the delete.
    pub version: u64,
    /// Total row count of the post-delete dataset.
    pub row_count: u64,
    /// Number of rows the delete removed (Lance's `num_deleted_rows`).
    pub deleted_rows: usize,
    /// Metadata built by `TableVersionMetadata::from_dataset` for the new version.
    pub(crate) version_metadata: TableVersionMetadata,
}
46
/// Whether a `key_col IN (...)` scan on a dataset will be served by the
/// persisted scalar (BTREE) index, or silently fall back to a full filtered
/// scan. Detection-only (metadata, no IO); the scan returns the correct rows
/// either way. Surfaced by the indexed traversal path so the silent perf
/// fallback is observable, and available to a future cost-based planner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexCoverage {
    /// The column has a usable BTREE, every fragment records `physical_rows`,
    /// and (when the index reports a `fragment_bitmap`) every current fragment
    /// is covered by that index.
    Indexed,
    /// Lance will not use the scalar index for this scan, or will use it only
    /// for some fragments (results are still correct; cost is a full or partial
    /// scan). `reason` is a human-readable explanation for logging.
    Degraded { reason: String },
}
59
/// A Lance write that has produced fragment files on object storage but is
/// not yet committed to the dataset's manifest. The staged-write primitives
/// are consumed by `MutationStaging` (`exec/staging.rs`,
/// `exec/mutation.rs`) and the bulk loader (`loader/mod.rs`). The
/// intent: defer Lance commits to end-of-query so a mid-query failure
/// leaves the touched table at the pre-mutation HEAD instead of
/// drifting ahead. See `docs/dev/writes.md` for the publisher-CAS contract
/// this builds on.
///
/// `transaction` is opaque from our side — Lance owns its semantics. We
/// commit it via `CommitBuilder::execute(transaction)` (see
/// `TableStore::commit_staged`).
///
/// For read-your-writes within the same query, `new_fragments` and
/// `removed_fragment_ids` together describe the post-stage view delta:
/// `scan_with_staged` (and `count_rows_with_staged`) compose
/// `committed - removed + new` so subsequent reads see the staged result
/// without double-counting fragments that `Operation::Update` rewrote.
/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites
/// existing fragments would yield duplicate rows (the original fragment
/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
#[derive(Debug, Clone)]
pub struct StagedWrite {
    /// The uncommitted Lance transaction. It is committed at end-of-query by
    /// `TableStore::commit_staged`; until then the dataset HEAD is unchanged.
    pub transaction: Transaction,
    /// Fragments to surface alongside the committed manifest in
    /// `Scanner::with_fragments(committed - removed + new)`. For
    /// `Operation::Append` these are the freshly-appended fragments. For
    /// `Operation::Update` (merge_insert) these are
    /// `updated_fragments + new_fragments` (rewrites + freshly-inserted
    /// rows).
    pub new_fragments: Vec<Fragment>,
    /// Fragment IDs that this staged write supersedes. The committed
    /// manifest must filter these out before being combined with
    /// `new_fragments` for read-your-writes scans, otherwise rewrites
    /// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
    /// adds without removing anything); populated from
    /// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
    pub removed_fragment_ids: Vec<u64>,
}
99
/// Entry point for opening, scanning, writing, and branching the Lance
/// datasets of tables stored under a single root URI. A table's dataset
/// lives at `{root_uri}/{table_path}` (see `dataset_uri`).
#[derive(Debug, Clone)]
pub struct TableStore {
    /// Root URI; `TableStore::new` strips any trailing `/`.
    root_uri: String,
}
104
105impl TableStore {
106 pub fn new(root_uri: &str) -> Self {
107 Self {
108 root_uri: root_uri.trim_end_matches('/').to_string(),
109 }
110 }
111
112 pub fn root_uri(&self) -> &str {
113 &self.root_uri
114 }
115
116 pub fn dataset_uri(&self, table_path: &str) -> String {
117 format!("{}/{}", self.root_uri, table_path)
118 }
119
120 fn table_path_from_dataset_uri(&self, dataset_uri: &str) -> Result<String> {
121 let prefix = format!("{}/", self.root_uri.trim_end_matches('/'));
122 let table_path = dataset_uri
123 .strip_prefix(&prefix)
124 .map(|path| path.to_string())
125 .ok_or_else(|| {
126 OmniError::manifest_internal(format!(
127 "dataset uri '{}' is not under root '{}'",
128 dataset_uri, self.root_uri
129 ))
130 })?;
131 Ok(table_path
132 .split_once("/tree/")
133 .map(|(path, _)| path.to_string())
134 .unwrap_or(table_path))
135 }
136
137 fn dataset_version_metadata(
138 &self,
139 dataset_uri: &str,
140 ds: &Dataset,
141 ) -> Result<TableVersionMetadata> {
142 let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
143 TableVersionMetadata::from_dataset(&self.root_uri, &table_path, ds)
144 }
145
146 pub async fn open_snapshot_table(
147 &self,
148 snapshot: &Snapshot,
149 table_key: &str,
150 ) -> Result<Dataset> {
151 snapshot.open(table_key).await
152 }
153
154 pub async fn open_at_entry(&self, entry: &SubTableEntry) -> Result<Dataset> {
155 entry.open(&self.root_uri).await
156 }
157
158 pub async fn open_dataset_head(
159 &self,
160 dataset_uri: &str,
161 branch: Option<&str>,
162 ) -> Result<Dataset> {
163 // Direct open by URI (O(1) latest-resolution). Routed through the tracked
164 // opener so a cost test counts it via the per-query `table_wrapper`
165 // (no-op in production — the task-local is unset, so this is exactly
166 // `Dataset::open(uri)`).
167 let ds = crate::instrumentation::open_dataset_tracked(
168 dataset_uri,
169 crate::instrumentation::table_wrapper(),
170 )
171 .await?;
172 match branch {
173 Some(branch) if branch != "main" => ds
174 .checkout_branch(branch)
175 .await
176 .map_err(|e| OmniError::Lance(e.to_string())),
177 _ => Ok(ds),
178 }
179 }
180
181 pub async fn open_dataset_head_for_write(
182 &self,
183 table_key: &str,
184 dataset_uri: &str,
185 branch: Option<&str>,
186 ) -> Result<Dataset> {
187 // RFC-013 step 3a: open writes via the direct opener (O(1)) instead of the
188 // lance-namespace builder, which re-resolved the table's version chain
189 // O(depth) per write. The namespace is a catalog/discovery layer, not a
190 // per-open hot-path component (RFC §2.4); the manifest already holds the
191 // location, and `ensure_expected_version` still validates head == pinned
192 // for strict ops. `table_key` retained for signature stability.
193 let _ = table_key;
194 self.open_dataset_head(dataset_uri, branch).await
195 }
196
197 pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
198 let mut ds = Dataset::open(dataset_uri)
199 .await
200 .map_err(|e| OmniError::Lance(e.to_string()))?;
201 ds.delete_branch(branch)
202 .await
203 .map_err(|e| OmniError::Lance(e.to_string()))
204 }
205
206 /// List the named Lance branches present on the dataset at `dataset_uri`.
207 /// The `cleanup` orphan reconciler diffs this against the manifest branch
208 /// set to find orphaned per-table forks. `main`/default is not a named
209 /// branch and never appears here.
210 pub async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
211 let ds = Dataset::open(dataset_uri)
212 .await
213 .map_err(|e| OmniError::Lance(e.to_string()))?;
214 let branches = ds
215 .list_branches()
216 .await
217 .map_err(|e| OmniError::Lance(e.to_string()))?;
218 Ok(branches.into_keys().collect())
219 }
220
221 /// Idempotently drop `branch` from the dataset at `dataset_uri`.
222 ///
223 /// Unlike [`delete_branch`](Self::delete_branch), this tolerates an
224 /// already-absent branch — both a missing contents ref (Lance's
225 /// `force_delete_branch` handles that) and a missing `tree/{branch}/`
226 /// directory (the local-store `NotFound` quirk pinned by
227 /// `lance_surface_guards::force_delete_branch_semantics`). Safe to call on a
228 /// possibly-orphaned or already-reclaimed fork.
229 ///
230 /// A branch that still has referencing descendants (`RefConflict`) is NOT
231 /// tolerated: that is a real ordering error and surfaces as `OmniError::Lance`.
232 /// Used by the eager best-effort reclaim in `cleanup_deleted_branch_tables`
233 /// and the `cleanup` orphan reconciler.
234 pub async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
235 let mut ds = Dataset::open(dataset_uri)
236 .await
237 .map_err(|e| OmniError::Lance(e.to_string()))?;
238 match ds.force_delete_branch(branch).await {
239 Ok(()) => Ok(()),
240 Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => Ok(()),
241 Err(e) => Err(OmniError::Lance(e.to_string())),
242 }
243 }
244
245 pub async fn open_dataset_at_state(
246 &self,
247 table_path: &str,
248 branch: Option<&str>,
249 version: u64,
250 ) -> Result<Dataset> {
251 let ds = self
252 .open_dataset_head(&self.dataset_uri(table_path), branch)
253 .await?;
254 ds.checkout_version(version)
255 .await
256 .map_err(|e| OmniError::Lance(e.to_string()))
257 }
258
259 pub fn ensure_expected_version(
260 &self,
261 ds: &Dataset,
262 table_key: &str,
263 expected_version: u64,
264 ) -> Result<()> {
265 let actual = ds.version().version;
266 if actual != expected_version {
267 // Use the structured ExpectedVersionMismatch variant so callers
268 // (and the HTTP server) can match on details rather than parsing
269 // the message. This drift is a publisher-style OCC failure: the
270 // caller's pre-write view of the table version is stale relative
271 // to the on-disk Lance head.
272 return Err(OmniError::manifest_expected_version_mismatch(
273 table_key,
274 expected_version,
275 actual,
276 ));
277 }
278 Ok(())
279 }
280
281 pub async fn reopen_for_mutation(
282 &self,
283 dataset_uri: &str,
284 branch: Option<&str>,
285 table_key: &str,
286 expected_version: u64,
287 ) -> Result<Dataset> {
288 let ds = self
289 .open_dataset_head_for_write(table_key, dataset_uri, branch)
290 .await?;
291 self.ensure_expected_version(&ds, table_key, expected_version)?;
292 Ok(ds)
293 }
294
295 pub async fn fork_branch_from_state(
296 &self,
297 dataset_uri: &str,
298 source_branch: Option<&str>,
299 table_key: &str,
300 source_version: u64,
301 target_branch: &str,
302 ) -> Result<ForkOutcome<Dataset>> {
303 let mut source_ds = self
304 .open_dataset_head(dataset_uri, source_branch)
305 .await?
306 .checkout_version(source_version)
307 .await
308 .map_err(|e| OmniError::Lance(e.to_string()))?;
309 self.ensure_expected_version(&source_ds, table_key, source_version)?;
310
311 if let Err(create_err) = source_ds
312 .create_branch(target_branch, source_version, None)
313 .await
314 {
315 // Disambiguate the failure: only a genuinely pre-existing ref is a
316 // reclaim candidate. Mapping EVERY create_branch failure to
317 // `RefAlreadyExists` would route a transient I/O / version / Lance
318 // internal error into the destructive reclaim path. So check whether
319 // the ref actually exists; if not, the failure is real — propagate
320 // it (preserving error fidelity) rather than force-deleting.
321 //
322 // `list_branches` reads `_refs/branches/` from the store, so it sees
323 // a fully-formed manifest-unreferenced fork (our common case — a
324 // create_branch that completed but whose manifest publish did not).
325 // It does NOT see a phase-1-only Lance "zombie" (tree dir written,
326 // no BranchContents) — but neither does `cleanup`'s reconciler, also
327 // list_branches-based. A zombie only forms if create_branch is
328 // interrupted *between its two internal phases* (a far narrower
329 // window than the manifest-publish gap), and it surfaces here as the
330 // propagated create error requiring manual reclaim. We deliberately
331 // do NOT force-delete on a not-found-ref failure: it is
332 // indistinguishable from a transient error on a fresh create, and
333 // force-deleting there is the destructive overreach this guard
334 // removes. The caller holds the per-(table, branch) write queue, so
335 // no in-process writer races this fork; a cross-process create
336 // between our check and now is the documented one-winner-CAS gap and
337 // propagates as a retryable error.
338 let ref_exists = source_ds
339 .list_branches()
340 .await
341 .map(|b| b.contains_key(target_branch))
342 .unwrap_or(false);
343 if ref_exists {
344 return Ok(ForkOutcome::RefAlreadyExists);
345 }
346 return Err(OmniError::Lance(create_err.to_string()));
347 }
348
349 let ds = self
350 .open_dataset_head(dataset_uri, Some(target_branch))
351 .await?;
352 self.ensure_expected_version(&ds, table_key, source_version)?;
353 Ok(ForkOutcome::Created(ds))
354 }
355
356 pub async fn scan_batches(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
357 self.scan(ds, None, None, None).await
358 }
359
360 pub async fn scan_batches_for_rewrite(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
361 let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
362 if !has_blob_columns {
363 return self.scan_batches(ds).await;
364 }
365
366 let batches = Self::scan_stream(ds, None, None, None, true)
367 .await?
368 .try_collect::<Vec<RecordBatch>>()
369 .await
370 .map_err(|e| OmniError::Lance(e.to_string()))?;
371 let mut materialized = Vec::with_capacity(batches.len());
372 for batch in batches {
373 materialized.push(Self::materialize_blob_batch(ds, batch).await?);
374 }
375 Ok(materialized)
376 }
377
378 /// Streaming, blob-aware sibling of [`Self::scan_batches_for_rewrite`].
379 /// Yields the dataset's rows lazily as a `SendableRecordBatchStream` so a
380 /// downstream writer (`stage_append_stream`) never materializes the whole
381 /// table in memory. Blob columns still need per-row rebuild, so those tables
382 /// fall back to the materialized path and are re-streamed from the `Vec`
383 /// (rare — only tables with a `Blob` property; bounded-memory blob streaming
384 /// is a follow-up). The non-blob path is a true lazy scan.
385 pub async fn scan_stream_for_rewrite(&self, ds: &Dataset) -> Result<SendableRecordBatchStream> {
386 let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
387 if has_blob_columns {
388 let arrow_schema: SchemaRef = Arc::new(ds.schema().into());
389 let batches = self.scan_batches_for_rewrite(ds).await?;
390 let reader = arrow_array::RecordBatchIterator::new(
391 batches.into_iter().map(Ok),
392 arrow_schema,
393 );
394 return Ok(lance_datafusion::utils::reader_to_stream(Box::new(reader)));
395 }
396 // Non-blob: a true lazy scan. `DatasetRecordBatchStream` converts to the
397 // `SendableRecordBatchStream` that `execute_uncommitted_stream` consumes.
398 Ok(Self::scan_stream(ds, None, None, None, false).await?.into())
399 }
400
401 pub(crate) async fn materialize_blob_batch(
402 ds: &Dataset,
403 batch: RecordBatch,
404 ) -> Result<RecordBatch> {
405 let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
406 if !has_blob_columns {
407 return Ok(batch);
408 }
409
410 let row_ids = batch
411 .column_by_name("_rowid")
412 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
413 .ok_or_else(|| {
414 OmniError::Lance("expected _rowid column when materializing blobs".to_string())
415 })?
416 .values()
417 .iter()
418 .copied()
419 .collect::<Vec<_>>();
420
421 let schema: SchemaRef = Arc::new(ds.schema().into());
422 let mut columns = Vec::with_capacity(schema.fields().len());
423 for field in schema.fields() {
424 let lance_field = lance::datatypes::Field::try_from(field.as_ref())
425 .map_err(|e| OmniError::Lance(e.to_string()))?;
426 let column = batch.column_by_name(field.name()).ok_or_else(|| {
427 OmniError::Lance(format!("batch missing column '{}'", field.name()))
428 })?;
429 if lance_field.is_blob() {
430 let descriptions =
431 column
432 .as_any()
433 .downcast_ref::<StructArray>()
434 .ok_or_else(|| {
435 OmniError::Lance(format!(
436 "expected blob descriptions for '{}'",
437 field.name()
438 ))
439 })?;
440 columns.push(
441 Self::rebuild_blob_column(ds, field.name(), descriptions, &row_ids).await?,
442 );
443 } else {
444 columns.push(column.clone());
445 }
446 }
447
448 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
449 }
450
451 async fn rebuild_blob_column(
452 ds: &Dataset,
453 column_name: &str,
454 descriptions: &StructArray,
455 row_ids: &[u64],
456 ) -> Result<ArrayRef> {
457 let mut builder = BlobArrayBuilder::new(row_ids.len());
458 let mut non_null_row_ids = Vec::new();
459 let mut row_has_blob = Vec::with_capacity(row_ids.len());
460
461 for row in 0..row_ids.len() {
462 let is_null = Self::blob_description_is_null(descriptions, row)?;
463 row_has_blob.push(!is_null);
464 if !is_null {
465 non_null_row_ids.push(row_ids[row]);
466 }
467 }
468
469 let blob_files = if non_null_row_ids.is_empty() {
470 Vec::new()
471 } else {
472 Arc::new(ds.clone())
473 .take_blobs(&non_null_row_ids, column_name)
474 .await
475 .map_err(|e| OmniError::Lance(e.to_string()))?
476 };
477
478 let mut files = blob_files.into_iter();
479 for has_blob in row_has_blob {
480 if !has_blob {
481 builder
482 .push_null()
483 .map_err(|e| OmniError::Lance(e.to_string()))?;
484 continue;
485 }
486
487 let blob = files.next().ok_or_else(|| {
488 OmniError::Lance(format!(
489 "blob rewrite for '{}' lost alignment with source rows",
490 column_name
491 ))
492 })?;
493 builder
494 .push_bytes(
495 blob.read()
496 .await
497 .map_err(|e| OmniError::Lance(e.to_string()))?,
498 )
499 .map_err(|e| OmniError::Lance(e.to_string()))?;
500 }
501
502 if files.next().is_some() {
503 return Err(OmniError::Lance(format!(
504 "blob rewrite for '{}' produced extra source blobs",
505 column_name
506 )));
507 }
508
509 builder
510 .finish()
511 .map_err(|e| OmniError::Lance(e.to_string()))
512 }
513
514 fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
515 if descriptions.is_null(row) {
516 return Ok(true);
517 }
518
519 let position = descriptions
520 .column_by_name("position")
521 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
522 .ok_or_else(|| {
523 OmniError::Lance(format!(
524 "unrecognized blob description schema {:?}: missing UInt64 position field",
525 descriptions.fields()
526 ))
527 })?;
528 let size = descriptions
529 .column_by_name("size")
530 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
531 .ok_or_else(|| {
532 OmniError::Lance(format!(
533 "unrecognized blob description schema {:?}: missing UInt64 size field",
534 descriptions.fields()
535 ))
536 })?;
537
538 let Some(kind_column) = descriptions.column_by_name("kind") else {
539 return Ok(position.is_null(row) || size.is_null(row));
540 };
541 let kind = if let Some(kind) = kind_column.as_any().downcast_ref::<UInt8Array>() {
542 if kind.is_null(row) {
543 return Ok(true);
544 }
545 kind.value(row)
546 } else if let Some(kind) = kind_column.as_any().downcast_ref::<UInt32Array>() {
547 if kind.is_null(row) {
548 return Ok(true);
549 }
550 kind.value(row) as u8
551 } else {
552 return Err(OmniError::Lance(format!(
553 "unrecognized blob description schema {:?}: kind field must be UInt8 or UInt32",
554 descriptions.fields()
555 )));
556 };
557
558 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
559 if kind != BlobKind::Inline {
560 return Ok(false);
561 }
562 let blob_uri = descriptions
563 .column_by_name("blob_uri")
564 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
565 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
566
567 Ok((position.is_null(row) || position.value(row) == 0)
568 && (size.is_null(row) || size.value(row) == 0)
569 && blob_uri.unwrap_or("").is_empty())
570 }
571
572 pub async fn scan_stream(
573 ds: &Dataset,
574 projection: Option<&[&str]>,
575 filter: Option<&str>,
576 order_by: Option<Vec<ColumnOrdering>>,
577 with_row_id: bool,
578 ) -> Result<DatasetRecordBatchStream> {
579 Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, |_| Ok(())).await
580 }
581
582 pub async fn scan_stream_with<F>(
583 ds: &Dataset,
584 projection: Option<&[&str]>,
585 filter: Option<&str>,
586 order_by: Option<Vec<ColumnOrdering>>,
587 with_row_id: bool,
588 configure: F,
589 ) -> Result<DatasetRecordBatchStream>
590 where
591 F: FnOnce(&mut Scanner) -> Result<()>,
592 {
593 let mut scanner = ds.scan();
594 if with_row_id {
595 scanner.with_row_id();
596 }
597 if let Some(columns) = projection {
598 scanner
599 .project(columns)
600 .map_err(|e| OmniError::Lance(e.to_string()))?;
601 }
602 if let Some(filter_sql) = filter {
603 scanner
604 .filter(filter_sql)
605 .map_err(|e| OmniError::Lance(e.to_string()))?;
606 }
607 if let Some(ordering) = order_by {
608 scanner
609 .order_by(Some(ordering))
610 .map_err(|e| OmniError::Lance(e.to_string()))?;
611 }
612 configure(&mut scanner)?;
613 scanner
614 .try_into_stream()
615 .await
616 .map_err(|e| OmniError::Lance(e.to_string()))
617 }
618
619 pub async fn scan(
620 &self,
621 ds: &Dataset,
622 projection: Option<&[&str]>,
623 filter: Option<&str>,
624 order_by: Option<Vec<ColumnOrdering>>,
625 ) -> Result<Vec<RecordBatch>> {
626 Self::scan_stream(ds, projection, filter, order_by, false)
627 .await?
628 .try_collect()
629 .await
630 .map_err(|e| OmniError::Lance(e.to_string()))
631 }
632
633 pub async fn scan_with<F>(
634 &self,
635 ds: &Dataset,
636 projection: Option<&[&str]>,
637 filter: Option<&str>,
638 order_by: Option<Vec<ColumnOrdering>>,
639 with_row_id: bool,
640 configure: F,
641 ) -> Result<Vec<RecordBatch>>
642 where
643 F: FnOnce(&mut Scanner) -> Result<()>,
644 {
645 Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, configure)
646 .await?
647 .try_collect()
648 .await
649 .map_err(|e| OmniError::Lance(e.to_string()))
650 }
651
652 /// Indexed neighbor lookup for graph traversal. Given an edge dataset and a
653 /// set of endpoint keys on `key_col` (`"src"` for out-traversal, `"dst"` for
654 /// in-traversal), return the matching edge rows projected to
655 /// `[key_col, opposite_col]`.
656 ///
657 /// The `key_col IN (keys)` predicate is built as a structured DataFusion
658 /// `Expr` and applied via `Scanner::filter_expr`, so Lance routes it through
659 /// the persisted BTREE on `key_col` (index-search → take). Cost scales with
660 /// the frontier size, not |E| — the basis for serving selective traversals
661 /// without building the whole in-memory CSR. Empty `keys` returns empty
662 /// without scanning.
663 ///
664 /// Note: like any indexed scan, this observes only fragments the BTREE
665 /// covers plus an unindexed-fragment scan fallback; it reads the committed
666 /// snapshot `ds` was opened at.
667 pub async fn scan_edges_by_endpoint(
668 ds: &Dataset,
669 key_col: &str,
670 opposite_col: &str,
671 keys: &[String],
672 ) -> Result<Vec<RecordBatch>> {
673 use datafusion::prelude::{col, lit};
674
675 if keys.is_empty() {
676 return Ok(Vec::new());
677 }
678 let key_list: Vec<datafusion::prelude::Expr> =
679 keys.iter().map(|k| lit(k.clone())).collect();
680 let filter_expr = col(key_col).in_list(key_list, false);
681 Self::scan_stream_with(
682 ds,
683 Some(&[key_col, opposite_col]),
684 None,
685 None,
686 false,
687 |scanner| {
688 scanner.filter_expr(filter_expr);
689 Ok(())
690 },
691 )
692 .await?
693 .try_collect()
694 .await
695 .map_err(|e| OmniError::Lance(e.to_string()))
696 }
697
698 /// Metadata-only check (no IO) of whether `scan_edges_by_endpoint` — a
699 /// `key_col IN (...)` filter — on `ds` will be served by the persisted BTREE
700 /// on `column`, or silently fall back to a full filtered scan. Mirrors
701 /// Lance's own decision: scalar indices are disabled for the whole scan if
702 /// ANY fragment lacks `physical_rows` (lance `dataset/scanner.rs`
703 /// `create_filter_plan`), and are obviously unused if no BTREE on the
704 /// column exists. The scan is correct (returns all rows) either way — this
705 /// only surfaces the perf cliff so the indexed traversal can warn on it.
706 pub async fn key_column_index_coverage(ds: &Dataset, column: &str) -> Result<IndexCoverage> {
707 let Some(field_id) = ds.schema().field(column).map(|field| field.id) else {
708 return Ok(IndexCoverage::Degraded {
709 reason: format!("column '{}' not in schema", column),
710 });
711 };
712 let indices = ds
713 .load_indices()
714 .await
715 .map_err(|e| OmniError::Lance(e.to_string()))?;
716 let btree = indices
717 .iter()
718 .filter(|index| !is_system_index(index))
719 .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
720 .find(|index| {
721 index
722 .index_details
723 .as_ref()
724 .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
725 .unwrap_or(false)
726 });
727 let Some(btree) = btree else {
728 return Ok(IndexCoverage::Degraded {
729 reason: format!("no BTREE index on '{}'", column),
730 });
731 };
732 // Same check Lance runs: a fragment missing physical_rows disables
733 // scalar indices for the entire scan (all-or-nothing).
734 if ds.fragments().iter().any(|f| f.physical_rows.is_none()) {
735 return Ok(IndexCoverage::Degraded {
736 reason: "a fragment is missing physical_rows".to_string(),
737 });
738 }
739 // An index only covers the fragments it was built over; fragments
740 // appended afterward (edge-index creation is skipped once a BTREE exists)
741 // are scanned unindexed. If any CURRENT fragment is absent from the
742 // index's `fragment_bitmap`, the scan is partly a full scan — so the
743 // chooser must not price it as fully indexed. A `None` bitmap means Lance
744 // can't report coverage; don't over-degrade in that case.
745 if let Some(bitmap) = btree.fragment_bitmap.as_ref() {
746 let uncovered = ds
747 .fragments()
748 .iter()
749 .filter(|f| !bitmap.contains(f.id as u32))
750 .count();
751 if uncovered > 0 {
752 return Ok(IndexCoverage::Degraded {
753 reason: format!(
754 "{} fragment(s) not covered by the index on '{}'",
755 uncovered, column
756 ),
757 });
758 }
759 }
760 Ok(IndexCoverage::Indexed)
761 }
762
763 /// True if any non-system index on `ds` leaves at least one current
764 /// fragment uncovered, i.e. rows that the index does not yet account for
765 /// (appended after the index was built, or rewritten by compaction). Such
766 /// fragments are scanned unindexed until a reindex (`optimize_indices`)
767 /// folds them in. Returns false when every index covers every fragment, or
768 /// when the table has no (non-system) indices to optimize. A `None`
769 /// `fragment_bitmap` means Lance cannot report coverage for that index, so
770 /// we do not treat it as uncovered (mirrors `key_column_index_coverage`).
771 ///
772 /// Used by `optimize` to decide whether an otherwise-already-compacted
773 /// table still has index work to do.
774 pub async fn has_unindexed_fragments(ds: &Dataset) -> Result<bool> {
775 let indices = ds
776 .load_indices()
777 .await
778 .map_err(|e| OmniError::Lance(e.to_string()))?;
779 let frag_ids: Vec<u32> = ds.fragments().iter().map(|f| f.id as u32).collect();
780 for index in indices.iter() {
781 if is_system_index(index) {
782 continue;
783 }
784 if let Some(bitmap) = index.fragment_bitmap.as_ref() {
785 if frag_ids.iter().any(|id| !bitmap.contains(*id)) {
786 return Ok(true);
787 }
788 }
789 }
790 Ok(false)
791 }
792
793 pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
794 ds.count_rows(filter)
795 .await
796 .map(|count| count as usize)
797 .map_err(|e| OmniError::Lance(e.to_string()))
798 }
799
800 pub fn dataset_version(&self, ds: &Dataset) -> u64 {
801 ds.version().version
802 }
803
804 pub async fn table_state(&self, dataset_uri: &str, ds: &Dataset) -> Result<TableState> {
805 Ok(TableState {
806 version: self.dataset_version(ds),
807 row_count: self.count_rows(ds, None).await? as u64,
808 version_metadata: self.dataset_version_metadata(dataset_uri, ds)?,
809 })
810 }
811
/// Legacy inline-commit append: writes fragments AND commits in one call,
/// advancing Lance HEAD as a side effect.
///
/// Not on the `TableStorage` trait surface; the staged primitive
/// `stage_append` + `commit_staged` is the engine write path. This inherent
/// method survives only for in-source recovery test setup, so it is
/// `#[cfg(test)]`-gated. Engine code therefore cannot call it, which enforces
/// "no new call sites" by construction and silences the dead-code warning the
/// non-test lib build would otherwise emit.
///
/// An empty `batch` writes nothing and just reports the current state.
#[cfg(test)]
pub(crate) async fn append_batch(
    &self,
    dataset_uri: &str,
    ds: &mut Dataset,
    batch: RecordBatch,
) -> Result<TableState> {
    if batch.num_rows() > 0 {
        let schema = batch.schema();
        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
        let append_params = WriteParams {
            mode: WriteMode::Append,
            allow_external_blob_outside_bases: true,
            auto_cleanup: None,
            skip_auto_cleanup: true,
            ..Default::default()
        };
        ds.append(reader, Some(append_params))
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
    }
    self.table_state(dataset_uri, ds).await
}
844
845 pub async fn append_or_create_batch(
846 dataset_uri: &str,
847 dataset: Option<Dataset>,
848 batch: RecordBatch,
849 ) -> Result<Dataset> {
850 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
851 match dataset {
852 Some(mut ds) => {
853 let params = WriteParams {
854 mode: WriteMode::Append,
855 allow_external_blob_outside_bases: true,
856 auto_cleanup: None,
857 skip_auto_cleanup: true,
858 ..Default::default()
859 };
860 ds.append(reader, Some(params))
861 .await
862 .map_err(|e| OmniError::Lance(e.to_string()))?;
863 Ok(ds)
864 }
865 None => {
866 let params = WriteParams {
867 mode: WriteMode::Create,
868 enable_stable_row_ids: true,
869 data_storage_version: Some(LanceFileVersion::V2_2),
870 allow_external_blob_outside_bases: true,
871 auto_cleanup: None,
872 skip_auto_cleanup: true,
873 ..Default::default()
874 };
875 Dataset::write(reader, dataset_uri, Some(params))
876 .await
877 .map_err(|e| OmniError::Lance(e.to_string()))
878 }
879 }
880 }
881
882 pub(crate) async fn delete_where(
883 &self,
884 dataset_uri: &str,
885 ds: &mut Dataset,
886 filter: &str,
887 ) -> Result<DeleteState> {
888 let delete_result = ds
889 .delete(filter)
890 .await
891 .map_err(|e| OmniError::Lance(e.to_string()))?;
892 Ok(DeleteState {
893 version: delete_result.new_dataset.version().version,
894 row_count: self.count_rows(&delete_result.new_dataset, None).await? as u64,
895 deleted_rows: delete_result.num_deleted_rows as usize,
896 version_metadata: self
897 .dataset_version_metadata(dataset_uri, &delete_result.new_dataset)?,
898 })
899 }
900
901 // ─── Staged-write API ────────────────────────────────────────────────────
902 //
903 // These primitives wrap Lance's distributed-write API: each call writes
904 // fragment files to object storage but does NOT advance the dataset's
905 // HEAD or commit a manifest entry. The returned `Transaction` is held by
906 // the caller (typically `MutationStaging` or the loader's accumulator)
907 // and committed at end-of-query via `commit_staged`. On failure the
908 // fragments remain unreferenced and are reclaimed by `cleanup_old_versions`.
909 //
910 // The extracted `Vec<Fragment>` is for read-your-writes within the same
911 // query: subsequent ops construct a `Scanner` and call
912 // `scanner.with_fragments(staged.clone())` to see staged data alongside
913 // the committed snapshot. Lance's filter pushdown, vector search, and
914 // FTS all respect the supplied fragment list.
915
916 /// Stage an append: write fragment files for `batch`, return the
917 /// uncommitted Lance transaction plus the new fragments for
918 /// read-your-writes.
919 ///
920 /// `prior_stages` is the slice of staged writes already accumulated
921 /// against the **same dataset** in the same query. Pass `&[]` for the
922 /// first call; pass the accumulated stages for subsequent calls. The
923 /// primitive uses this to offset row-ID assignment so chained
924 /// `stage_append` calls don't produce overlapping `_rowid` ranges.
925 /// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same
926 /// slice gets passed to both.
927 ///
928 /// On stable-row-id datasets we manually populate `row_id_meta` on
929 /// the cloned `new_fragments` we expose for `scan_with_staged`.
930 /// Lance's `InsertBuilder::execute_uncommitted` produces fragments
931 /// with `row_id_meta = None`; row IDs are normally assigned by
932 /// `Transaction::assign_row_ids` during commit. Because
933 /// `scan_with_staged` reads the staged fragments *before* commit,
934 /// the scanner trips on a stable-row-id dataset
935 /// (`Error::internal("Missing row id meta")` from
936 /// `dataset/rowids.rs:22`). The transaction's internal fragment copy
937 /// stays untouched — Lance assigns IDs there independently at commit
938 /// time, and the two ID assignments don't have to agree because no
939 /// caller threads `_rowid` from the staged scan into the commit
940 /// path.
941 ///
942 /// **Contract: `prior_stages` must contain only previous
943 /// `stage_append` results against the same dataset.** Mixing
944 /// stage_merge_insert into `prior_stages` would over-count because
945 /// merge_insert's `new_fragments` include rewrites that don't add
946 /// rows. The engine's parse-time D₂′ check (per touched table: all
947 /// stage_append OR exactly one stage_merge_insert) guarantees this
948 /// upstream; on the primitive layer it's the caller's responsibility.
949 pub async fn stage_append(
950 &self,
951 ds: &Dataset,
952 batch: RecordBatch,
953 prior_stages: &[StagedWrite],
954 ) -> Result<StagedWrite> {
955 if batch.num_rows() == 0 {
956 return Err(OmniError::manifest_internal(
957 "stage_append called with empty batch".to_string(),
958 ));
959 }
960 let appended_rows = batch.num_rows() as u64;
961 let params = WriteParams {
962 mode: WriteMode::Append,
963 allow_external_blob_outside_bases: true,
964 auto_cleanup: None,
965 skip_auto_cleanup: true,
966 ..Default::default()
967 };
968 let transaction = InsertBuilder::new(Arc::new(ds.clone()))
969 .with_params(¶ms)
970 .execute_uncommitted(vec![batch])
971 .await
972 .map_err(|e| OmniError::Lance(e.to_string()))?;
973 // Record only after the staging write succeeds, so a failed write does
974 // not inflate the probe (matches `stage_append_stream`'s ordering).
975 crate::instrumentation::record_stage_append(appended_rows);
976 let mut new_fragments = match &transaction.operation {
977 Operation::Append { fragments } => fragments.clone(),
978 Operation::Overwrite { fragments, .. } => fragments.clone(),
979 other => {
980 return Err(OmniError::manifest_internal(format!(
981 "stage_append: unexpected Lance operation {:?}",
982 std::mem::discriminant(other)
983 )));
984 }
985 };
986 // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted`
987 // returns fragments with `id = 0` ("Temporary ID" — see lance-6.0.1
988 // `dataset/write.rs:1044/1712`); the real assignment happens during
989 // commit via `Transaction::fragments_with_ids`. Because we expose
990 // these fragments to `scan_with_staged` *before* commit, two staged
991 // fragments (or one staged + the seed) would collide on `id = 0`,
992 // causing Lance's scanner to mishandle the combined list (silent
993 // duplicates / dropped rows). Mirror the commit-time renumbering
994 // here, using `ds.manifest.max_fragment_id() + 1` as the base and
995 // accounting for prior stages.
996 // ds.manifest.max_fragment_id is Option<u32>; cast up to u64 because
997 // Lance's Fragment::id (and the commit-time renumbering counter in
998 // Transaction::fragments_with_ids) operate on u64.
999 let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
1000 + 1
1001 + prior_stages_fragment_count(prior_stages);
1002 assign_fragment_ids(&mut new_fragments, next_id_base);
1003 if ds.manifest.uses_stable_row_ids() {
1004 let prior_rows = prior_stages_row_count(prior_stages)?;
1005 let start_row_id = ds.manifest.next_row_id + prior_rows;
1006 assign_row_id_meta(&mut new_fragments, start_row_id)?;
1007 }
1008 Ok(StagedWrite {
1009 transaction,
1010 new_fragments,
1011 // Append never supersedes existing fragments.
1012 removed_fragment_ids: Vec::new(),
1013 })
1014 }
1015
1016 /// Streaming variant of [`Self::stage_append`]: appends the rows of `source`
1017 /// into `ds` without materializing them in memory. It scans `source` lazily
1018 /// (`scan_stream_for_rewrite`) and hands the stream to Lance's
1019 /// `execute_uncommitted_stream`, which rolls fragments at `max_rows_per_file`
1020 /// — bounded memory, one Append transaction. This is the substrate-blessed
1021 /// bulk-append path (the same one LanceDB's `Table::add` uses). Identical
1022 /// fragment-id / stable-row-id staging as `stage_append`.
1023 ///
1024 /// TRANSITIONAL caller — its only caller is the row-level merge append
1025 /// (`publish_adopted_delta`, see `AdoptDelta`), which the fragment-adopt work
1026 /// (Lance #7263/#7185) removes: a fragment graft re-appends no rows. This
1027 /// primitive and `scan_stream_for_rewrite` are then dead unless re-adopted as
1028 /// a general bulk-append path (the `Table::add` shape makes that plausible).
1029 pub async fn stage_append_stream(
1030 &self,
1031 ds: &Dataset,
1032 source: &Dataset,
1033 prior_stages: &[StagedWrite],
1034 ) -> Result<StagedWrite> {
1035 let stream = self.scan_stream_for_rewrite(source).await?;
1036 let params = WriteParams {
1037 mode: WriteMode::Append,
1038 allow_external_blob_outside_bases: true,
1039 auto_cleanup: None,
1040 skip_auto_cleanup: true,
1041 ..Default::default()
1042 };
1043 let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1044 .with_params(¶ms)
1045 .execute_uncommitted_stream(stream)
1046 .await
1047 .map_err(|e| OmniError::Lance(e.to_string()))?;
1048 let mut new_fragments = match &transaction.operation {
1049 Operation::Append { fragments } => fragments.clone(),
1050 Operation::Overwrite { fragments, .. } => fragments.clone(),
1051 other => {
1052 return Err(OmniError::manifest_internal(format!(
1053 "stage_append_stream: unexpected Lance operation {:?}",
1054 std::mem::discriminant(other)
1055 )));
1056 }
1057 };
1058 let appended_rows: u64 = new_fragments
1059 .iter()
1060 .filter_map(|f| f.physical_rows)
1061 .map(|r| r as u64)
1062 .sum();
1063 crate::instrumentation::record_stage_append(appended_rows);
1064 // Same commit-time fragment-id / row-id renumbering as `stage_append`.
1065 let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
1066 + 1
1067 + prior_stages_fragment_count(prior_stages);
1068 assign_fragment_ids(&mut new_fragments, next_id_base);
1069 if ds.manifest.uses_stable_row_ids() {
1070 let prior_rows = prior_stages_row_count(prior_stages)?;
1071 let start_row_id = ds.manifest.next_row_id + prior_rows;
1072 assign_row_id_meta(&mut new_fragments, start_row_id)?;
1073 }
1074 Ok(StagedWrite {
1075 transaction,
1076 new_fragments,
1077 removed_fragment_ids: Vec::new(),
1078 })
1079 }
1080
1081 /// Stage a merge_insert (upsert): write fragment files describing the
1082 /// merge result, return the uncommitted transaction plus the new
1083 /// fragments. The transaction's `Operation::Update` carries the
1084 /// fragments-to-remove and fragments-to-add; for read-your-writes we
1085 /// expose `new_fragments` (rows that will be visible after commit).
1086 ///
1087 /// **Contract: do not chain `stage_merge_insert` calls on the same
1088 /// table within one query.** Each call's `MergeInsertBuilder` runs
1089 /// against the supplied dataset's committed view — it does not see
1090 /// fragments produced by a previous staged merge on the same table.
1091 /// Two chained `stage_merge_insert`s whose source rows share keys will
1092 /// each independently produce `Operation::Update` transactions whose
1093 /// `new_fragments` contain a row for the shared key. `scan_with_staged`
1094 /// (and `count_rows_with_staged`) will then return both — i.e.
1095 /// **duplicates by key**.
1096 ///
1097 /// This is intrinsic to the underlying Lance API: there is no public
1098 /// way to make `MergeInsertBuilder` see uncommitted fragments. The
1099 /// engine's `MutationStaging` accumulator works around this by
1100 /// concatenating per-table batches in memory and issuing exactly
1101 /// one `stage_merge_insert` per touched table at end-of-query (with
1102 /// last-write-wins dedupe by id) — see `exec/staging.rs`. Direct
1103 /// callers of this primitive must respect the contract themselves.
1104 ///
1105 /// Lift path: either a Lance API extension that lets
1106 /// `MergeInsertBuilder` accept additional staged fragments, or an
1107 /// in-memory pre-merge here that folds prior staged batches into the
1108 /// input stream. See `docs/dev/writes.md`.
1109 pub async fn stage_merge_insert(
1110 &self,
1111 ds: Dataset,
1112 batch: RecordBatch,
1113 key_columns: Vec<String>,
1114 when_matched: WhenMatched,
1115 when_not_matched: WhenNotMatched,
1116 ) -> Result<StagedWrite> {
1117 if batch.num_rows() == 0 {
1118 return Err(OmniError::manifest_internal(
1119 "stage_merge_insert called with empty batch".to_string(),
1120 ));
1121 }
1122 let merged_rows = batch.num_rows() as u64;
1123
1124 // Precondition for the FirstSeen workaround below: every call path that
1125 // reaches stage_merge_insert (load, MutationStaging::finalize,
1126 // branch_merge::publish_rewritten_merge_table) must hand in a source
1127 // batch that is unique by `key_columns`. Without this check,
1128 // `SourceDedupeBehavior::FirstSeen` would silently collapse genuine
1129 // duplicates instead of erroring.
1130 check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?;
1131
1132 let ds = Arc::new(ds);
1133 let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
1134 .map_err(|e| OmniError::Lance(e.to_string()))?;
1135 builder.when_matched(when_matched);
1136 builder.when_not_matched(when_not_matched);
1137 // Workaround for a Lance bug class where sequential merge_insert calls
1138 // against rows previously rewritten by merge_insert produce a spurious
1139 // "Ambiguous merge inserts: multiple source rows match the same target
1140 // row on (id = ...)" error. Lance's `processed_row_ids:
1141 // Mutex<HashSet<u64>>` (lance-6.0.1 `src/dataset/write/merge_insert.rs`)
1142 // double-processes the same source/target match against datasets
1143 // previously rewritten by merge_insert, and the default
1144 // `SourceDedupeBehavior::Fail` errors on the second insertion; FirstSeen
1145 // makes Lance skip the duplicate match instead. Correctness-preserving
1146 // because every call path pre-dedupes the source batch by id or surfaces
1147 // a real source dup via `check_batch_unique_by_keys` above (load:
1148 // `enforce_unique_constraints_intra_batch`; mutate:
1149 // `MutationStaging::finalize`; branch-merge: the `OrderedTableCursor`
1150 // walk in `exec/merge.rs`). Retire when upstream Lance fixes the bug
1151 // class. Tracked at MR-957; upstream: lance-format/lance#6877.
1152 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
1153 let job = builder
1154 .try_build()
1155 .map_err(|e| OmniError::Lance(e.to_string()))?;
1156 let schema = batch.schema();
1157 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
1158 let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
1159 let uncommitted = job
1160 .execute_uncommitted(stream)
1161 .await
1162 .map_err(|e| OmniError::Lance(e.to_string()))?;
1163 // Record only after the staging write succeeds, so a failed write does
1164 // not inflate the probe (matches `stage_append`/`stage_append_stream`).
1165 crate::instrumentation::record_stage_merge_insert(merged_rows);
1166 // Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } —
1167 // `new_fragments` are the freshly inserted rows; `updated_fragments`
1168 // are rewrites of existing fragments that include both retained and
1169 // updated rows; `removed_fragment_ids` lists the committed-manifest
1170 // fragments that those rewrites supersede. For read-your-writes we
1171 // expose `updated_fragments + new_fragments` and the
1172 // `removed_fragment_ids` so `scan_with_staged` can filter the
1173 // superseded committed fragments before combining — otherwise a
1174 // single merge_insert appears as duplicate rows (original committed
1175 // version + rewritten staged version).
1176 let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
1177 Operation::Update {
1178 new_fragments,
1179 updated_fragments,
1180 removed_fragment_ids,
1181 ..
1182 } => {
1183 let mut all = updated_fragments.clone();
1184 all.extend(new_fragments.iter().cloned());
1185 (all, removed_fragment_ids.clone())
1186 }
1187 Operation::Append { fragments } => (fragments.clone(), Vec::new()),
1188 other => {
1189 return Err(OmniError::manifest_internal(format!(
1190 "stage_merge_insert: unexpected Lance operation {:?}",
1191 std::mem::discriminant(other)
1192 )));
1193 }
1194 };
1195 Ok(StagedWrite {
1196 transaction: uncommitted.transaction,
1197 new_fragments,
1198 removed_fragment_ids,
1199 })
1200 }
1201
1202 /// Commit a previously-staged transaction onto `ds`, returning the new
1203 /// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by
1204 /// the publisher at end-of-query to materialize all staged writes before
1205 /// the meta-manifest commit.
1206 pub async fn commit_staged(
1207 &self,
1208 ds: Arc<Dataset>,
1209 transaction: Transaction,
1210 ) -> Result<Dataset> {
1211 // Skip Lance's auto-cleanup hook on every commit. OmniGraph owns version
1212 // GC explicitly (optimize.rs::cleanup_all_tables); Lance's hook fires off
1213 // the *dataset's stored* `lance.auto_cleanup.*` config, which graphs
1214 // created before the v7 bump (6.0.1 defaulted auto_cleanup ON) still
1215 // carry — so `WriteParams::auto_cleanup = None` alone does NOT stop it on
1216 // upgraded graphs. Skipping here covers the staged write path (the main
1217 // data path) for new and legacy datasets alike, preventing Lance from
1218 // GC'ing versions the __manifest still pins for snapshots/time-travel.
1219 CommitBuilder::new(ds)
1220 .with_skip_auto_cleanup(true)
1221 .execute(transaction)
1222 .await
1223 .map_err(|e| OmniError::Lance(e.to_string()))
1224 }
1225
1226 /// Stage an overwrite (write_fragments + Operation::Overwrite { schema, fragments }).
1227 /// Returns a StagedWrite carrying the replacement fragments. HEAD does
1228 /// NOT advance.
1229 ///
1230 /// Lance shape: `InsertBuilder::with_params(WriteParams { mode: Overwrite, .. })
1231 /// .execute_uncommitted(vec![batch])` produces a `Transaction` whose
1232 /// `Operation::Overwrite` carries the new schema + fragments. The
1233 /// transaction is committed via `commit_staged` (same call as
1234 /// `stage_append`).
1235 ///
1236 /// MR-793 Phase 2: introduces this for the schema_apply rewrite path.
1237 /// Lance API verified in `.context/mr-793-design.md` Appendix A.1.
1238 pub async fn stage_overwrite(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
1239 // `enable_stable_row_ids: true` is defensive — empirically Lance 6.0.1
1240 // preserves the source dataset's flag through `Operation::Overwrite`
1241 // when WriteParams omits it (pinned by
1242 // `stage_overwrite_preserves_stable_row_ids` in tests/staged_writes.rs),
1243 // but setting it explicitly keeps the invariant documented at every Overwrite site
1244 // (see docs/storage.md "Stable row IDs"). Setting it on an existing
1245 // dataset that was created without stable row IDs is a no-op per
1246 // Lance's row-id-lineage spec, so this stays correct for legacy
1247 // datasets.
1248 let (transaction, mut new_fragments) = if batch.num_rows() == 0 {
1249 let schema = LanceSchema::try_from(batch.schema().as_ref())
1250 .map_err(|e| OmniError::Lance(e.to_string()))?;
1251 let transaction = TransactionBuilder::new(
1252 ds.manifest.version,
1253 Operation::Overwrite {
1254 fragments: Vec::new(),
1255 schema,
1256 config_upsert_values: None,
1257 initial_bases: None,
1258 },
1259 )
1260 .build();
1261 (transaction, Vec::new())
1262 } else {
1263 let params = WriteParams {
1264 mode: WriteMode::Overwrite,
1265 enable_stable_row_ids: true,
1266 allow_external_blob_outside_bases: true,
1267 auto_cleanup: None,
1268 skip_auto_cleanup: true,
1269 ..Default::default()
1270 };
1271 let transaction = InsertBuilder::new(Arc::new(ds.clone()))
1272 .with_params(¶ms)
1273 .execute_uncommitted(vec![batch])
1274 .await
1275 .map_err(|e| OmniError::Lance(e.to_string()))?;
1276 let new_fragments = match &transaction.operation {
1277 Operation::Overwrite { fragments, .. } => fragments.clone(),
1278 other => {
1279 return Err(OmniError::manifest_internal(format!(
1280 "stage_overwrite: unexpected Lance operation {:?}",
1281 std::mem::discriminant(other)
1282 )));
1283 }
1284 };
1285 (transaction, new_fragments)
1286 };
1287 // Overwrite REPLACES every committed fragment, and Lance restarts
1288 // fragment-ID and row-ID counters at the post-commit version.
1289 // For our pre-commit staged view we need to:
1290 // 1) Renumber temporary fragment IDs (Lance returns them as
1291 // `id = 0` from `execute_uncommitted` — see stage_append
1292 // for the same fix). For Overwrite there are no committed
1293 // fragments to collide with (they're all in
1294 // removed_fragment_ids below), so start at 1.
1295 // 2) For stable-row-id datasets, assign row_id_meta starting
1296 // at 0 (Overwrite is a fresh-start) so `scan_with_staged`
1297 // doesn't hit the "Missing row id meta" panic in
1298 // lance-6.0.1 dataset/rowids.rs:22.
1299 assign_fragment_ids(&mut new_fragments, 1);
1300 if ds.manifest.uses_stable_row_ids() {
1301 assign_row_id_meta(&mut new_fragments, 0)?;
1302 }
1303 // Overwrite REPLACES every committed fragment. For
1304 // read-your-writes via scan_with_staged, list every committed
1305 // fragment in removed_fragment_ids so the post-stage view shows
1306 // ONLY the staged fragments.
1307 let removed_fragment_ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
1308 Ok(StagedWrite {
1309 transaction,
1310 new_fragments,
1311 removed_fragment_ids,
1312 })
1313 }
1314
1315 /// Stage a BTREE scalar index build. Returns a StagedWrite whose
1316 /// transaction commits via `commit_staged`. HEAD does NOT advance.
1317 ///
1318 /// Lance shape: `CreateIndexBuilder::execute_uncommitted` returns
1319 /// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex
1320 /// { new_indices, removed_indices }` via the public `TransactionBuilder`,
1321 /// replicating the simple (non-segment-commit-path) branch of Lance's
1322 /// `CreateIndexBuilder::execute` (lance-6.0.1 `src/index/create.rs:502-512`).
1323 ///
1324 /// `removed_indices` mirrors `execute()` lines 466-476: when the
1325 /// build replaces an existing same-named index, those entries are
1326 /// listed for tombstoning by the manifest commit.
1327 ///
1328 /// MR-793 Phase 2: scalar index types (BTree, Inverted) are
1329 /// stage-able. Vector indices are NOT (segment-commit-path requires
1330 /// `build_index_metadata_from_segments` which is `pub(crate)` in
1331 /// lance-6.0.1); see `create_vector_index` and Appendix A.3.
1332 pub async fn stage_create_btree_index(
1333 &self,
1334 ds: &Dataset,
1335 columns: &[&str],
1336 ) -> Result<StagedWrite> {
1337 let params = ScalarIndexParams::default();
1338 let mut ds_clone = ds.clone();
1339 let new_idx = ds_clone
1340 .create_index_builder(columns, IndexType::BTree, ¶ms)
1341 .replace(true)
1342 .execute_uncommitted()
1343 .await
1344 .map_err(|e| OmniError::Lance(format!("stage_create_btree_index: {}", e)))?;
1345 let removed_indices: Vec<IndexMetadata> = ds
1346 .load_indices()
1347 .await
1348 .map_err(|e| OmniError::Lance(e.to_string()))?
1349 .iter()
1350 .filter(|idx| idx.name == new_idx.name)
1351 .cloned()
1352 .collect();
1353 let transaction = TransactionBuilder::new(
1354 new_idx.dataset_version,
1355 Operation::CreateIndex {
1356 new_indices: vec![new_idx],
1357 removed_indices,
1358 },
1359 )
1360 .build();
1361 Ok(StagedWrite {
1362 transaction,
1363 new_fragments: Vec::new(),
1364 removed_fragment_ids: Vec::new(),
1365 })
1366 }
1367
1368 /// Stage an INVERTED (FTS) scalar index build. Same shape as
1369 /// `stage_create_btree_index`; see its docs for the Lance API
1370 /// citation and contract notes.
1371 pub async fn stage_create_inverted_index(
1372 &self,
1373 ds: &Dataset,
1374 column: &str,
1375 ) -> Result<StagedWrite> {
1376 let params = InvertedIndexParams::default();
1377 let mut ds_clone = ds.clone();
1378 let new_idx = ds_clone
1379 .create_index_builder(&[column], IndexType::Inverted, ¶ms)
1380 .replace(true)
1381 .execute_uncommitted()
1382 .await
1383 .map_err(|e| OmniError::Lance(format!("stage_create_inverted_index: {}", e)))?;
1384 let removed_indices: Vec<IndexMetadata> = ds
1385 .load_indices()
1386 .await
1387 .map_err(|e| OmniError::Lance(e.to_string()))?
1388 .iter()
1389 .filter(|idx| idx.name == new_idx.name)
1390 .cloned()
1391 .collect();
1392 let transaction = TransactionBuilder::new(
1393 new_idx.dataset_version,
1394 Operation::CreateIndex {
1395 new_indices: vec![new_idx],
1396 removed_indices,
1397 },
1398 )
1399 .build();
1400 Ok(StagedWrite {
1401 transaction,
1402 new_fragments: Vec::new(),
1403 removed_fragment_ids: Vec::new(),
1404 })
1405 }
1406
1407 /// Run a scan with optional uncommitted staged writes visible
1408 /// alongside the committed snapshot. When `staged` is empty this is
1409 /// identical to `scan(...)`.
1410 ///
1411 /// Composes the visible fragment list as `committed - removed + new`:
1412 /// the committed manifest's fragments, minus any fragment IDs that
1413 /// staged `Operation::Update`s (merge_insert rewrites) have superseded,
1414 /// plus the staged new/updated fragments. Without the `removed`
1415 /// filter, a merge_insert that rewrites an existing fragment would
1416 /// surface twice — once via the original committed fragment, once via
1417 /// the rewrite in `new_fragments`.
1418 ///
1419 /// **Filter contract is incomplete on staged fragments.** When `filter`
1420 /// is `Some(...)`, Lance pushes the predicate to per-fragment scans
1421 /// with stats-based pruning. Uncommitted fragments produced by
1422 /// `write_fragments_internal` lack the per-column statistics that
1423 /// committed fragments carry; Lance's optimizer drops them from the
1424 /// filtered scan even when their data would match. Staged-fragment
1425 /// rows are silently absent from the result. `scanner.use_stats(false)`
1426 /// does not fix this in lance 6.0.1. Callers needing correct filtered
1427 /// reads against staged data should use a different strategy — the
1428 /// engine's `MutationStaging` accumulator unions in-memory pending
1429 /// batches with the committed scan via DataFusion `MemTable` (see
1430 /// `scan_with_pending`).
1431 ///
1432 /// This method remains on the surface for primitive-level testing
1433 /// (basic stage + scan correctness without filters works) and for
1434 /// callers that don't need filter pushdown.
1435 pub async fn scan_with_staged(
1436 &self,
1437 ds: &Dataset,
1438 staged: &[StagedWrite],
1439 projection: Option<&[&str]>,
1440 filter: Option<&str>,
1441 ) -> Result<Vec<RecordBatch>> {
1442 if staged.is_empty() {
1443 return self.scan(ds, projection, filter, None).await;
1444 }
1445 let mut scanner = ds.scan();
1446 if let Some(cols) = projection {
1447 let owned: Vec<String> = cols.iter().map(|s| s.to_string()).collect();
1448 scanner
1449 .project(&owned)
1450 .map_err(|e| OmniError::Lance(e.to_string()))?;
1451 }
1452 if let Some(f) = filter {
1453 scanner
1454 .filter(f)
1455 .map_err(|e| OmniError::Lance(e.to_string()))?;
1456 }
1457 scanner.with_fragments(combine_committed_with_staged(ds, staged));
1458 let stream = scanner
1459 .try_into_stream()
1460 .await
1461 .map_err(|e| OmniError::Lance(e.to_string()))?;
1462 stream
1463 .try_collect()
1464 .await
1465 .map_err(|e| OmniError::Lance(e.to_string()))
1466 }
1467
1468 /// Scan committed via Lance + apply the same filter to in-memory
1469 /// pending batches via DataFusion `MemTable`, concat the two result
1470 /// streams. The replacement for `scan_with_staged` in engine code:
1471 /// the staged-write writer accumulates input batches in memory and
1472 /// unions them with the committed snapshot at read time,
1473 /// sidestepping the `Scanner::with_fragments` filter-pushdown
1474 /// limitation documented on `scan_with_staged`.
1475 ///
1476 /// `committed_ds` should be opened at the pre-mutation
1477 /// `expected_version` (the same version captured in `MutationStaging::expected_versions`
1478 /// at first touch of the table). `pending_batches` are the per-table
1479 /// accumulator's batches in their input shape. `pending_schema` is
1480 /// the schema of the accumulated batches; passing `None` falls back
1481 /// to the schema of the first pending batch.
1482 ///
1483 /// `filter` is the Lance / DataFusion SQL predicate. It is applied
1484 /// to both sides — Lance pushes it down on the committed side; the
1485 /// pending side runs it through a fresh DataFusion `SessionContext`
1486 /// with the batches registered as a `MemTable` named `pending`.
1487 ///
1488 /// `key_column` controls how committed and pending are unioned:
1489 /// - **`None` (union semantics)**: every committed row that matches
1490 /// the filter and every pending row that matches the filter is
1491 /// returned. Correct when committed and pending cannot share a
1492 /// primary key — e.g., Append-mode loads with ULID-generated ids,
1493 /// or any read where pending hasn't been used to update committed
1494 /// rows.
1495 /// - **`Some(col)` (merge / shadow semantics)**: committed rows whose
1496 /// `col` value appears in any pending batch are EXCLUDED from the
1497 /// result; only pending's view of those rows is returned. Required
1498 /// for Merge-mode reads (e.g., `execute_update` on the engine path)
1499 /// so a chained `update` doesn't see stale committed values that
1500 /// a prior op already updated in pending. Without this, a predicate
1501 /// like `where age > 30` can match a row that an earlier
1502 /// `set age = 20` already moved out of range.
1503 ///
1504 /// When `pending_batches` is empty this delegates to the regular
1505 /// scan path.
1506 pub async fn scan_with_pending(
1507 &self,
1508 committed_ds: &Dataset,
1509 pending_batches: &[RecordBatch],
1510 pending_schema: Option<SchemaRef>,
1511 projection: Option<&[&str]>,
1512 filter: Option<&str>,
1513 key_column: Option<&str>,
1514 ) -> Result<Vec<RecordBatch>> {
1515 // Contract: when merge-shadow semantics are requested via
1516 // `key_column`, the committed-side projection MUST include that
1517 // column so we can filter committed rows whose key appears in
1518 // pending. Silently dropping the shadow when projection omits
1519 // the key would re-introduce union semantics behind the
1520 // caller's back. Reject up front with a clear error so callers
1521 // either (a) include the key in projection or (b) drop
1522 // `key_column` if union is what they wanted.
1523 if let (Some(key_col), Some(cols)) = (key_column, projection) {
1524 if !cols.iter().any(|c| *c == key_col) {
1525 return Err(OmniError::Lance(format!(
1526 "scan_with_pending: key_column '{}' must appear in projection \
1527 when merge-shadow semantics are requested (got projection = {:?})",
1528 key_col, cols
1529 )));
1530 }
1531 }
1532
1533 let committed = self.scan(committed_ds, projection, filter, None).await?;
1534 if pending_batches.is_empty() {
1535 return Ok(committed);
1536 }
1537
1538 // Shadow committed rows whose key value also appears in pending.
1539 // This makes scan_with_pending implement merge semantics rather
1540 // than naive union: any row that has a pending update is
1541 // represented ONLY by its pending value, never by both its
1542 // (stale) committed value and its (current) pending value.
1543 let committed = match key_column {
1544 Some(key_col) => {
1545 let pending_keys = collect_string_column_values(pending_batches, key_col)?;
1546 if pending_keys.is_empty() {
1547 committed
1548 } else {
1549 filter_out_rows_where_string_in(committed, key_col, &pending_keys)?
1550 }
1551 }
1552 None => committed,
1553 };
1554
1555 let pending =
1556 scan_pending_batches(pending_batches, pending_schema, projection, filter).await?;
1557
1558 let mut out = committed;
1559 out.extend(pending);
1560 Ok(out)
1561 }
1562
1563 /// `count_rows` variant that respects staged writes. Used for
1564 /// edge-cardinality validation that needs to see staged edges before
1565 /// commit. Same `committed - removed + new` composition as
1566 /// `scan_with_staged`.
1567 pub async fn count_rows_with_staged(
1568 &self,
1569 ds: &Dataset,
1570 staged: &[StagedWrite],
1571 filter: Option<String>,
1572 ) -> Result<usize> {
1573 if staged.is_empty() {
1574 return self.count_rows(ds, filter).await;
1575 }
1576 let mut scanner = ds.scan();
1577 if let Some(f) = filter {
1578 scanner
1579 .filter(&f)
1580 .map_err(|e| OmniError::Lance(e.to_string()))?;
1581 }
1582 scanner.with_fragments(combine_committed_with_staged(ds, staged));
1583 let count = scanner
1584 .count_rows()
1585 .await
1586 .map_err(|e| OmniError::Lance(e.to_string()))?;
1587 Ok(count as usize)
1588 }
1589
1590 async fn user_indices_for_column(
1591 &self,
1592 ds: &Dataset,
1593 column: &str,
1594 ) -> Result<Vec<IndexMetadata>> {
1595 let field_id = ds
1596 .schema()
1597 .field(column)
1598 .map(|field| field.id)
1599 .ok_or_else(|| {
1600 OmniError::manifest_internal(format!(
1601 "dataset is missing expected index column '{}'",
1602 column
1603 ))
1604 })?;
1605 let indices = ds
1606 .load_indices()
1607 .await
1608 .map_err(|e| OmniError::Lance(e.to_string()))?;
1609 Ok(indices
1610 .iter()
1611 .filter(|index| !is_system_index(index))
1612 .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
1613 .cloned()
1614 .collect())
1615 }
1616
1617 pub async fn has_btree_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1618 let indices = self.user_indices_for_column(ds, column).await?;
1619 Ok(indices.iter().any(|index| {
1620 index
1621 .index_details
1622 .as_ref()
1623 .map(|details| details.type_url.ends_with("BTreeIndexDetails"))
1624 .unwrap_or(false)
1625 }))
1626 }
1627
1628 pub async fn has_fts_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1629 let indices = self.user_indices_for_column(ds, column).await?;
1630 Ok(indices.iter().any(|index| {
1631 index
1632 .index_details
1633 .as_ref()
1634 .map(|details| IndexDetails(details.clone()).supports_fts())
1635 .unwrap_or(false)
1636 }))
1637 }
1638
1639 pub async fn has_vector_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
1640 let indices = self.user_indices_for_column(ds, column).await?;
1641 Ok(indices.iter().any(|index| {
1642 index
1643 .index_details
1644 .as_ref()
1645 .map(|details| IndexDetails(details.clone()).is_vector())
1646 .unwrap_or(false)
1647 }))
1648 }
1649
1650 pub(crate) async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
1651 let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
1652 ds.create_index_builder(&[column], IndexType::Vector, ¶ms)
1653 .replace(true)
1654 .await
1655 .map_err(|e| OmniError::Lance(e.to_string()))?;
1656 // Record only after the index build succeeds, so a failed build does not
1657 // inflate the probe (matches the `stage_*` probes).
1658 crate::instrumentation::record_create_vector_index();
1659 Ok(())
1660 }
1661
1662 pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result<Dataset> {
1663 let batch = RecordBatch::new_empty(schema.clone());
1664 Self::write_dataset(dataset_uri, batch).await
1665 }
1666
1667 pub async fn first_row_id_for_filter(&self, ds: &Dataset, filter: &str) -> Result<Option<u64>> {
1668 let batches = Self::scan_stream(ds, Some(&["id"]), Some(filter), None, true)
1669 .await?
1670 .try_collect::<Vec<RecordBatch>>()
1671 .await
1672 .map_err(|e| OmniError::Lance(e.to_string()))?;
1673 Ok(batches.iter().find_map(|batch| {
1674 batch
1675 .column_by_name("_rowid")
1676 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1677 .and_then(|arr| (arr.len() > 0).then(|| arr.value(0)))
1678 }))
1679 }
1680
1681 pub async fn write_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
1682 let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
1683 let params = WriteParams {
1684 mode: WriteMode::Create,
1685 enable_stable_row_ids: true,
1686 data_storage_version: Some(LanceFileVersion::V2_2),
1687 allow_external_blob_outside_bases: true,
1688 auto_cleanup: None,
1689 skip_auto_cleanup: true,
1690 ..Default::default()
1691 };
1692 Dataset::write(reader, dataset_uri, Some(params))
1693 .await
1694 .map_err(|e| OmniError::Lance(e.to_string()))
1695 }
1696}
1697
1698/// Build the `Scanner::with_fragments` argument for read-your-writes:
1699/// committed manifest fragments minus any fragment IDs superseded by the
1700/// staged writes, plus the staged `new_fragments`. Order is:
1701/// 1. committed fragments whose IDs are NOT in any staged
1702/// `removed_fragment_ids` (preserves committed order),
1703/// 2. all staged `new_fragments` in stage order.
1704///
1705/// Lance's `Scanner` does not require any particular ordering between
1706/// committed and staged fragments — `with_fragments` scopes the scan to
1707/// exactly the supplied list. The dedup matters because merge_insert
1708/// rewrites a fragment in place at the Lance layer: the rewritten
1709/// fragment is in `new_fragments`, the original (which it supersedes) is
1710/// in `committed` until manifest commit, and including both would yield
1711/// duplicate rows.
1712///
1713/// **Inter-stage supersession is not handled here.** Each StagedWrite's
1714/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a
1715/// later staged merge cannot know about an earlier staged merge's
1716/// fragments (Lance's `MergeInsertBuilder` runs against the committed
1717/// view). If two `stage_merge_insert`s on the same table produce rows
1718/// with the same key, the combined view returns duplicates by key. The
1719/// engine's mutation path enforces "per touched table: all stage_append
1720/// OR exactly one stage_merge_insert" at parse time (D₂′ in
1721/// `exec/mutation.rs`) so this primitive's caller never chains merges.
1722/// See `stage_merge_insert` for the full contract.
1723/// Sum `physical_rows` across all fragments in the supplied stages.
1724/// Used by `stage_append` to compute the row-ID offset for chained
1725/// `stage_append` calls against the same dataset.
1726///
1727/// Assumes `prior_stages` contains only `stage_append` results — see
1728/// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the
1729/// `new_fragments` include rewrites that don't add new rows, so this
1730/// would over-count.
1731fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
1732 prior_stages
1733 .iter()
1734 .map(|s| s.new_fragments.len() as u64)
1735 .sum()
1736}
1737
1738/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's
1739/// commit-time `Transaction::fragments_with_ids` (lance-6.0.1
1740/// `dataset/transaction.rs:1456`) — fragments produced by
1741/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary
1742/// placeholder; we renumber here so they don't collide with committed
1743/// fragments (or with each other across chained stages) when the slice is
1744/// passed to `Scanner::with_fragments`.
1745fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) {
1746 for (i, fragment) in fragments.iter_mut().enumerate() {
1747 if fragment.id == 0 {
1748 fragment.id = start_id + i as u64;
1749 }
1750 }
1751}
1752
1753fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
1754 let mut total: u64 = 0;
1755 for stage in prior_stages {
1756 for fragment in &stage.new_fragments {
1757 let physical_rows = fragment.physical_rows.ok_or_else(|| {
1758 OmniError::manifest_internal(
1759 "prior_stages_row_count: fragment is missing physical_rows".to_string(),
1760 )
1761 })? as u64;
1762 total += physical_rows;
1763 }
1764 }
1765 Ok(total)
1766}
1767
1768/// Assign sequential row IDs to fragments that lack them, starting from
1769/// `start_row_id`. Mirrors the relevant arm of Lance's
1770/// `Transaction::assign_row_ids` (lance-6.0.1 `dataset/transaction.rs:2682`)
1771/// for the `row_id_meta = None` case — fragments produced by
1772/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset.
1773///
1774/// Used only by `stage_append` for read-your-writes — see its docstring
1775/// for why pre-commit assignment is needed and why diverging from Lance's
1776/// commit-time IDs is safe.
1777fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
1778 let mut next_row_id = start_row_id;
1779 for fragment in fragments {
1780 if fragment.row_id_meta.is_some() {
1781 continue;
1782 }
1783 let physical_rows = fragment.physical_rows.ok_or_else(|| {
1784 OmniError::manifest_internal(
1785 "stage_append: fragment is missing physical_rows".to_string(),
1786 )
1787 })? as u64;
1788 let row_ids = next_row_id..(next_row_id + physical_rows);
1789 let sequence = RowIdSequence::from(row_ids);
1790 let serialized = write_row_ids(&sequence);
1791 fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
1792 next_row_id += physical_rows;
1793 }
1794 Ok(())
1795}
1796
1797/// Collect the set of values in a Utf8 column across multiple batches.
1798/// Used by `scan_with_pending`'s merge-semantic path to identify
1799/// committed rows that are shadowed by pending writes. NULL values are
1800/// skipped.
1801fn collect_string_column_values(
1802 batches: &[RecordBatch],
1803 column: &str,
1804) -> Result<std::collections::HashSet<String>> {
1805 use arrow_array::{Array, StringArray};
1806 let mut out = std::collections::HashSet::new();
1807 for batch in batches {
1808 let Some(col) = batch.column_by_name(column) else {
1809 return Err(OmniError::Lance(format!(
1810 "scan_with_pending: pending batch missing key column '{}'",
1811 column
1812 )));
1813 };
1814 let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1815 OmniError::Lance(format!(
1816 "scan_with_pending: key column '{}' is not Utf8",
1817 column
1818 ))
1819 })?;
1820 for i in 0..arr.len() {
1821 if arr.is_valid(i) {
1822 out.insert(arr.value(i).to_string());
1823 }
1824 }
1825 }
1826 Ok(out)
1827}
1828
1829/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`.
1830/// Used by `scan_with_pending`'s merge-semantic path to shadow committed
1831/// rows that pending has already updated. Returns the surviving rows.
1832///
1833/// `scan_with_pending` validates up front that the projection contains
1834/// `column`, so a missing column here is a programmer error — error
1835/// loudly instead of silently passing batches through (which would
1836/// re-introduce the union semantics the caller asked us to avoid).
1837fn filter_out_rows_where_string_in(
1838 batches: Vec<RecordBatch>,
1839 column: &str,
1840 excluded: &std::collections::HashSet<String>,
1841) -> Result<Vec<RecordBatch>> {
1842 use arrow_array::{Array, BooleanArray, StringArray};
1843 let mut out = Vec::with_capacity(batches.len());
1844 for batch in batches {
1845 if batch.num_rows() == 0 {
1846 out.push(batch);
1847 continue;
1848 }
1849 let col = batch.column_by_name(column).ok_or_else(|| {
1850 OmniError::manifest_internal(format!(
1851 "scan_with_pending: committed batch missing key column '{}' \
1852 (the up-front projection check should have rejected this)",
1853 column
1854 ))
1855 })?;
1856 let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
1857 OmniError::Lance(format!(
1858 "scan_with_pending: committed column '{}' is not Utf8",
1859 column
1860 ))
1861 })?;
1862 let mask: BooleanArray = (0..arr.len())
1863 .map(|i| {
1864 if arr.is_valid(i) {
1865 Some(!excluded.contains(arr.value(i)))
1866 } else {
1867 Some(true)
1868 }
1869 })
1870 .collect();
1871 let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
1872 .map_err(|e| OmniError::Lance(e.to_string()))?;
1873 out.push(filtered);
1874 }
1875 Ok(out)
1876}
1877
1878/// Apply `projection` and `filter` to in-memory pending batches via a
1879/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for
1880/// the read-your-writes side of the in-memory staging accumulator.
1881///
1882/// `pending_batches` must be non-empty (the caller short-circuits on
1883/// empty).
1884///
1885/// **SQL dialect contract.** `filter` is also passed to Lance's scanner
1886/// on the committed side. Lance and DataFusion both accept standard
1887/// SQL comparison predicates (`col op literal`) and OmniGraph's
1888/// `predicate_to_sql` only emits those shapes today (`=`, `!=`, `>`,
1889/// `<`, `>=`, `<=`). If a future caller introduces a Lance-specific
1890/// scanner extension (vector search, FTS, `_rowid` references) into
1891/// the filter, this function will need explicit translation — DataFusion
1892/// won't recognize those operators against the in-memory `MemTable`.
1893async fn scan_pending_batches(
1894 pending_batches: &[RecordBatch],
1895 pending_schema: Option<SchemaRef>,
1896 projection: Option<&[&str]>,
1897 filter: Option<&str>,
1898) -> Result<Vec<RecordBatch>> {
1899 let schema = pending_schema.unwrap_or_else(|| pending_batches[0].schema());
1900 // #283: disable SQL identifier normalization so an unquoted camelCase
1901 // column in `filter` (e.g. `repoName = 'acme'`, emitted unquoted by
1902 // `predicate_to_sql` because the committed Lance scan needs it unquoted)
1903 // is matched case-preserving against the case-sensitive MemTable schema.
1904 // Without this, DataFusion lowercases `repoName` → `reponame` and fails to
1905 // resolve. Quoted identifiers (the projection list below) are unaffected.
1906 let mut config = datafusion::execution::context::SessionConfig::new();
1907 config.options_mut().sql_parser.enable_ident_normalization = false;
1908 let ctx = datafusion::execution::context::SessionContext::new_with_config(config);
1909 let mem = datafusion::datasource::MemTable::try_new(schema, vec![pending_batches.to_vec()])
1910 .map_err(|e| OmniError::Lance(e.to_string()))?;
1911 ctx.register_table("pending", Arc::new(mem))
1912 .map_err(|e| OmniError::Lance(e.to_string()))?;
1913
1914 let proj = projection
1915 .map(|cols| {
1916 cols.iter()
1917 .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
1918 .collect::<Vec<_>>()
1919 .join(", ")
1920 })
1921 .unwrap_or_else(|| "*".to_string());
1922 let where_clause = filter.map(|f| format!("WHERE {f}")).unwrap_or_default();
1923 let sql = format!("SELECT {proj} FROM pending {where_clause}");
1924 let df = ctx
1925 .sql(&sql)
1926 .await
1927 .map_err(|e| OmniError::Lance(e.to_string()))?;
1928 df.collect()
1929 .await
1930 .map_err(|e| OmniError::Lance(e.to_string()))
1931}
1932
1933fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
1934 let removed: std::collections::HashSet<u64> = staged
1935 .iter()
1936 .flat_map(|w| w.removed_fragment_ids.iter().copied())
1937 .collect();
1938 let mut combined: Vec<Fragment> = ds
1939 .manifest
1940 .fragments
1941 .iter()
1942 .filter(|f| !removed.contains(&f.id))
1943 .cloned()
1944 .collect();
1945 for write in staged {
1946 combined.extend(write.new_fragments.iter().cloned());
1947 }
1948 combined
1949}
1950
1951/// Precondition guard for `stage_merge_insert`.
1952/// Both opt into `SourceDedupeBehavior::FirstSeen` to suppress the Lance
1953/// `processed_row_ids` bug (MR-957). FirstSeen would *also* silently
1954/// collapse genuine duplicate source keys; this check restores fail-fast
1955/// behavior on real dups by erroring before the builder gets a chance to
1956/// silently skip them.
1957///
1958/// Today only single-column string keys are used at the call sites
1959/// (`vec!["id".to_string()]`). The check restricts itself to that shape
1960/// and surfaces an internal error if a future caller passes anything
1961/// else — keeping the assumption explicit instead of silently degrading.
1962fn check_batch_unique_by_keys(
1963 batch: &RecordBatch,
1964 key_columns: &[String],
1965 context: &'static str,
1966) -> Result<()> {
1967 if key_columns.len() != 1 {
1968 return Err(OmniError::manifest_internal(format!(
1969 "{}: check_batch_unique_by_keys currently supports single-column keys only, got {:?}",
1970 context, key_columns
1971 )));
1972 }
1973 let key_col_name = &key_columns[0];
1974 let column = batch.column_by_name(key_col_name).ok_or_else(|| {
1975 OmniError::manifest_internal(format!(
1976 "{}: source batch missing key column '{}'",
1977 context, key_col_name
1978 ))
1979 })?;
1980 let strs = column
1981 .as_any()
1982 .downcast_ref::<StringArray>()
1983 .ok_or_else(|| {
1984 OmniError::manifest_internal(format!(
1985 "{}: key column '{}' is not a StringArray (got {:?})",
1986 context,
1987 key_col_name,
1988 column.data_type()
1989 ))
1990 })?;
1991
1992 let mut seen: std::collections::HashSet<&str> =
1993 std::collections::HashSet::with_capacity(batch.num_rows());
1994 for i in 0..strs.len() {
1995 if !strs.is_valid(i) {
1996 continue;
1997 }
1998 let v = strs.value(i);
1999 if !seen.insert(v) {
2000 return Err(OmniError::manifest(format!(
2001 "{}: duplicate source row for key '{}' (column '{}'); \
2002 callers must hand in a batch unique by `key_columns` \
2003 — see MR-957",
2004 context, v, key_col_name
2005 )));
2006 }
2007 }
2008 Ok(())
2009}
2010
2011#[cfg(test)]
2012mod tests {
2013 use super::*;
2014 use arrow_array::StringArray;
2015 use arrow_schema::{DataType, Field, Schema};
2016
2017 fn batch_with_ids(ids: &[&str]) -> RecordBatch {
2018 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
2019 let col = Arc::new(StringArray::from(ids.to_vec())) as ArrayRef;
2020 RecordBatch::try_new(schema, vec![col]).unwrap()
2021 }
2022
2023 #[test]
2024 fn check_batch_unique_by_keys_passes_when_all_unique() {
2025 let batch = batch_with_ids(&["a", "b", "c"]);
2026 check_batch_unique_by_keys(&batch, &["id".to_string()], "test").unwrap();
2027 }
2028
2029 #[test]
2030 fn check_batch_unique_by_keys_errors_on_duplicate_id() {
2031 let batch = batch_with_ids(&["a", "b", "a"]);
2032 let err = check_batch_unique_by_keys(&batch, &["id".to_string()], "test").unwrap_err();
2033 let msg = err.to_string();
2034 assert!(
2035 msg.contains("duplicate source row for key 'a'"),
2036 "unexpected error: {msg}"
2037 );
2038 assert!(
2039 msg.contains("MR-957"),
2040 "error should reference MR-957: {msg}"
2041 );
2042 }
2043
2044 #[test]
2045 fn check_batch_unique_by_keys_rejects_multi_column_keys() {
2046 let batch = batch_with_ids(&["a"]);
2047 let err =
2048 check_batch_unique_by_keys(&batch, &["id".to_string(), "other".to_string()], "test")
2049 .unwrap_err();
2050 assert!(err.to_string().contains("single-column keys only"));
2051 }
2052}