use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field, Schema};
use lance::Dataset;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::optimize::{CompactionOptions, compact_files};
use lance::dataset::transaction::Operation;
use lance::dataset::write::delete::DeleteResult;
use lance::dataset::{
CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
WriteParams,
};
use lance::index::DatasetIndexExt;
use lance_file::version::LanceFileVersion;
use lance_index::IndexType;
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::ScalarIndexParams;
use lance_namespace::LanceNamespace;
use lance_table::io::commit::ManifestNamingScheme;
/// Creates a brand-new two-row Lance dataset at `uri` and returns the open handle.
///
/// The table has a non-nullable Utf8 `id` column (`"alice"`, `"bob"`) and a
/// non-nullable Int32 `value` column (`1`, `2`). It is written with
/// `WriteMode::Create`, stable row ids enabled, and the data storage version
/// set to `LanceFileVersion::V2_2`.
///
/// # Panics
///
/// Panics if the record batch cannot be assembled or the write fails.
async fn fresh_dataset(uri: &str) -> Dataset {
    let id_field = Field::new("id", DataType::Utf8, false);
    let value_field = Field::new("value", DataType::Int32, false);
    let table_schema = Arc::new(Schema::new(vec![id_field, value_field]));

    let seed_rows = RecordBatch::try_new(
        Arc::clone(&table_schema),
        vec![
            Arc::new(StringArray::from(vec!["alice", "bob"])),
            Arc::new(Int32Array::from(vec![1, 2])),
        ],
    )
    .unwrap();

    let write_options = WriteParams {
        data_storage_version: Some(LanceFileVersion::V2_2),
        enable_stable_row_ids: true,
        mode: WriteMode::Create,
        ..Default::default()
    };
    let source = RecordBatchIterator::new(vec![Ok(seed_rows)], table_schema);
    Dataset::write(source, uri, Some(write_options))
        .await
        .unwrap()
}
/// Guards that `lance::Error` still exposes the `TooMuchWriteContention`
/// variant together with its `too_much_write_contention` constructor.
///
/// Nothing in this check awaits, so it runs as a plain `#[test]` rather than
/// starting an async runtime for a purely synchronous assertion.
#[test]
fn lance_error_too_much_write_contention_variant_exists() {
    let err = lance::Error::too_much_write_contention("guard");
    assert!(
        matches!(err, lance::Error::TooMuchWriteContention { .. }),
        "Lance::Error::TooMuchWriteContention variant missing or renamed; \
         update db/manifest/publisher.rs::map_lance_publish_error and \
         this guard, then re-pin docs/dev/lance.md."
    );
}
/// Guards the field names and types of the value returned by
/// `Dataset::manifest_location()`.
///
/// The explicit type ascriptions are the real assertions: the test stops
/// compiling if `path`, `size`, `e_tag` or `naming_scheme` change type or
/// disappear. The single runtime check only confirms the naming scheme has a
/// non-empty `Debug` rendering.
#[tokio::test]
async fn manifest_location_field_shape() {
    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("guard.lance");
    let dataset = fresh_dataset(table_path.to_str().unwrap()).await;
    let location = dataset.manifest_location();

    // Each binding pins one field's type; the values themselves are unused.
    let _scheme: ManifestNamingScheme = location.naming_scheme;
    let _e_tag: Option<String> = location.e_tag.clone();
    let _size: Option<u64> = location.size;
    let _path: &object_store::path::Path = &location.path;

    let rendered_scheme = format!("{:?}", location.naming_scheme);
    assert!(!rendered_scheme.is_empty());
}
/// Compile-only guard for the `Dataset::checkout_version` -> `Dataset::restore` chain.
///
/// The body starts with `unimplemented!()`, so it is only useful as a type
/// check; nothing in the visible file calls it. The explicit annotations are
/// the assertions: `checkout_version(1u64)` must await to a `Dataset`, and
/// `restore()` must await to `()`.
// The lint allowances cover the deliberately unreachable, never-called body.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_checkout_version_then_restore_signature() -> lance::Result<()> {
// Placeholder value: only its static type matters.
let ds: Dataset = unimplemented!();
// Pins: takes a `u64` version and yields an owned `Dataset`.
let mut ds: Dataset = ds.checkout_version(1u64).await?;
// Pins: `restore` is callable on a mutable binding and yields unit on success.
let _: () = ds.restore().await?;
Ok(())
}
/// Compile-only guard for the `DatasetBuilder` namespace-based open path.
///
/// Type-checks that `DatasetBuilder::from_namespace` accepts an
/// `Arc<dyn LanceNamespace>` plus a `Vec<String>` table id and awaits to a
/// `DatasetBuilder`, that `with_branch` and `with_version` each return a
/// `DatasetBuilder`, and that `load()` awaits to a `Dataset`. Nothing in the
/// visible file calls this function.
// The lint allowances cover the never-called, compile-only body.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_dataset_builder_from_namespace_signature(
ns: Arc<dyn LanceNamespace>,
) -> lance::Result<()> {
// Pins: async constructor returning the builder itself.
let builder: DatasetBuilder =
DatasetBuilder::from_namespace(ns, vec!["table".to_string()]).await?;
// Pins: branch selection takes a name plus an optional second argument.
let builder: DatasetBuilder = builder.with_branch("b", None);
// Pins: version selection accepts a `u64`.
let builder: DatasetBuilder = builder.with_version(1u64);
let _ds: Dataset = builder.load().await?;
Ok(())
}
/// Compile-only guard for the `MergeInsertBuilder` method chain.
///
/// Type-checks `try_new(Arc<Dataset>, Vec<String>)`, the `when_matched`,
/// `when_not_matched`, `conflict_retries`, `use_index` and `try_build` calls,
/// and that `execute_reader` awaits to `(Arc<Dataset>, MergeStats)`. The inputs
/// are `unimplemented!()` placeholders and nothing in the visible file calls
/// this function.
// The lint allowances cover the deliberately unreachable, never-called body.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_merge_insert_builder_method_chain() -> lance::Result<()> {
use lance::dataset::MergeStats;
// Placeholder value: only its static type matters.
let ds: Arc<Dataset> = unimplemented!();
// Pins every builder method used here, in this order, with these argument types.
let job = MergeInsertBuilder::try_new(ds, vec!["object_id".to_string()])?
.when_matched(WhenMatched::UpdateAll)
.when_not_matched(WhenNotMatched::InsertAll)
.conflict_retries(0)
.use_index(false)
.try_build()?;
// Pins: a `RecordBatchIterator` over a `Vec` of batch results is an accepted source.
let source: RecordBatchIterator<Vec<Result<RecordBatch, arrow_schema::ArrowError>>> =
unimplemented!();
// Pins the shape of the result tuple.
let result: (Arc<Dataset>, MergeStats) = job.execute_reader(source).await?;
let _ds: Arc<Dataset> = result.0;
let _stats: MergeStats = result.1;
Ok(())
}
/// Guards that `WriteParams::default()` leaves `data_storage_version` unset.
///
/// If the default ever becomes `Some(..)`, the failure message points at the
/// explicit `LanceFileVersion::V2_2` pins that should be re-audited.
#[test]
fn write_params_default_does_not_set_storage_version() {
    let defaults = WriteParams::default();
    let default_version = &defaults.data_storage_version;
    assert_eq!(
        *default_version, None,
        "WriteParams::default().data_storage_version is no longer None; \
         audit every explicit V2_2 pin (see rg 'LanceFileVersion::V2_2')."
    );
}
/// Compile-only guard for the `compact_files` call signature.
///
/// Type-checks that `compact_files` accepts `&mut Dataset`, a
/// `CompactionOptions` by value and a third optional argument (passed as
/// `None`), and that awaiting it yields a `lance::Result`. The dataset is an
/// `unimplemented!()` placeholder and nothing in the visible file calls this
/// function.
// The lint allowances cover the deliberately unreachable, never-called body.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_compact_files_signature() -> lance::Result<()> {
// Placeholder value: only its static type matters.
let mut ds: Dataset = unimplemented!();
// Pins: `CompactionOptions` still implements `Default`.
let options: CompactionOptions = CompactionOptions::default();
// The metrics type is intentionally left unpinned.
let _metrics = compact_files(&mut ds, options, None).await?;
Ok(())
}
/// Compile-only guard for reading a transaction by version and inspecting its operation.
///
/// Type-checks that `Dataset::read_transaction_by_version(u64)` awaits to an
/// `Option` of a value with an `operation` field, that the operation exposes
/// `name() -> &str`, and that the `Operation::Rewrite` and
/// `Operation::ReserveFragments` struct-like variants exist. The dataset is an
/// `unimplemented!()` placeholder and nothing in the visible file calls this
/// function.
// The lint allowances cover the deliberately unreachable, never-called body.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_transaction_history_for_repair_signature() -> lance::Result<()> {
// Placeholder value: only its static type matters.
let ds: Dataset = unimplemented!();
let tx = ds.read_transaction_by_version(1u64).await?;
// Pins: the lookup is optional (a version may have no transaction).
if let Some(tx) = tx {
let operation = tx.operation;
let _name: &str = operation.name();
// Pins the two variant names; the wildcard arm keeps the match total.
match operation {
Operation::Rewrite { .. } | Operation::ReserveFragments { .. } => {}
_ => {}
}
}
Ok(())
}
/// Compile-only guard for the shape of `DeleteResult`.
///
/// Type-checks that `Dataset::delete(&str)` awaits to a `DeleteResult` whose
/// `new_dataset` field is an `Arc<Dataset>` and whose `num_deleted_rows` field
/// is a `u64`. The dataset is an `unimplemented!()` placeholder and nothing in
/// the visible file calls this function.
// The lint allowances cover the deliberately unreachable, never-called body.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_delete_result_field_shape() -> lance::Result<()> {
// Placeholder value: only its static type matters.
let mut ds: Dataset = unimplemented!();
// Pins: the predicate is passed as a string slice.
let result: DeleteResult = ds.delete("x = 1").await?;
let _new_dataset: Arc<Dataset> = result.new_dataset;
let _num_deleted: u64 = result.num_deleted_rows;
Ok(())
}
/// Pins the error behaviour of `delete_branch` and `force_delete_branch`.
///
/// Three expectations are checked in order:
/// 1. `delete_branch` on a branch that does not exist returns an error;
/// 2. `force_delete_branch` removes a branch that was just created;
/// 3. `force_delete_branch` on a branch that never existed returns an error.
#[tokio::test]
async fn force_delete_branch_semantics() {
    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("guard9.lance");
    let mut dataset = fresh_dataset(table_path.to_str().unwrap()).await;

    // 1. Plain delete of a missing ref.
    let missing_delete = dataset.delete_branch("nope").await;
    assert!(
        missing_delete.is_err(),
        "Dataset::delete_branch on a missing ref should error; if this is now \
         Ok, the reconciler could drop the force variant."
    );

    // 2. Force delete of a branch created from the current version.
    let base_version = dataset.version().version;
    dataset
        .create_branch("feature", base_version, None)
        .await
        .unwrap();
    dataset.force_delete_branch("feature").await.unwrap();
    let remaining = dataset.list_branches().await.unwrap();
    assert!(
        !remaining.contains_key("feature"),
        "force_delete_branch should remove an existing branch ref"
    );

    // 3. Force delete of a branch that never existed.
    let absent_delete = dataset.force_delete_branch("never").await;
    assert!(
        absent_delete.is_err(),
        "force_delete_branch on a fully-absent branch no longer errors — \
         TableStore::force_delete_branch's NotFound tolerance can be simplified."
    );
}
/// Pins the current failure of `compact_files` on a table containing a blob column.
///
/// Two writes (create + append) produce a multi-fragment table with a
/// `lance::blob::blob_field` column; compaction is then expected to fail with
/// a specific error message. If compaction succeeds, the `expect_err` message
/// explains which downstream workaround should be removed.
#[tokio::test]
async fn compact_files_still_fails_on_blob_columns() {
    use arrow_array::{LargeBinaryArray, StructArray};

    /// Builds `n` rows with ids `n{start}..` and a struct-typed `content`
    /// blob column carrying inline bytes `blob{i}` and a null second child.
    fn blob_batch(start: i32, n: i32) -> RecordBatch {
        let row_range = start..start + n;
        let content_field = lance::blob::blob_field("content", true);
        let fields = match content_field.data_type().clone() {
            DataType::Struct(fields) => fields,
            _ => unreachable!("blob_field is always a Struct"),
        };

        let payloads =
            LargeBinaryArray::from_iter_values(row_range.clone().map(|i| format!("blob{i}")));
        let uris = StringArray::from(vec![None::<&str>; n as usize]);
        let content = StructArray::new(
            fields,
            vec![Arc::new(payloads) as _, Arc::new(uris) as _],
            None,
        );

        let ids: Vec<String> = row_range.map(|i| format!("n{i}")).collect();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            content_field,
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(ids)) as _,
                Arc::new(content) as _,
            ],
        )
        .unwrap()
    }

    /// Writes one batch to `uri` in the given mode using the file's usual
    /// parameters (stable row ids, V2_2 storage).
    async fn write_blobs(uri: &str, batch: RecordBatch, mode: WriteMode) {
        let options = WriteParams {
            data_storage_version: Some(LanceFileVersion::V2_2),
            enable_stable_row_ids: true,
            mode,
            ..Default::default()
        };
        let batch_schema = batch.schema();
        let source = RecordBatchIterator::new(vec![Ok(batch)], batch_schema);
        Dataset::write(source, uri, Some(options)).await.unwrap();
    }

    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("guard10-blob.lance");
    let table_uri = table_path.to_str().unwrap();

    // Two separate writes so the table has more than one fragment.
    write_blobs(table_uri, blob_batch(0, 2), WriteMode::Create).await;
    write_blobs(table_uri, blob_batch(100, 2), WriteMode::Append).await;

    let mut dataset = Dataset::open(table_uri).await.unwrap();
    let fragment_count = dataset.get_fragments().len();
    assert!(
        fragment_count >= 2,
        "guard needs a multi-fragment table to trigger a real compaction rewrite"
    );

    let err = compact_files(&mut dataset, CompactionOptions::default(), None)
        .await
        .expect_err(
            "compact_files unexpectedly SUCCEEDED on a blob table — the Lance blob-v2 \
             compaction bug is fixed. Flip LANCE_SUPPORTS_BLOB_COMPACTION to true in \
             db/omnigraph/optimize.rs, remove the blob-skip branch, and re-pin docs/dev/lance.md.",
        );
    let rendered = err.to_string();
    assert!(
        rendered.contains("more fields in the schema than provided column indices"),
        "blob compaction failed with an unexpected error (Lance internals may have \
         shifted): {err}"
    );
}
/// Compile-only guard for the fragment and index metadata fields read when
/// computing scalar-index coverage.
///
/// Type-checks that fragments expose `physical_rows: Option<usize>` and
/// `id: u64`, and that each entry from `load_indices()` exposes
/// `fields: Vec<i32>`, an optional `index_details` with a string `type_url`,
/// and an optional `fragment_bitmap` with `contains(u32)`. The dataset is an
/// `unimplemented!()` placeholder and nothing in the visible file calls this
/// function.
// The lint allowances cover the deliberately unreachable, never-called body.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_scalar_index_coverage_surface() -> lance::Result<()> {
// Placeholder value: only its static type matters.
let ds: Dataset = unimplemented!();
for frag in ds.fragments().iter() {
let _physical_rows: Option<usize> = frag.physical_rows;
let _id: u64 = frag.id;
}
let indices = ds.load_indices().await?;
for index in indices.iter() {
let _fields: &Vec<i32> = &index.fields;
// Pins: index details are optional and carry a `type_url` string.
if let Some(details) = index.index_details.as_ref() {
let _type_url: &str = details.type_url.as_str();
}
// Pins: the bitmap is optional and is probed with a `u32` fragment id.
let _covered: Option<bool> = index.fragment_bitmap.as_ref().map(|b| b.contains(0u32));
}
Ok(())
}
/// Probes whether a BTree scalar index can be built on the system column
/// `_row_last_updated_at_version`.
///
/// First confirms the column is absent from the user-visible schema, then
/// expects index creation on it to fail. A success means the deferred design
/// notes referenced in the message should be revisited.
#[tokio::test]
async fn scalar_index_on_system_version_column_probe() {
    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("guard12.lance");
    let mut dataset = fresh_dataset(table_path.to_str().unwrap()).await;

    let in_user_schema = dataset
        .schema()
        .field("_row_last_updated_at_version")
        .is_some();
    assert!(
        !in_user_schema,
        "PROBE NOTE: `_row_last_updated_at_version` is NOT in the user schema \
         (it is system metadata); indexing it resolves through a different path."
    );

    let index_columns = ["_row_last_updated_at_version"];
    let index_params = ScalarIndexParams::default();
    let build_outcome = dataset
        .create_index_builder(&index_columns, IndexType::BTree, &index_params)
        .replace(true)
        .await;
    assert!(
        build_outcome.is_err(),
        "create_index on `_row_last_updated_at_version` unexpectedly SUCCEEDED — \
         a system-column scalar index is now buildable; the persisted-artifact \
         delta read could use it. Update the deferred-design notes."
    );
}
/// Pins that a delete leaves a per-fragment `deletion_file` whose
/// `num_deleted_rows` metadata is populated.
///
/// Deletes one of the two seed rows, then checks that some fragment of the
/// resulting dataset carries a deletion file reporting exactly one deleted row.
#[tokio::test]
async fn fragment_deletion_metadata_is_available() {
    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("guard13.lance");

    let mut before = fresh_dataset(table_path.to_str().unwrap()).await;
    let deleted: DeleteResult = before.delete("id = 'alice'").await.unwrap();
    // The pre-delete handle is no longer needed; only the returned dataset is inspected.
    drop(before);
    assert_eq!(deleted.num_deleted_rows, 1, "one row deleted");

    let after = deleted.new_dataset;
    let deletion_file = after
        .fragments()
        .iter()
        .find_map(|fragment| fragment.deletion_file.as_ref())
        .expect(
            "after a delete, some fragment must carry a deletion_file — if not, \
             Lance changed deletion tracking; the artifact coverage model's \
             cheap delete-detection assumption is invalid.",
        );

    let count: Option<usize> = deletion_file.num_deleted_rows;
    assert_eq!(
        count,
        Some(1),
        "PROBE: deletion_file.num_deleted_rows is not a populated metadata count \
         (got {count:?}); the artifact coverage model cannot cheaply detect \
         per-fragment deletions and would need to read the deletion vector.",
    );
}
/// Compile-only guard for the `optimize_indices` call signature.
///
/// Type-checks that `optimize_indices` is callable on a mutable `Dataset` with
/// a borrowed `OptimizeOptions` and awaits to `()` on success. The dataset is
/// an `unimplemented!()` placeholder and nothing in the visible file calls
/// this function.
// The lint allowances cover the deliberately unreachable, never-called body.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_optimize_indices_signature() -> lance::Result<()> {
// Placeholder value: only its static type matters.
let mut ds: Dataset = unimplemented!();
// Pins: `OptimizeOptions` still implements `Default`.
let options = OptimizeOptions::default();
// Pins: options are passed by reference and the success value is unit.
let _: () = ds.optimize_indices(&options).await?;
Ok(())
}
/// Pins that `optimize_indices` brings a newly appended fragment under an
/// existing BTree index on `value`.
///
/// Flow: build the index on the seed table, append one row (creating a
/// fragment the index does not cover), confirm it is uncovered, run
/// `optimize_indices` with default options, then confirm nothing is uncovered.
#[tokio::test]
async fn optimize_indices_extends_fragment_coverage() {
    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("guard_optimize_indices.lance");
    let table_uri = table_path.to_str().unwrap();

    // Index the seed rows first so the later append lands outside the index.
    let mut indexed = fresh_dataset(table_uri).await;
    let index_params = ScalarIndexParams::default();
    indexed
        .create_index_builder(&["value"], IndexType::BTree, &index_params)
        .replace(true)
        .await
        .unwrap();

    // Append a single extra row using the same schema and write settings.
    let append_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("value", DataType::Int32, false),
    ]));
    let extra_row = RecordBatch::try_new(
        Arc::clone(&append_schema),
        vec![
            Arc::new(StringArray::from(vec!["carol"])),
            Arc::new(Int32Array::from(vec![3])),
        ],
    )
    .unwrap();
    let append_options = WriteParams {
        data_storage_version: Some(LanceFileVersion::V2_2),
        enable_stable_row_ids: true,
        mode: WriteMode::Append,
        ..Default::default()
    };
    let source = RecordBatchIterator::new(vec![Ok(extra_row)], append_schema);
    Dataset::write(source, table_uri, Some(append_options))
        .await
        .unwrap();

    // Reopen to observe the appended version.
    let mut reopened = Dataset::open(table_uri).await.unwrap();
    let uncovered_before = value_index_uncovered_count(&reopened).await;
    assert!(
        uncovered_before > 0,
        "appended fragment should be uncovered by the BTREE before optimize_indices"
    );

    let optimize_options = OptimizeOptions::default();
    reopened.optimize_indices(&optimize_options).await.unwrap();

    let uncovered_after = value_index_uncovered_count(&reopened).await;
    assert_eq!(
        uncovered_after,
        0,
        "optimize_indices must fold the appended fragment into the existing index \
         (incremental coverage); if this regresses, PR3's reindex no longer keeps \
         coverage current — revisit db/omnigraph/optimize.rs and docs/dev/lance.md."
    );
}
/// Counts the fragments of `ds` that no single-column index on `value` covers.
///
/// A fragment counts as covered when its id appears in the `fragment_bitmap`
/// of *any* index entry whose only indexed field is `value`. Coverage is the
/// union across all such entries, so an index stored as several entries is
/// not misreported as leaving fragments uncovered. Entries without a bitmap
/// contribute no coverage; with no usable bitmap, every fragment is uncovered.
///
/// Fragment ids are `u64` while the bitmap is keyed by `u32`. An id that does
/// not fit in `u32` cannot be in the bitmap, so it is counted as uncovered
/// rather than being truncated into an unrelated id.
///
/// # Panics
///
/// Panics if the indices cannot be loaded or the schema has no `value` field.
async fn value_index_uncovered_count(ds: &Dataset) -> usize {
    let indices = ds.load_indices().await.unwrap();
    let value_fid = ds.schema().field("value").unwrap().id;

    // Every coverage bitmap belonging to an index on exactly the `value` column.
    let coverage: Vec<_> = indices
        .iter()
        .filter(|index| index.fields == [value_fid])
        .filter_map(|index| index.fragment_bitmap.as_ref())
        .collect();

    ds.fragments()
        .iter()
        .filter(|fragment| match u32::try_from(fragment.id) {
            Ok(id) => !coverage.iter().any(|bitmap| bitmap.contains(id)),
            // Not representable as a bitmap key, hence necessarily uncovered.
            Err(_) => true,
        })
        .count()
}
/// Probes when a filter literal's type lets the scan plan use a scalar index.
///
/// Builds a four-row table with BTree indices on an Int32 column (`n32`) and a
/// Date32 column (`d32`), then inspects the rendered physical plan for each
/// filter: the presence of `ScalarIndexQuery` is taken as "index used". A
/// final check expects the Int64-literal plan to contain a cast on the column.
#[tokio::test]
async fn scalar_index_use_requires_matched_literal_type() {
    use datafusion::physical_plan::displayable;
    use datafusion::prelude::{col, lit};
    use datafusion::scalar::ScalarValue;

    /// Renders the physical plan the scanner produces for `filter`.
    async fn plan_str(ds: &Dataset, filter: datafusion::prelude::Expr) -> String {
        let mut scan = ds.scan();
        scan.filter_expr(filter);
        let physical_plan = scan.create_plan().await.unwrap();
        displayable(physical_plan.as_ref()).indent(true).to_string()
    }

    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("probe_literal_type.lance");
    let table_uri = table_path.to_str().unwrap();

    let table_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("n32", DataType::Int32, false),
        Field::new("d32", DataType::Date32, false),
    ]));
    let rows = RecordBatch::try_new(
        Arc::clone(&table_schema),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
            Arc::new(Int32Array::from(vec![1, 5, 9, 13])),
            Arc::new(arrow_array::Date32Array::from(vec![19000, 19723, 20000, 20500])),
        ],
    )
    .unwrap();
    let write_options = WriteParams {
        data_storage_version: Some(LanceFileVersion::V2_2),
        enable_stable_row_ids: true,
        mode: WriteMode::Create,
        ..Default::default()
    };
    let source = RecordBatchIterator::new(vec![Ok(rows)], table_schema);
    let mut ds = Dataset::write(source, table_uri, Some(write_options))
        .await
        .unwrap();

    // One BTree index per probed column.
    let index_params = ScalarIndexParams::default();
    for column in ["n32", "d32"] {
        ds.create_index_builder(&[column], IndexType::BTree, &index_params)
            .replace(true)
            .await
            .unwrap();
    }

    // (label, filter, whether the plan is expected to contain an index query)
    let matched_date = lit(ScalarValue::Date32(Some(19723)));
    let cases = [
        ("n32 = 5i32 (matched Int32)", col("n32").eq(lit(5i32)), true),
        ("n32 = 5i64 (widened Int64)", col("n32").eq(lit(5i64)), false),
        ("d32 = Date32 (matched)", col("d32").eq(matched_date), true),
        (
            "d32 = '2024-01-01' (Utf8 vs Date32)",
            col("d32").eq(lit("2024-01-01")),
            true,
        ),
    ];
    for (label, filter, expect_index) in cases {
        let s = plan_str(&ds, filter).await;
        let uses_index = s.contains("ScalarIndexQuery");
        assert_eq!(
            uses_index, expect_index,
            "[{label}] expected scalar-index use = {expect_index}, got {uses_index}.\n\
             A change here means Lance/DataFusion shifted its coercion or index \
             pushdown; re-validate query.rs::literal_to_typed_expr.\nplan:\n{s}"
        );
    }

    // The widened literal should show up as a cast applied to the column side.
    let widened_filter = col("n32").eq(lit(5i64));
    let widened = plan_str(&ds, widened_filter).await;
    assert!(
        widened.contains("CAST(n32 AS Int64)"),
        "expected a column-side cast in the widened plan, got:\n{widened}"
    );
}
/// Pins the boundary handling of a BTree-indexed range filter.
///
/// With prices `[1, 5, 10, 15, 20]` and a BTree index on `price`, the filter
/// `price <= 10.0 AND price > 5.0` must return exactly `[10.0]`: the upper
/// bound is inclusive and the lower bound exclusive.
#[tokio::test]
async fn btree_range_query_boundary_is_correct() {
    use arrow_array::Float64Array;
    use futures::TryStreamExt;

    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("guard17.lance");

    let table_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
    ]));
    let rows = RecordBatch::try_new(
        Arc::clone(&table_schema),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
            Arc::new(Float64Array::from(vec![1.0, 5.0, 10.0, 15.0, 20.0])),
        ],
    )
    .unwrap();
    let write_options = WriteParams {
        data_storage_version: Some(LanceFileVersion::V2_2),
        enable_stable_row_ids: true,
        mode: WriteMode::Create,
        ..Default::default()
    };
    let source = RecordBatchIterator::new(vec![Ok(rows)], table_schema);
    let mut dataset = Dataset::write(source, table_path.to_str().unwrap(), Some(write_options))
        .await
        .unwrap();

    let index_params = ScalarIndexParams::default();
    dataset
        .create_index_builder(&["price"], IndexType::BTree, &index_params)
        .replace(true)
        .await
        .unwrap();

    let mut scan = dataset.scan();
    scan.filter("price <= 10.0 AND price > 5.0").unwrap();
    let result_stream = scan.try_into_stream().await.unwrap();
    let batches: Vec<RecordBatch> = result_stream.try_collect().await.unwrap();

    // Flatten the `price` column of every returned batch into one vector.
    let mut got: Vec<f64> = Vec::new();
    for batch in &batches {
        let prices = batch
            .column_by_name("price")
            .unwrap()
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        got.extend((0..prices.len()).map(|row| prices.value(row)));
    }
    got.sort_by(|lhs, rhs| lhs.partial_cmp(rhs).unwrap());

    assert_eq!(
        got,
        vec![10.0],
        "BTREE range `price <= 10 AND price > 5` must return exactly [10.0] \
         (lance#6796 / issue #6792 boundary fix); got {got:?}. If this regressed, \
         Lance reintroduced the range-bound inclusiveness bug.",
    );
}
/// Pins that `CommitBuilder::with_skip_auto_cleanup(true)` keeps old versions
/// alive even when an aggressive auto-cleanup config is set on the dataset.
///
/// Two datasets get the same cleanup config (`interval = 1`,
/// `older_than = 0ms`) and five appends each:
/// - the control uses plain `Dataset::append` and expects the first version
///   to be gone afterwards, proving the config actually fires;
/// - the guarded one commits through `CommitBuilder` with
///   `with_skip_auto_cleanup(true)` and expects that version to survive.
#[tokio::test]
async fn skip_auto_cleanup_suppresses_version_gc() {
    use std::collections::HashMap;

    /// Applies the cleanup config that triggers GC on every commit.
    async fn set_legacy_cleanup(ds: &mut Dataset) {
        let settings: HashMap<String, String> = HashMap::from([
            ("lance.auto_cleanup.interval".to_string(), "1".to_string()),
            ("lance.auto_cleanup.older_than".to_string(), "0ms".to_string()),
        ]);
        ds.update_config(settings).await.unwrap();
    }

    /// Builds a one-row batch (`id = "k{i}"`, `value = i`) and its schema.
    fn row(i: i32) -> (Arc<Schema>, RecordBatch) {
        let row_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("value", DataType::Int32, false),
        ]));
        let single_row = RecordBatch::try_new(
            Arc::clone(&row_schema),
            vec![
                Arc::new(StringArray::from(vec![format!("k{i}")])),
                Arc::new(Int32Array::from(vec![i])),
            ],
        )
        .unwrap();
        (row_schema, single_row)
    }

    // Fresh append-mode parameters for each write.
    let append_params = || WriteParams {
        mode: WriteMode::Append,
        ..Default::default()
    };

    // --- Negative control: ordinary appends let auto-cleanup run. ---
    let control_dir = tempfile::tempdir().unwrap();
    let control_path = control_dir.path().join("g18_ctrl.lance");
    let mut control = fresh_dataset(control_path.to_str().unwrap()).await;
    let v1 = control.version().version;
    set_legacy_cleanup(&mut control).await;
    for i in 0..5 {
        let (row_schema, single_row) = row(i);
        let source = RecordBatchIterator::new(vec![Ok(single_row)], row_schema);
        control.append(source, Some(append_params())).await.unwrap();
    }
    let control_checkout = control.checkout_version(v1).await;
    assert!(
        control_checkout.is_err(),
        "negative control: without skip_auto_cleanup, the legacy auto_cleanup \
         config should have GC'd pinned v{v1}; if this fails the config is not \
         firing and the positive assertion below proves nothing."
    );

    // --- Guarded path: staged writes committed with cleanup skipped. ---
    let keep_dir = tempfile::tempdir().unwrap();
    let keep_path = keep_dir.path().join("g18.lance");
    let mut kept = fresh_dataset(keep_path.to_str().unwrap()).await;
    let v1 = kept.version().version;
    set_legacy_cleanup(&mut kept).await;
    for i in 0..5 {
        let (_, single_row) = row(i);
        let staged = InsertBuilder::new(Arc::new(kept.clone()))
            .with_params(&append_params())
            .execute_uncommitted(vec![single_row])
            .await
            .unwrap();
        kept = CommitBuilder::new(Arc::new(kept.clone()))
            .with_skip_auto_cleanup(true)
            .execute(staged)
            .await
            .unwrap();
    }
    let kept_checkout = kept.checkout_version(v1).await;
    assert!(
        kept_checkout.is_ok(),
        "v{v1} was GC'd despite CommitBuilder::with_skip_auto_cleanup(true) — the \
         commit_staged / publisher skip is the only thing protecting \
         __manifest-pinned versions on upgraded (pre-bump) graphs."
    );
}
/// Pins that the unenforced primary key cannot be set a second time.
///
/// Starts from a dataset with no unenforced primary key, marks `id` as the key
/// via field metadata, verifies it took effect, then repeats the same update
/// and expects an error containing "cannot be changed once set" — whether it
/// surfaces when the update is built or when it is awaited.
#[tokio::test]
async fn unenforced_primary_key_is_immutable_once_set() {
    use lance::datatypes::LANCE_UNENFORCED_PRIMARY_KEY;

    // The metadata entry that marks a field as part of the unenforced PK.
    let pk_marker = || [(LANCE_UNENFORCED_PRIMARY_KEY.to_string(), "true".to_string())];

    let tmp = tempfile::tempdir().unwrap();
    let table_path = tmp.path().join("g19.lance");
    let mut dataset = fresh_dataset(table_path.to_str().unwrap()).await;

    let initial_pk_is_empty = dataset.schema().unenforced_primary_key().is_empty();
    assert!(
        initial_pk_is_empty,
        "fresh dataset should carry no unenforced primary key"
    );

    // First set: expected to succeed.
    dataset
        .update_field_metadata()
        .update("id", pk_marker())
        .unwrap()
        .await
        .unwrap();

    let mut pk: Vec<String> = Vec::new();
    for field in dataset.schema().unenforced_primary_key().iter() {
        pk.push(field.name.clone());
    }
    assert_eq!(
        pk,
        ["id"],
        "first set should install `id` as the unenforced PK"
    );

    // Second set: the rejection may come from either stage, so fold both
    // into one result before asserting.
    let second_attempt = dataset.update_field_metadata().update("id", pk_marker());
    let outcome: lance::Result<()> = match second_attempt {
        Err(build_error) => Err(build_error),
        Ok(pending) => pending.await.map(drop),
    };
    assert!(
        matches!(&outcome, Err(e) if e.to_string().contains("cannot be changed once set")),
        "Lance no longer rejects re-setting the unenforced PK as immutable \
         (got: {outcome:?}); immutability relaxed or moved off the commit path \
         — revisit migrate_v1_to_v2's field-guard and re-pin docs/dev/lance.md."
    );
}