use super::Dataset;
use crate::session::caches::{RowIdIndexKey, RowIdSequenceKey};
use crate::{Error, Result};
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use lance_core::utils::deletion::DeletionVector;
use lance_table::{
format::{Fragment, RowIdMeta},
rowids::{FragmentRowIdIndex, RowIdIndex, RowIdSequence, read_row_ids},
};
use std::sync::Arc;
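/// Load the stable row id sequence for a single fragment.
///
/// The sequence is read from the fragment's row id metadata, which is stored
/// either inline in the manifest or in an external file, and the result is
/// cached in the dataset's metadata cache keyed by fragment id. Returns an
/// error if the fragment has no row id metadata.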
pub async fn load_row_id_sequence(
dataset: &Dataset,
fragment: &Fragment,
) -> Result<Arc<RowIdSequence>> {
match &fragment.row_id_meta {
None => Err(Error::internal("Missing row id meta")),
Some(RowIdMeta::Inline(data)) => {
let data = data.clone();
let key = RowIdSequenceKey {
fragment_id: fragment.id,
};
dataset
.metadata_cache
.get_or_insert_with_key(key, || async move { read_row_ids(&data) })
.await
}
Some(RowIdMeta::External(file_slice)) => {
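// The sequence lives in an external file; read only the recorded byte range.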
let file_slice = file_slice.clone();
let dataset_clone = dataset.clone();
let key = RowIdSequenceKey {
fragment_id: fragment.id,
};
dataset
.metadata_cache
.get_or_insert_with_key(key, || async move {
let path = dataset_clone.base.child(file_slice.path.as_str());
let range = file_slice.offset as usize
..(file_slice.offset as usize + file_slice.size as usize);
let data = dataset_clone
.object_store
.open(&path)
.await?
.get_range(range)
.await?;
read_row_ids(&data)
})
.await
}
}
}
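/// Load the row id sequences for the given fragments as a stream of
/// `(fragment id, sequence)` pairs.
///
/// Up to `io_parallelism` loads are kept in flight, and results may be
/// yielded out of fragment order.
///
/// A minimal usage sketch (assuming `dataset` is an open [`Dataset`]):
///
/// ```ignore
/// let sequences: Vec<(u32, Arc<RowIdSequence>)> =
///     load_row_id_sequences(&dataset, &dataset.manifest.fragments)
///         .try_collect()
///         .await?;
/// ```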
pub fn load_row_id_sequences<'a>(
dataset: &'a Dataset,
fragments: &'a [Fragment],
) -> impl Stream<Item = Result<(u32, Arc<RowIdSequence>)>> + 'a {
futures::stream::iter(fragments)
.map(|fragment| {
load_row_id_sequence(dataset, fragment).map_ok(move |seq| (fragment.id as u32, seq))
})
.buffer_unordered(dataset.object_store.io_parallelism())
}
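/// Get the stable row id index for the dataset, if stable row ids are enabled.
/// Returns `Ok(None)` otherwise.
///
/// The index is cached per manifest version in the dataset's metadata cache.
///
/// A minimal usage sketch (assuming `dataset` is an open [`Dataset`]):
///
/// ```ignore
/// if let Some(index) = get_row_id_index(&dataset).await? {
///     // Look up the current physical address of a stable row id, if the
///     // row still exists in this version of the dataset.
///     let addr = index.get(42);
/// }
/// ```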
pub async fn get_row_id_index(
dataset: &Dataset,
) -> Result<Option<Arc<RowIdIndex>>> {
if dataset.manifest.uses_stable_row_ids() {
let key = RowIdIndexKey {
version: dataset.manifest.version,
};
let index = dataset
.metadata_cache
.get_or_insert_with_key(key, || load_row_id_index(dataset))
.await?;
Ok(Some(index))
} else {
Ok(None)
}
}
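/// Build the full row id index by loading each fragment's row id sequence and
/// pairing it with that fragment's deletion vector.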
async fn load_row_id_index(dataset: &Dataset) -> Result<RowIdIndex> {
let sequences = load_row_id_sequences(dataset, &dataset.manifest.fragments)
.try_collect::<Vec<_>>()
.await?;
let fragments = dataset.get_fragments();
let fragment_indices: Vec<_> =
futures::future::try_join_all(sequences.into_iter().map(|(fragment_id, sequence)| {
let fragment = fragments
.iter()
.find(|f| f.id() as u32 == fragment_id)
.expect("Fragment should exist");
async move {
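// A missing (or unreadable) deletion vector is treated as "no rows deleted".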
let deletion_vector = match fragment.get_deletion_vector().await {
Ok(Some(dv)) => dv,
Ok(None) | Err(_) => Arc::new(DeletionVector::default()),
};
Ok::<FragmentRowIdIndex, Error>(FragmentRowIdIndex {
fragment_id,
row_id_sequence: sequence,
deletion_vector,
})
}
}))
.await?;
let index = RowIdIndex::new(&fragment_indices)?;
Ok(index)
}
#[cfg(test)]
mod test {
use super::*;
use crate::dataset::optimize::{CompactionOptions, compact_files};
use crate::dataset::{UpdateBuilder, WriteMode, WriteParams, builder::DatasetBuilder};
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
use arrow_array::cast::AsArray;
use arrow_array::types::{Float32Type, Int32Type, UInt64Type};
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, UInt64Array};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use futures::Future;
use lance_core::datatypes::Schema;
use lance_core::{ROW_ADDR, ROW_ID, utils::address::RowAddress};
use lance_datagen::Dimension;
use lance_index::{DatasetIndexExt, IndexType, scalar::ScalarIndexParams};
use std::collections::{HashMap, HashSet};
use std::ops::Range;
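// Build a batch with a single non-nullable Int32 column "id" covering the given range.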
fn sequence_batch(values: Range<i32>) -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"id",
DataType::Int32,
false,
)]));
RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from_iter_values(values))]).unwrap()
}
#[tokio::test]
async fn test_empty_dataset_rowids() {
let schema = sequence_batch(0..0).schema();
let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone());
let write_params = WriteParams {
enable_stable_row_ids: true,
..Default::default()
};
let dataset = Dataset::write(reader, "memory://", Some(write_params))
.await
.unwrap();
assert!(dataset.manifest.uses_stable_row_ids());
let index = get_row_id_index(&dataset).await.unwrap().unwrap();
assert!(index.get(0).is_none());
assert_eq!(dataset.manifest().next_row_id, 0);
}
#[tokio::test]
async fn test_must_set_on_creation() {
let tmp_dir = lance_core::utils::tempfile::TempStrDir::default();
let tmp_path = &tmp_dir;
let batch = sequence_batch(0..10);
let reader =
RecordBatchIterator::new(vec![batch.clone()].into_iter().map(Ok), batch.schema());
let write_params = WriteParams {
enable_stable_row_ids: false,
..Default::default()
};
let dataset = Dataset::write(reader, tmp_path, Some(write_params))
.await
.unwrap();
assert!(!dataset.manifest().uses_stable_row_ids());
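// Requesting stable row ids on a later append has no effect; the setting is fixed when the dataset is created.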
let write_params = WriteParams {
enable_stable_row_ids: true,
mode: WriteMode::Append,
..Default::default()
};
let reader =
RecordBatchIterator::new(vec![batch.clone()].into_iter().map(Ok), batch.schema());
let dataset = Dataset::write(reader, tmp_path, Some(write_params))
.await
.unwrap();
assert!(!dataset.manifest().uses_stable_row_ids());
}
#[tokio::test]
async fn test_new_row_ids() {
let num_rows = 25u64;
let batch = sequence_batch(0..num_rows as i32);
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
let write_params = WriteParams {
enable_stable_row_ids: true,
max_rows_per_file: 10,
..Default::default()
};
let dataset = Dataset::write(reader, "memory://", Some(write_params))
.await
.unwrap();
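// 25 rows with max_rows_per_file = 10 produce fragments of 10, 10, and 5 rows; each row id should map to its (fragment, offset) address in write order.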
let index = get_row_id_index(&dataset).await.unwrap().unwrap();
let found_addresses = (0..num_rows)
.map(|i| index.get(i).unwrap())
.collect::<Vec<_>>();
let expected_addresses = (0..num_rows)
.map(|i| {
let fragment_id = i / 10;
RowAddress::new_from_parts(fragment_id as u32, (i % 10) as u32)
})
.collect::<Vec<_>>();
assert_eq!(found_addresses, expected_addresses);
assert_eq!(dataset.manifest().next_row_id, num_rows);
}
#[tokio::test]
async fn test_row_ids_overwrite() {
let num_rows = 10u64;
let batch = sequence_batch(0..num_rows as i32);
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
let write_params = WriteParams {
enable_stable_row_ids: true,
..Default::default()
};
let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
let tmp_path = &temp_dir;
let dataset = Dataset::write(reader, tmp_path, Some(write_params))
.await
.unwrap();
assert_eq!(dataset.manifest().next_row_id, num_rows);
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
let write_params = WriteParams {
mode: WriteMode::Overwrite,
..Default::default()
};
let dataset = Dataset::write(reader, tmp_path, Some(write_params))
.await
.unwrap();
assert_eq!(dataset.manifest().next_row_id, 2 * num_rows);
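// The original row ids (0..10) are retired by the overwrite; the new rows get ids 10..20.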
let index = get_row_id_index(&dataset).await.unwrap().unwrap();
assert!(index.get(0).is_none());
assert!(index.get(num_rows).is_some());
}
#[tokio::test]
async fn test_row_ids_append() {
fn write_batch(uri: &str, start: i32) -> impl Future<Output = Result<()>> + '_ {
let batch = sequence_batch(start..(start + 10));
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
let write_params = WriteParams {
enable_stable_row_ids: true,
mode: WriteMode::Append,
..Default::default()
};
async move {
let _ = Dataset::write(reader, uri, Some(write_params)).await?;
Ok(())
}
}
let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
let tmp_path = &temp_dir;
let mut start = 0;
write_batch(tmp_path, start).await.unwrap();
start += 10;
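// Issue the remaining appends concurrently; each commit must still allocate a distinct, non-overlapping row id range.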
let futures = (0..5)
.map(|offset| write_batch(tmp_path, start + offset * 10))
.collect::<Vec<_>>();
futures::future::try_join_all(futures).await.unwrap();
let dataset = DatasetBuilder::from_uri(tmp_path).load().await.unwrap();
assert_eq!(dataset.manifest().next_row_id, 60);
let index = get_row_id_index(&dataset).await.unwrap().unwrap();
assert!(index.get(0).is_some());
assert!(index.get(60).is_none());
}
#[tokio::test]
async fn test_scan_row_ids() {
let batch = sequence_batch(0..6);
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
let write_params = WriteParams {
enable_stable_row_ids: true,
max_rows_per_file: 2,
..Default::default()
};
let dataset = Dataset::write(reader, "memory://", Some(write_params))
.await
.unwrap();
assert_eq!(dataset.get_fragments().len(), 3);
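// Exercise every combination of row id column, row address column, and projection, skipping the combination that selects nothing.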
for with_row_id in [true, false] {
for with_row_address in [true, false] {
for projection in &[vec![], vec!["id"]] {
if !with_row_id && !with_row_address && projection.is_empty() {
continue;
}
let mut scan = dataset.scan();
if with_row_id {
scan.with_row_id();
}
if with_row_address {
scan.with_row_address();
}
let scan = scan.project(projection).unwrap();
let result = scan.try_into_batch().await.unwrap();
if with_row_id {
let row_ids = result[ROW_ID]
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let expected = vec![0, 1, 2, 3, 4, 5].into();
assert_eq!(row_ids, &expected);
}
if with_row_address {
let row_addrs = result[ROW_ADDR]
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let expected =
vec![0, 1, 1 << 32, (1 << 32) + 1, 2 << 32, (2 << 32) + 1].into();
assert_eq!(row_addrs, &expected);
}
if !projection.is_empty() {
let ids = result["id"].as_any().downcast_ref::<Int32Array>().unwrap();
let expected = vec![0, 1, 2, 3, 4, 5].into();
assert_eq!(ids, &expected);
}
}
}
}
}
#[rstest::rstest]
#[tokio::test]
async fn test_delete_with_row_ids(#[values(true, false)] with_scalar_index: bool) {
let batch = sequence_batch(0..6);
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
let write_params = WriteParams {
enable_stable_row_ids: true,
max_rows_per_file: 2,
..Default::default()
};
let mut dataset = Dataset::write(reader, "memory://", Some(write_params))
.await
.unwrap();
assert_eq!(dataset.get_fragments().len(), 3);
if with_scalar_index {
dataset
.create_index(
&["id"],
IndexType::Scalar,
None,
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
}
dataset.delete("id = 3 or id = 4").await.unwrap();
let mut scan = dataset.scan();
scan.with_row_id().with_row_address();
let result = scan.try_into_batch().await.unwrap();
let row_ids = result[ROW_ID]
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let expected = vec![0, 1, 2, 5].into();
assert_eq!(row_ids, &expected);
let row_addrs = result[ROW_ADDR]
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let expected = vec![0, 1, 1 << 32, (2 << 32) + 1].into();
assert_eq!(row_addrs, &expected);
}
#[tokio::test]
async fn test_row_ids_update() {
let num_rows = 5u64;
let batch = sequence_batch(0..num_rows as i32);
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
let write_params = WriteParams {
enable_stable_row_ids: true,
..Default::default()
};
let dataset = Dataset::write(reader, "memory://", Some(write_params))
.await
.unwrap();
assert_eq!(dataset.manifest().next_row_id, num_rows);
let update_result = UpdateBuilder::new(Arc::new(dataset))
.update_where("id = 3")
.unwrap()
.set("id", "100")
.unwrap()
.build()
.unwrap()
.execute()
.await
.unwrap();
let dataset = update_result.new_dataset;
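// The updated row keeps stable row id 3 but now resolves to the new fragment at address (1, 0); no extra row ids were allocated.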
let index = get_row_id_index(&dataset).await.unwrap().unwrap();
assert!(index.get(0).is_some());
assert_eq!(index.get(3), Some(RowAddress::new_from_parts(1, 0)));
assert_eq!(index.get(5), None);
}
fn build_rowid_to_i_map(row_ids: &UInt64Array, i_array: &Int32Array) -> HashMap<u64, i32> {
row_ids
.values()
.iter()
.zip(i_array.values().iter())
.map(|(&row_id, &i)| (row_id, i))
.collect()
}
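// Scan the whole dataset and map each stable row id to its "i" value.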
async fn scan_rowid_map(dataset: &Dataset) -> HashMap<u64, i32> {
let mut scan = dataset.scan();
scan.with_row_id();
scan.scan_in_order(true);
let result = scan.try_into_batch().await.unwrap();
let i = result["i"].as_any().downcast_ref::<Int32Array>().unwrap();
let row_ids = result[ROW_ID]
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
build_rowid_to_i_map(row_ids, i)
}
async fn compact(dataset: &mut Dataset, target_rows: usize) {
let options = CompactionOptions {
target_rows_per_fragment: target_rows,
..Default::default()
};
let _ = compact_files(dataset, options, None).await.unwrap();
}
async fn delete(dataset: &mut Dataset, expr: &str) {
dataset.delete(expr).await.unwrap();
}
#[tokio::test]
async fn test_stable_row_id_after_multiple_deletion_and_compaction() {
let mut dataset = lance_datagen::gen_batch()
.col("i", lance_datagen::array::step::<Int32Type>())
.col(
"vec",
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
)
.col(
"category",
lance_datagen::array::cycle::<Int32Type>(vec![1, 2, 3]),
)
.into_ram_dataset_with_params(
FragmentCount::from(6),
FragmentRowCount::from(10),
Some(WriteParams {
max_rows_per_file: 10,
enable_stable_row_ids: true,
enable_v2_manifest_paths: true,
..Default::default()
}),
)
.await
.unwrap();
delete(&mut dataset, "i = 2 or i = 3 or i = 5").await;
let map_before = scan_rowid_map(&dataset).await;
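// Compaction must preserve the mapping from stable row id to value.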
compact(&mut dataset, 20).await;
let map_after = scan_rowid_map(&dataset).await;
assert_eq!(
map_before.keys().collect::<HashSet<_>>(),
map_after.keys().collect::<HashSet<_>>()
);
for row_id in map_before.keys() {
assert_eq!(map_before[row_id], map_after[row_id]);
}
delete(&mut dataset, "i = 9").await;
let mut scan = dataset.scan();
let result = scan
.filter("i >= 0")
.unwrap()
.try_into_batch()
.await
.unwrap();
let ids = result["i"].as_any().downcast_ref::<Int32Array>().unwrap();
let id_set = ids.values().iter().cloned().collect::<HashSet<_>>();
let expected: Vec<i32> = (0..60)
.filter(|&i| i != 2 && i != 3 && i != 5 && i != 9)
.collect();
assert_eq!(id_set, expected.iter().cloned().collect());
let mut scan = dataset.scan();
scan.with_row_id();
scan.scan_in_order(true);
let result = scan
.filter("i == 15")
.unwrap()
.try_into_batch()
.await
.unwrap();
let row_id_vec = result[ROW_ID]
.as_primitive::<UInt64Type>()
.values()
.to_vec();
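// row_id_vec holds the stable row id of the row with i == 15; after deleting that row and compacting, take_rows on it must return nothing.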
delete(&mut dataset, "i = 15 or i = 25").await;
let map_before = scan_rowid_map(&dataset).await;
compact(&mut dataset, 30).await;
let map_after = scan_rowid_map(&dataset).await;
assert_eq!(
map_before.keys().collect::<HashSet<_>>(),
map_after.keys().collect::<HashSet<_>>()
);
for row_id in map_before.keys() {
assert_eq!(map_before[row_id], map_after[row_id]);
}
let result = dataset
.take_rows(&row_id_vec, Schema::try_from(dataset.schema()).unwrap())
.await
.unwrap();
assert_eq!(result.num_rows(), 0);
}
#[tokio::test]
async fn test_stable_row_id_after_deletion_update_and_compaction() {
let mut dataset = lance_datagen::gen_batch()
.col("i", lance_datagen::array::step::<Int32Type>())
.col(
"category",
lance_datagen::array::cycle::<Int32Type>(vec![1, 2, 3]),
)
.into_ram_dataset_with_params(
FragmentCount::from(6),
FragmentRowCount::from(10),
Some(WriteParams {
max_rows_per_file: 10,
enable_stable_row_ids: true,
enable_v2_manifest_paths: true,
..Default::default()
}),
)
.await
.unwrap();
delete(&mut dataset, "i = 2 or i = 3 or i = 5").await;
let map_before = scan_rowid_map(&dataset).await;
let updated_dataset = UpdateBuilder::new(Arc::new(dataset))
.update_where("i >= 15")
.unwrap()
.set("category", "999")
.unwrap()
.build()
.unwrap()
.execute()
.await
.unwrap()
.new_dataset;
let mut dataset = Arc::try_unwrap(updated_dataset).expect("no other Arc references");
compact(&mut dataset, 20).await;
let map_after = scan_rowid_map(&dataset).await;
assert_eq!(
map_before.keys().collect::<HashSet<_>>(),
map_after.keys().collect::<HashSet<_>>()
);
for row_id in map_before.keys() {
assert_eq!(map_before[row_id], map_after[row_id]);
}
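// Every row matching the update predicate (i >= 15) must now carry the updated category value.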
let mut scan = dataset.scan();
scan.with_row_id();
scan.scan_in_order(true);
let result = scan.try_into_batch().await.unwrap();
let i = result["i"].as_any().downcast_ref::<Int32Array>().unwrap();
let category = result["category"]
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
for idx in 0..i.len() {
if i.value(idx) >= 15 {
assert_eq!(category.value(idx), 999);
}
}
}
}