use arrow_array::{Array, Int32Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::{WhenMatched, WhenNotMatched};
use lance_table::format::Fragment;
use omnigraph::table_store::{StagedWrite, TableStore};
use std::sync::Arc;
/// Schema shared by all person-table tests: a required `id` string column
/// plus a nullable `age` integer column.
fn person_schema() -> Arc<Schema> {
    let id = Field::new("id", DataType::Utf8, false);
    let age = Field::new("age", DataType::Int32, true);
    Arc::new(Schema::new(vec![id, age]))
}
fn person_batch(rows: &[(&str, Option<i32>)]) -> RecordBatch {
let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect();
let ages: Vec<Option<i32>> = rows.iter().map(|(_, age)| *age).collect();
RecordBatch::try_new(
person_schema(),
vec![
Arc::new(StringArray::from(ids)),
Arc::new(Int32Array::from(ages)),
],
)
.unwrap()
}
/// Extracts every value of the `id` column across `batches` and returns them
/// sorted, so assertions are independent of batch/fragment ordering.
fn collect_ids(batches: &[RecordBatch]) -> Vec<String> {
    let mut ids: Vec<String> = batches
        .iter()
        .flat_map(|batch| {
            let col = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            // Materialize per batch: the downcast borrow cannot outlive the closure.
            (0..col.len())
                .map(|i| col.value(i).to_string())
                .collect::<Vec<_>>()
        })
        .collect();
    ids.sort();
    ids
}
/// A staged (uncommitted) append is visible through `scan_with_staged` but
/// invisible to a plain scan of the committed dataset.
#[tokio::test]
async fn stage_append_is_visible_via_scan_with_staged() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);
    // Committed baseline: a single row.
    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();
    // Stage (but do not commit) a second row.
    let staged = store
        .stage_append(&ds, person_batch(&[("bob", Some(25))]), &[])
        .await
        .unwrap();
    // The staged-aware scan sees both rows...
    let with_staged = store
        .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
        .await
        .unwrap();
    assert_eq!(collect_ids(&with_staged), vec!["alice", "bob"]);
    // ...while the plain scan still sees only the committed one.
    let committed_only = store.scan_batches(&ds).await.unwrap();
    assert_eq!(collect_ids(&committed_only), vec!["alice"]);
}
/// A staged merge_insert that rewrites a committed row must record the
/// superseded committed fragment in `removed_fragment_ids`, so that the
/// staged-aware scan shadows it and exactly one row per key survives.
#[tokio::test]
async fn stage_merge_insert_dedupes_superseded_committed_fragment() {
    let dir = tempfile::tempdir().unwrap();
    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
    let store = TableStore::new(dir.path().to_str().unwrap());
    // Commit alice@30 so there is a committed fragment to rewrite.
    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();
    // Stage a merge_insert keyed on `id` that updates alice's age to 31.
    let staged = store
        .stage_merge_insert(
            ds.clone(),
            person_batch(&[("alice", Some(31))]),
            vec!["id".to_string()],
            WhenMatched::UpdateAll,
            WhenNotMatched::InsertAll,
        )
        .await
        .unwrap();
    assert!(
        !staged.removed_fragment_ids.is_empty(),
        "merge_insert that rewrites a committed row must set removed_fragment_ids \
         so the scan-with-staged composer can shadow the superseded committed \
         fragment — without it, the committed row and its rewrite both appear, \
         producing duplicates by key"
    );
    // The staged-aware scan must surface exactly one alice row.
    let batches = store
        .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
        .await
        .unwrap();
    let ids = collect_ids(&batches);
    assert_eq!(ids, vec!["alice"], "merge_insert must not surface duplicates");
    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total, 1);
    // And the surviving row carries the rewritten age, not the original 30.
    let ages: Vec<i32> = batches
        .iter()
        .flat_map(|b| {
            let col = b
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            (0..col.len()).map(|i| col.value(i)).collect::<Vec<_>>()
        })
        .collect();
    assert_eq!(ages, vec![31]);
}
/// `count_rows_with_staged` must account for committed and staged rows alike.
#[tokio::test]
async fn count_rows_with_staged_matches_scan() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);
    // One committed row plus two staged rows => three total.
    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();
    let staged_rows = person_batch(&[("bob", Some(25)), ("carol", Some(40))]);
    let staged = store.stage_append(&ds, staged_rows, &[]).await.unwrap();
    let total = store
        .count_rows_with_staged(&ds, std::slice::from_ref(&staged), None)
        .await
        .unwrap();
    assert_eq!(total, 3);
}
/// Chained staged appends must yield globally distinct row ids: the second
/// `stage_append` receives the first staged write via `prior_stages` and must
/// offset its fragment ids past it; otherwise `_rowid` values would collide.
#[tokio::test]
async fn chained_stage_appends_have_distinct_row_ids() {
    let dir = tempfile::tempdir().unwrap();
    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
    let store = TableStore::new(dir.path().to_str().unwrap());
    // One committed seed row.
    let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))]))
        .await
        .unwrap();
    // First staged append, with no prior stages.
    let s1 = store
        .stage_append(&ds, person_batch(&[("alice", Some(30))]), &[])
        .await
        .unwrap();
    // Second staged append, chained on top of the first via `prior_stages`.
    let s2 = store
        .stage_append(
            &ds,
            person_batch(&[("bob", Some(25))]),
            std::slice::from_ref(&s1),
        )
        .await
        .unwrap();
    let staged = vec![s1, s2];
    let batches = store
        .scan_with_staged(&ds, &staged, None, None)
        .await
        .unwrap();
    let ids = collect_ids(&batches);
    assert_eq!(ids, vec!["alice", "bob", "seed"]);
    // Re-scan with `_rowid` materialized over committed + staged fragments so
    // the row ids themselves can be inspected.
    let mut scanner = ds.scan();
    scanner.with_row_id();
    scanner.with_fragments(combine_for_scan(&ds, &staged));
    let stream = scanner.try_into_stream().await.unwrap();
    let projected: Vec<_> = stream.try_collect().await.unwrap();
    // Collect into a set: any collision would shrink its size below 3.
    let row_ids: std::collections::BTreeSet<u64> = projected
        .iter()
        .flat_map(|b| {
            let arr = b
                .column_by_name("_rowid")
                .unwrap()
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            (0..arr.len()).map(|i| arr.value(i)).collect::<Vec<_>>()
        })
        .collect();
    assert_eq!(
        row_ids.len(),
        3,
        "all 3 rows (1 committed + 2 staged) should have distinct _rowid; \
         overlap implies stage_append failed to offset by prior_stages"
    );
}
/// Mirrors the fragment composition done by `scan_with_staged`: committed
/// fragments minus those shadowed by any staged write, followed by every
/// staged fragment in stage order.
fn combine_for_scan(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
    let shadowed: std::collections::HashSet<u64> = staged
        .iter()
        .flat_map(|w| w.removed_fragment_ids.iter().copied())
        .collect();
    ds.manifest
        .fragments
        .iter()
        .filter(|f| !shadowed.contains(&f.id))
        .cloned()
        .chain(staged.iter().flat_map(|s| s.new_fragments.iter().cloned()))
        .collect()
}
/// Committing a staged append advances the dataset version and makes the
/// appended row durable for a fresh open of the table.
#[tokio::test]
async fn stage_append_then_commit_persists_data() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);
    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();
    let version_before = ds.version().version;
    // Stage one row, then commit the staged transaction.
    let staged = store
        .stage_append(&ds, person_batch(&[("bob", Some(25))]), &[])
        .await
        .unwrap();
    let committed = store
        .commit_staged(Arc::new(ds.clone()), staged.transaction)
        .await
        .unwrap();
    assert!(
        committed.version().version > version_before,
        "commit_staged must advance the dataset version"
    );
    // A brand-new open of the table sees both rows.
    let reopened = Dataset::open(&uri).await.unwrap();
    let batches = store.scan_batches(&reopened).await.unwrap();
    assert_eq!(collect_ids(&batches), vec!["alice", "bob"]);
}
/// Committing a staged merge_insert persists the merged view: the matched key
/// is updated in place, the unmatched key is inserted, and no duplicates leak.
#[tokio::test]
async fn stage_merge_insert_then_commit_persists_merged_view() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);
    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();
    // Stage an upsert touching both an existing key and a new one.
    let upsert = person_batch(&[("alice", Some(31)), ("bob", Some(25))]);
    let staged = store
        .stage_merge_insert(
            ds.clone(),
            upsert,
            vec!["id".to_string()],
            WhenMatched::UpdateAll,
            WhenNotMatched::InsertAll,
        )
        .await
        .unwrap();
    store
        .commit_staged(Arc::new(ds), staged.transaction)
        .await
        .unwrap();
    // Reopen from disk: both keys present, exactly two rows overall.
    let reopened = Dataset::open(&uri).await.unwrap();
    let batches = store.scan_batches(&reopened).await.unwrap();
    assert_eq!(collect_ids(&batches), vec!["alice", "bob"]);
    let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(row_count, 2, "merge_insert must not duplicate the matched row");
}
/// Documents a known limitation: passing a filter to `scan_with_staged`
/// silently drops rows from staged fragments (stats-based pruning excludes
/// them), whereas the unfiltered staged scan returns every row.
#[tokio::test]
async fn scan_with_staged_with_filter_silently_drops_staged_rows() {
    let dir = tempfile::tempdir().unwrap();
    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
    let store = TableStore::new(dir.path().to_str().unwrap());
    // Two committed rows; both satisfy the filter below.
    let ds = TableStore::write_dataset(
        &uri,
        person_batch(&[("alice", Some(30)), ("carol", Some(40))]),
    )
    .await
    .unwrap();
    // Two staged rows; "dave" (35) would also satisfy the filter but is
    // expected to be dropped by the limitation under test.
    let staged = store
        .stage_append(
            &ds,
            person_batch(&[("bob", Some(25)), ("dave", Some(35))]),
            &[],
        )
        .await
        .unwrap();
    let batches = store
        .scan_with_staged(
            &ds,
            std::slice::from_ref(&staged),
            None,
            Some("age >= 30"),
        )
        .await
        .unwrap();
    assert_eq!(
        collect_ids(&batches),
        vec!["alice", "carol"],
        "documented limitation: filter pushdown drops staged fragments. \
         If you're here because this assertion failed: either (a) Lance \
         exposed a way to scan uncommitted fragments without stats-based \
         pruning (good — update to assert == [alice, carol, dave]), or \
         (b) something changed in our scan_with_staged path. See PR #67 \
         test fix discussion + .context/mr-794-step2-design.md §1.1."
    );
    // Control: without a filter, all four rows (committed + staged) appear.
    let unfiltered = store
        .scan_with_staged(
            &ds,
            std::slice::from_ref(&staged),
            None,
            None,
        )
        .await
        .unwrap();
    assert_eq!(
        collect_ids(&unfiltered),
        vec!["alice", "bob", "carol", "dave"],
        "unfiltered scan_with_staged returns all rows correctly"
    );
}
/// Documents the primitive-level contract: two independently staged
/// merge_inserts on the same key are NOT deduped against each other by
/// `scan_with_staged` — both staged "alice" rows appear.
#[tokio::test]
async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior() {
    let dir = tempfile::tempdir().unwrap();
    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
    let store = TableStore::new(dir.path().to_str().unwrap());
    let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))]))
        .await
        .unwrap();
    // Two stages, each merge-inserting the SAME key against the same base
    // dataset (note: neither stage sees the other as a prior stage).
    let staged_1 = store
        .stage_merge_insert(
            ds.clone(),
            person_batch(&[("alice", Some(30))]),
            vec!["id".to_string()],
            WhenMatched::UpdateAll,
            WhenNotMatched::InsertAll,
        )
        .await
        .unwrap();
    let staged_2 = store
        .stage_merge_insert(
            ds.clone(),
            person_batch(&[("alice", Some(31))]),
            vec!["id".to_string()],
            WhenMatched::UpdateAll,
            WhenNotMatched::InsertAll,
        )
        .await
        .unwrap();
    let batches = store
        .scan_with_staged(&ds, &[staged_1, staged_2], None, None)
        .await
        .unwrap();
    let ids = collect_ids(&batches);
    let alice_count = ids.iter().filter(|id| *id == "alice").count();
    assert_eq!(
        alice_count, 2,
        "chained stage_merge_insert with shared key produces duplicates — \
         this is the contract documented on stage_merge_insert. If you're \
         here because this assertion failed: either (a) the primitive was \
         improved to dedupe across stages (good — update to assert == 1) \
         or (b) something subtler broke (investigate before changing the \
         assertion). The engine's MutationStaging accumulator dedupes by \
         id at finalize time so this primitive-level pitfall doesn't \
         surface in production paths — see exec/staging.rs."
    );
}
/// `stage_overwrite` is purely a staging operation: neither the in-memory
/// dataset nor the on-disk table advances until `commit_staged` runs.
#[tokio::test]
async fn stage_overwrite_does_not_advance_head_until_commit() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);
    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();
    let baseline = ds.version().version;
    let staged = store
        .stage_overwrite(&ds, person_batch(&[("zoe", Some(99))]))
        .await
        .unwrap();
    assert_eq!(
        ds.version().version,
        baseline,
        "stage_overwrite must not advance HEAD"
    );
    // The on-disk HEAD is untouched as well.
    let reopened = Dataset::open(&uri).await.unwrap();
    assert_eq!(reopened.version().version, baseline);
    // Committing makes the overwrite real: version bumps, only `zoe` remains.
    let committed = store
        .commit_staged(Arc::new(ds.clone()), staged.transaction)
        .await
        .unwrap();
    assert!(committed.version().version > baseline);
    let after = store.scan_batches(&committed).await.unwrap();
    assert_eq!(collect_ids(&after), vec!["zoe"]);
}
/// `stage_overwrite` must mark *every* committed fragment as removed so the
/// staged-aware scan shadows the entire pre-overwrite table, leaving only the
/// staged replacement rows visible.
#[tokio::test]
async fn stage_overwrite_replaces_all_fragments() {
    let dir = tempfile::tempdir().unwrap();
    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
    let store = TableStore::new(dir.path().to_str().unwrap());
    let ds = TableStore::write_dataset(
        &uri,
        person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
    )
    .await
    .unwrap();
    // Snapshot the committed fragment ids before staging the overwrite.
    let committed_fragment_ids: std::collections::HashSet<u64> =
        ds.manifest.fragments.iter().map(|f| f.id).collect();
    let staged = store
        .stage_overwrite(&ds, person_batch(&[("zoe", Some(99))]))
        .await
        .unwrap();
    // The staged write's removed set must equal the full committed set.
    let removed: std::collections::HashSet<u64> =
        staged.removed_fragment_ids.iter().copied().collect();
    assert_eq!(
        removed, committed_fragment_ids,
        "stage_overwrite must list every committed fragment as removed so \
         scan_with_staged shadows them all (overwrite semantics — pre-data \
         is being wiped)"
    );
    let batches = store
        .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
        .await
        .unwrap();
    assert_eq!(
        collect_ids(&batches),
        vec!["zoe"],
        "scan_with_staged must show only the staged row, not committed + staged"
    );
}
/// Staging a btree index creation leaves the in-memory dataset and the
/// on-disk table at the same version; the index becomes visible only after
/// `commit_staged`.
#[tokio::test]
async fn stage_create_btree_index_does_not_advance_head_until_commit() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);
    let ds = TableStore::write_dataset(
        &uri,
        person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
    )
    .await
    .unwrap();
    let baseline = ds.version().version;
    assert!(
        !store.has_btree_index(&ds, "id").await.unwrap(),
        "fresh dataset has no btree index on `id`"
    );
    let staged = store.stage_create_btree_index(&ds, &["id"]).await.unwrap();
    // Neither the in-memory handle nor the on-disk table moved.
    assert_eq!(
        ds.version().version,
        baseline,
        "stage_create_btree_index must not advance HEAD"
    );
    let reopened = Dataset::open(&uri).await.unwrap();
    assert_eq!(
        reopened.version().version,
        baseline,
        "no Lance commit happened on disk"
    );
    assert!(
        !store.has_btree_index(&reopened, "id").await.unwrap(),
        "index is not visible until commit_staged"
    );
    // Commit: version advances and the index appears.
    let committed = store
        .commit_staged(Arc::new(ds.clone()), staged.transaction)
        .await
        .unwrap();
    assert!(committed.version().version > baseline);
    assert!(
        store.has_btree_index(&committed, "id").await.unwrap(),
        "after commit_staged, the index IS visible"
    );
}
/// Staging an inverted (FTS) index creation does not advance HEAD; the index
/// is queryable only on the dataset returned by `commit_staged`.
#[tokio::test]
async fn stage_create_inverted_index_does_not_advance_head_until_commit() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);
    let ds = TableStore::write_dataset(
        &uri,
        person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
    )
    .await
    .unwrap();
    let baseline = ds.version().version;
    let staged = store
        .stage_create_inverted_index(&ds, "id")
        .await
        .unwrap();
    assert_eq!(
        ds.version().version,
        baseline,
        "stage_create_inverted_index must not advance HEAD"
    );
    assert!(!store.has_fts_index(&ds, "id").await.unwrap());
    // Commit: version advances and the FTS index appears.
    let committed = store
        .commit_staged(Arc::new(ds.clone()), staged.transaction)
        .await
        .unwrap();
    assert!(committed.version().version > baseline);
    assert!(
        store.has_fts_index(&committed, "id").await.unwrap(),
        "after commit_staged, the FTS index IS visible"
    );
}
/// Residual behavior: `delete_where` commits inline and advances Lance HEAD
/// immediately — there is no staged-delete primitive yet. The assertion
/// message records the migration plan for when one lands.
#[tokio::test]
async fn delete_where_advances_head_inline_documents_residual() {
    let dir = tempfile::tempdir().unwrap();
    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
    let store = TableStore::new(dir.path().to_str().unwrap());
    let mut ds = TableStore::write_dataset(
        &uri,
        person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
    )
    .await
    .unwrap();
    let pre_version = ds.version().version;
    // Delete one of the two committed rows by predicate.
    let result = store
        .delete_where(&uri, &mut ds, "id = 'alice'")
        .await
        .unwrap();
    assert_eq!(result.deleted_rows, 1);
    assert!(
        result.version > pre_version,
        "delete_where ADVANCES Lance HEAD inline (the residual). When \
         lance-format/lance#6658 ships and we migrate to stage_delete + \
         commit_staged, flip this assertion to assert that staging does \
         NOT advance HEAD."
    );
}
/// Residual behavior: `create_vector_index` commits inline and advances HEAD
/// — there is no staged variant yet. Builds a tiny 8-row, 4-dim float32
/// fixed-size-list column to index.
#[tokio::test]
async fn create_vector_index_advances_head_inline_documents_residual() {
    use arrow_array::FixedSizeListArray;
    use arrow_schema::FieldRef;
    let dir = tempfile::tempdir().unwrap();
    let uri = format!("{}/vec.lance", dir.path().to_str().unwrap());
    let store = TableStore::new(dir.path().to_str().unwrap());
    let dim = 4usize;
    let n_rows = 8usize;
    // Schema: non-null `id` string + non-null fixed-size-list `embedding`.
    let item_field: FieldRef = Arc::new(Field::new("item", DataType::Float32, true));
    let vec_field = Field::new(
        "embedding",
        DataType::FixedSizeList(item_field.clone(), dim as i32),
        false,
    );
    let id_field = Field::new("id", DataType::Utf8, false);
    let schema = Arc::new(Schema::new(vec![id_field, vec_field]));
    // Rows v0..v7 with embeddings [0,1,2,3], [4,5,6,7], ...
    let ids: Vec<String> = (0..n_rows).map(|i| format!("v{}", i)).collect();
    let id_arr = StringArray::from(ids);
    let flat: Vec<f32> = (0..(n_rows * dim)).map(|i| i as f32).collect();
    let values = arrow_array::Float32Array::from(flat);
    let vec_arr =
        FixedSizeListArray::new(item_field, dim as i32, Arc::new(values), None);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(id_arr), Arc::new(vec_arr)],
    )
    .unwrap();
    let mut ds = TableStore::write_dataset(&uri, batch).await.unwrap();
    let pre_version = ds.version().version;
    assert!(!store.has_vector_index(&ds, "embedding").await.unwrap());
    // Building the index mutates `ds` in place and commits immediately.
    store
        .create_vector_index(&mut ds, "embedding")
        .await
        .unwrap();
    assert!(
        ds.version().version > pre_version,
        "create_vector_index ADVANCES Lance HEAD inline (the residual). \
         When the upstream Lance helper `build_index_metadata_from_segments` \
         is made `pub`, add `stage_create_vector_index` to the trait and \
         flip this test to assert staging does NOT advance HEAD."
    );
    assert!(store.has_vector_index(&ds, "embedding").await.unwrap());
}