use std::{
any::Any,
collections::{BTreeMap, HashMap},
fmt::Debug,
ops::Bound,
sync::Arc,
};
use arrow::array::BinaryBuilder;
use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array, new_null_array};
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::ScalarValue;
use deepsize::DeepSizeOf;
use futures::{StreamExt, TryStreamExt, stream};
use lance_core::utils::mask::RowSetOps;
use lance_core::{
Error, ROW_ID, Result,
cache::{CacheKey, LanceCache, WeakLanceCache},
error::LanceOptionExt,
utils::{
mask::{NullableRowAddrSet, RowAddrTreeMap},
tokio::get_num_compute_intensive_cpus,
},
};
use roaring::RoaringBitmap;
use serde::Serialize;
use tracing::instrument;
use super::{AnyQuery, IndexStore, ScalarIndex};
use super::{
BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult, btree::OrderableScalarValue,
};
use crate::pbold;
use crate::{Index, IndexType, metrics::MetricsCollector};
use crate::{
frag_reuse::FragReuseIndex,
scalar::{
CreatedIndex, UpdateCriteria,
expression::SargableQueryParser,
registry::{
DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering,
TrainingRequest, VALUE_COLUMN_NAME,
},
},
};
use crate::{scalar::IndexReader, scalar::expression::ScalarQueryParser};
/// File (within the index store) holding the key -> serialized-bitmap lookup table.
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
/// Schema-metadata key under which serialized `BitmapStatistics` are stored.
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";
// A single Arrow BinaryArray is limited to i32::MAX bytes of offsets; leave a
// 1 MiB safety margin when batching serialized bitmaps into one array.
const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024;
// Number of lookup-table rows read per chunk when scanning the lookup file.
const MAX_ROWS_PER_CHUNK: usize = 2 * 1024;
// On-disk format version written into CreatedIndex.
const BITMAP_INDEX_VERSION: u32 = 0;
/// Lazily-opened handle to the bitmap lookup file.
///
/// The file is only opened on the first `get()` call; the opened reader is
/// then shared behind an async mutex so concurrent callers open it once.
#[derive(Clone)]
struct LazyIndexReader {
    // None until the first `get()`; then holds the shared reader.
    index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
    store: Arc<dyn IndexStore>,
}
impl std::fmt::Debug for LazyIndexReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The reader handle itself is not `Debug`; only report the store.
        let mut builder = f.debug_struct("LazyIndexReader");
        builder.field("store", &self.store);
        builder.finish()
    }
}
impl LazyIndexReader {
    /// Creates a lazy reader over `store`; no file is opened yet.
    fn new(store: Arc<dyn IndexStore>) -> Self {
        Self {
            index_reader: Arc::new(tokio::sync::Mutex::new(None)),
            store,
        }
    }
    /// Returns the shared reader, opening the bitmap lookup file on the
    /// first call.  Later calls reuse the cached handle.
    async fn get(&self) -> Result<Arc<dyn IndexReader>> {
        let mut slot = self.index_reader.lock().await;
        match slot.as_ref() {
            Some(reader) => Ok(reader.clone()),
            None => {
                let opened = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
                *slot = Some(opened.clone());
                Ok(opened)
            }
        }
    }
}
/// A scalar index that stores one roaring bitmap of row addresses per
/// distinct value of the indexed column.
///
/// Only the key -> row-offset map and the null bitmap are held in memory;
/// the per-value bitmaps live in the lookup file and are loaded on demand
/// (and cached in `index_cache`).
#[derive(Clone, Debug)]
pub struct BitmapIndex {
    /// Maps each distinct (non-null) value to its row offset in the lookup file.
    index_map: BTreeMap<OrderableScalarValue, usize>,
    /// Row addresses of rows whose indexed value is null (kept in memory).
    null_map: Arc<RowAddrTreeMap>,
    /// Data type of the indexed column.
    value_type: DataType,
    store: Arc<dyn IndexStore>,
    /// Weak handle to the session cache holding loaded bitmaps.
    index_cache: WeakLanceCache,
    /// When present, row addresses are remapped through this on load.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,
    lazy_reader: LazyIndexReader,
}
/// Cache key identifying the bitmap for a single indexed value.
#[derive(Debug, Clone)]
pub struct BitmapKey {
    value: OrderableScalarValue,
}
impl CacheKey for BitmapKey {
    type ValueType = RowAddrTreeMap;
    fn key(&self) -> std::borrow::Cow<'_, str> {
        // Render the scalar value as the cache key.  `to_string` instead of
        // `format!("{}", ...)` (clippy::useless_format).
        self.value.0.to_string().into()
    }
}
impl BitmapIndex {
fn new(
index_map: BTreeMap<OrderableScalarValue, usize>,
null_map: Arc<RowAddrTreeMap>,
value_type: DataType,
store: Arc<dyn IndexStore>,
index_cache: WeakLanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
) -> Self {
let lazy_reader = LazyIndexReader::new(store.clone());
Self {
index_map,
null_map,
value_type,
store,
index_cache,
frag_reuse_index,
lazy_reader,
}
}
pub(crate) async fn load(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
index_cache: &LanceCache,
) -> Result<Arc<Self>> {
let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
let total_rows = page_lookup_file.num_rows();
if total_rows == 0 {
let schema = page_lookup_file.schema();
let data_type = schema.fields[0].data_type();
return Ok(Arc::new(Self::new(
BTreeMap::new(),
Arc::new(RowAddrTreeMap::default()),
data_type,
store,
WeakLanceCache::from(index_cache),
frag_reuse_index,
)));
}
let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
let mut null_map = Arc::new(RowAddrTreeMap::default());
let mut value_type: Option<DataType> = None;
let mut null_location: Option<usize> = None;
let mut row_offset = 0;
for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
let chunk = page_lookup_file
.read_range(start_row..end_row, Some(&["keys"]))
.await?;
if chunk.num_rows() == 0 {
continue;
}
if value_type.is_none() {
value_type = Some(chunk.schema().field(0).data_type().clone());
}
let dict_keys = chunk.column(0);
for idx in 0..chunk.num_rows() {
let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
if key.0.is_null() {
null_location = Some(row_offset);
} else {
index_map.insert(key, row_offset);
}
row_offset += 1;
}
}
if let Some(null_loc) = null_location {
let batch = page_lookup_file
.read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
.await?;
let binary_bitmaps = batch
.column(0)
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
let bitmap_bytes = binary_bitmaps.value(0);
let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
if let Some(fri) = &frag_reuse_index {
bitmap = fri.remap_row_addrs_tree_map(&bitmap);
}
null_map = Arc::new(bitmap);
}
let final_value_type = value_type.expect_ok()?;
Ok(Arc::new(Self::new(
index_map,
null_map,
final_value_type,
store,
WeakLanceCache::from(index_cache),
frag_reuse_index,
)))
}
async fn load_bitmap(
&self,
key: &OrderableScalarValue,
metrics: Option<&dyn MetricsCollector>,
) -> Result<Arc<RowAddrTreeMap>> {
if key.0.is_null() {
return Ok(self.null_map.clone());
}
let cache_key = BitmapKey { value: key.clone() };
if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
return Ok(cached);
}
if let Some(metrics) = metrics {
metrics.record_part_load();
}
let row_offset = match self.index_map.get(key) {
Some(loc) => *loc,
None => return Ok(Arc::new(RowAddrTreeMap::default())),
};
let page_lookup_file = self.lazy_reader.get().await?;
let batch = page_lookup_file
.read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
.await?;
let binary_bitmaps = batch
.column(0)
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
let bitmap_bytes = binary_bitmaps.value(0); let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
if let Some(fri) = &self.frag_reuse_index {
bitmap = fri.remap_row_addrs_tree_map(&bitmap);
}
self.index_cache
.insert_with_key(&cache_key, Arc::new(bitmap.clone()))
.await;
Ok(Arc::new(bitmap))
}
pub(crate) fn value_type(&self) -> &DataType {
&self.value_type
}
pub(crate) async fn load_bitmap_index_state(
&self,
) -> Result<HashMap<ScalarValue, RowAddrTreeMap>> {
let mut state = HashMap::new();
for key in self.index_map.keys() {
let bitmap = self.load_bitmap(key, None).await?;
state.insert(key.0.clone(), (*bitmap).clone());
}
if !self.null_map.is_empty() {
let existing_null = new_null_array(&self.value_type, 1);
let existing_null = ScalarValue::try_from_array(existing_null.as_ref(), 0)?;
state.insert(existing_null, (*self.null_map).clone());
}
Ok(state)
}
}
impl DeepSizeOf for BitmapIndex {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // Sum the two measurable children: the key map and the store.
        self.index_map.deep_size_of_children(context)
            + self.store.deep_size_of_children(context)
    }
}
/// Statistics serialized into the lookup file's schema metadata and
/// reported by `Index::statistics`.
#[derive(Serialize)]
struct BitmapStatistics {
    /// Number of distinct bitmaps, counting the null bitmap if present.
    num_bitmaps: usize,
}
#[async_trait]
impl Index for BitmapIndex {
fn as_any(&self) -> &dyn Any {
self
}
fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
self
}
fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
Err(Error::not_supported_source(
"BitmapIndex is not a vector index".into(),
))
}
async fn prewarm(&self) -> Result<()> {
let page_lookup_file = self.lazy_reader.get().await?;
let total_rows = page_lookup_file.num_rows();
if total_rows == 0 {
return Ok(());
}
for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
let chunk = page_lookup_file
.read_range(start_row..end_row, None)
.await?;
if chunk.num_rows() == 0 {
continue;
}
let dict_keys = chunk.column(0);
let binary_bitmaps = chunk.column(1);
let bitmap_binary_array = binary_bitmaps
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
for idx in 0..chunk.num_rows() {
let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
if key.0.is_null() {
continue;
}
let bitmap_bytes = bitmap_binary_array.value(idx);
let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
bitmap = frag_reuse_index_ref.remap_row_addrs_tree_map(&bitmap);
}
let cache_key = BitmapKey { value: key };
self.index_cache
.insert_with_key(&cache_key, Arc::new(bitmap))
.await;
}
}
Ok(())
}
fn index_type(&self) -> IndexType {
IndexType::Bitmap
}
fn statistics(&self) -> Result<serde_json::Value> {
let stats = BitmapStatistics {
num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
};
serde_json::to_value(stats).map_err(|e| {
Error::internal(format!(
"failed to serialize bitmap index statistics: {}",
e
))
})
}
async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
unimplemented!()
}
}
#[async_trait]
impl ScalarIndex for BitmapIndex {
    // Answers a sargable query by unioning the per-value bitmaps.  Each
    // match arm yields `(matching_rows, null_rows)`, where the second item
    // carries the null rows separately (Some) when the caller needs them for
    // three-valued-logic handling, or None when nulls are irrelevant to (or
    // already part of) the answer.
    #[instrument(name = "bitmap_search", level = "debug", skip_all)]
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult> {
        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
        let (row_ids, null_row_ids) = match query {
            SargableQuery::Equals(val) => {
                metrics.record_comparisons(1);
                if val.is_null() {
                    // Equality against NULL selects exactly the null rows.
                    ((*self.null_map).clone(), None)
                } else {
                    let key = OrderableScalarValue(val.clone());
                    let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
                    let null_rows = if !self.null_map.is_empty() {
                        Some((*self.null_map).clone())
                    } else {
                        None
                    };
                    ((*bitmap).clone(), null_rows)
                }
            }
            SargableQuery::Range(start, end) => {
                // Wrap the bounds so they compare with the BTreeMap's
                // `OrderableScalarValue` keys.
                let range_start = match start {
                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
                    Bound::Unbounded => Bound::Unbounded,
                };
                let range_end = match end {
                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
                    Bound::Unbounded => Bound::Unbounded,
                };
                // BTreeMap::range panics on an inverted range, so detect an
                // empty/inverted range up front and short-circuit.
                let empty_range = match (&range_start, &range_end) {
                    (Bound::Included(lower), Bound::Included(upper)) => lower > upper,
                    (Bound::Included(lower), Bound::Excluded(upper))
                    | (Bound::Excluded(lower), Bound::Included(upper))
                    | (Bound::Excluded(lower), Bound::Excluded(upper)) => lower >= upper,
                    _ => false,
                };
                let keys: Vec<_> = if empty_range {
                    Vec::new()
                } else {
                    self.index_map
                        .range((range_start, range_end))
                        .map(|(k, _v)| k.clone())
                        .collect()
                };
                metrics.record_comparisons(keys.len());
                let result = if keys.is_empty() {
                    RowAddrTreeMap::default()
                } else {
                    // Load the per-key bitmaps concurrently, then union.
                    let bitmaps: Vec<_> = stream::iter(
                        keys.into_iter()
                            .map(|key| async move { self.load_bitmap(&key, None).await }),
                    )
                    .buffer_unordered(get_num_compute_intensive_cpus())
                    .try_collect()
                    .await?;
                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
                    RowAddrTreeMap::union_all(&bitmap_refs)
                };
                let null_rows = if !self.null_map.is_empty() {
                    Some((*self.null_map).clone())
                } else {
                    None
                };
                (result, null_rows)
            }
            SargableQuery::IsIn(values) => {
                metrics.record_comparisons(values.len());
                let mut has_null = false;
                // Keep only values that actually exist in the index; note
                // whether NULL itself was requested.
                let keys: Vec<_> = values
                    .iter()
                    .filter_map(|val| {
                        if val.is_null() {
                            has_null = true;
                            None
                        } else {
                            let key = OrderableScalarValue(val.clone());
                            if self.index_map.contains_key(&key) {
                                Some(key)
                            } else {
                                None
                            }
                        }
                    })
                    .collect();
                let mut bitmaps: Vec<_> = stream::iter(
                    keys.into_iter()
                        .map(|key| async move { self.load_bitmap(&key, None).await }),
                )
                .buffer_unordered(get_num_compute_intensive_cpus())
                .try_collect()
                .await?;
                // If NULL was one of the requested values, the null rows are
                // part of the match set itself.
                if has_null && !self.null_map.is_empty() {
                    bitmaps.push(self.null_map.clone());
                }
                let result = if bitmaps.is_empty() {
                    RowAddrTreeMap::default()
                } else {
                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
                    RowAddrTreeMap::union_all(&bitmap_refs)
                };
                // Otherwise report them separately for null-aware callers.
                let null_rows = if !has_null && !self.null_map.is_empty() {
                    Some((*self.null_map).clone())
                } else {
                    None
                };
                (result, null_rows)
            }
            SargableQuery::IsNull() => {
                metrics.record_comparisons(1);
                ((*self.null_map).clone(), None)
            }
            SargableQuery::FullTextSearch(_) => {
                return Err(Error::not_supported_source(
                    "full text search is not supported for bitmap indexes".into(),
                ));
            }
            SargableQuery::LikePrefix(_) => {
                return Err(Error::not_supported_source(
                    "LIKE prefix queries are not supported for bitmap indexes".into(),
                ));
            }
        };
        let selection = NullableRowAddrSet::new(row_ids, null_row_ids.unwrap_or_default());
        Ok(SearchResult::Exact(selection))
    }
    fn can_remap(&self) -> bool {
        true
    }
    // Rebuilds the index in `dest_store` with every row address translated
    // through `mapping` (None entries drop the row).
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex> {
        let state = self.load_bitmap_index_state().await?;
        let remapped_state = BitmapIndexPlugin::remap_bitmap_state(state, mapping);
        BitmapIndexPlugin::write_bitmap_index(remapped_state, dest_store, &self.value_type).await?;
        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
                .unwrap(),
            index_version: BITMAP_INDEX_VERSION,
            files: Some(dest_store.list_files_with_sizes().await?),
        })
    }
    // Merges `new_data` into the existing bitmaps and writes the combined
    // index to `dest_store`.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
        _old_data_filter: Option<super::OldIndexDataFilter>,
    ) -> Result<CreatedIndex> {
        let state = self.load_bitmap_index_state().await?;
        BitmapIndexPlugin::do_train_bitmap_index(new_data, state, dest_store).await?;
        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
                .unwrap(),
            index_version: BITMAP_INDEX_VERSION,
            files: Some(dest_store.list_files_with_sizes().await?),
        })
    }
    fn update_criteria(&self) -> UpdateCriteria {
        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::None).with_row_id())
    }
    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
    }
}
/// Plugin entry point for training, loading, and querying bitmap indices.
#[derive(Debug, Default)]
pub struct BitmapIndexPlugin;
impl BitmapIndexPlugin {
fn get_batch_from_arrays(
keys: Arc<dyn Array>,
binary_bitmaps: Arc<dyn Array>,
) -> Result<RecordBatch> {
let schema = Arc::new(Schema::new(vec![
Field::new("keys", keys.data_type().clone(), true),
Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
]));
let columns = vec![keys, binary_bitmaps];
Ok(RecordBatch::try_new(schema, columns)?)
}
async fn write_bitmap_index(
state: HashMap<ScalarValue, RowAddrTreeMap>,
index_store: &dyn IndexStore,
value_type: &DataType,
) -> Result<()> {
Self::write_bitmap_index_with_extras(
state,
index_store,
value_type,
HashMap::new(),
Vec::new(),
)
.await
}
pub(crate) async fn write_bitmap_index_with_extras(
state: HashMap<ScalarValue, RowAddrTreeMap>,
index_store: &dyn IndexStore,
value_type: &DataType,
mut metadata: HashMap<String, String>,
global_buffers: Vec<(String, Bytes)>,
) -> Result<()> {
let num_bitmaps = state.len();
let schema = Arc::new(Schema::new(vec![
Field::new("keys", value_type.clone(), true),
Field::new("bitmaps", DataType::Binary, true),
]));
let mut bitmap_index_file = index_store
.new_index_file(BITMAP_LOOKUP_NAME, schema)
.await?;
for (metadata_key, data) in global_buffers {
let buffer_idx = bitmap_index_file.add_global_buffer(data).await?;
metadata.insert(metadata_key, buffer_idx.to_string());
}
let mut cur_keys = Vec::new();
let mut cur_bitmaps = Vec::new();
let mut cur_bytes = 0;
for (key, bitmap) in state.into_iter() {
let mut bytes = Vec::new();
bitmap.serialize_into(&mut bytes).unwrap();
let bitmap_size = bytes.len();
if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
let mut binary_builder = BinaryBuilder::new();
for b in &cur_bitmaps {
binary_builder.append_value(b);
}
let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
bitmap_index_file.write_record_batch(record_batch).await?;
cur_keys.clear();
cur_bitmaps.clear();
cur_bytes = 0;
}
cur_keys.push(key);
cur_bitmaps.push(bytes);
cur_bytes += bitmap_size;
}
if !cur_keys.is_empty() {
let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
let mut binary_builder = BinaryBuilder::new();
for b in &cur_bitmaps {
binary_builder.append_value(b);
}
let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
bitmap_index_file.write_record_batch(record_batch).await?;
}
let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps })
.map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
bitmap_index_file.finish_with_metadata(metadata).await?;
Ok(())
}
pub(crate) async fn build_bitmap_index_state(
mut data_source: SendableRecordBatchStream,
mut state: HashMap<ScalarValue, RowAddrTreeMap>,
) -> Result<(HashMap<ScalarValue, RowAddrTreeMap>, DataType)> {
let value_type = data_source.schema().field(0).data_type().clone();
while let Some(batch) = data_source.try_next().await? {
let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
for i in 0..values.len() {
let row_id = row_id_column.value(i);
let key = ScalarValue::try_from_array(values.as_ref(), i)?;
state.entry(key.clone()).or_default().insert(row_id);
}
}
Ok((state, value_type))
}
async fn do_train_bitmap_index(
data_source: SendableRecordBatchStream,
state: HashMap<ScalarValue, RowAddrTreeMap>,
index_store: &dyn IndexStore,
) -> Result<()> {
let (state, value_type) = Self::build_bitmap_index_state(data_source, state).await?;
Self::write_bitmap_index(state, index_store, &value_type).await
}
pub async fn train_bitmap_index(
data: SendableRecordBatchStream,
index_store: &dyn IndexStore,
) -> Result<()> {
let dictionary: HashMap<ScalarValue, RowAddrTreeMap> = HashMap::new();
Self::do_train_bitmap_index(data, dictionary, index_store).await
}
pub(crate) fn remap_bitmap_state(
state: HashMap<ScalarValue, RowAddrTreeMap>,
mapping: &HashMap<u64, Option<u64>>,
) -> HashMap<ScalarValue, RowAddrTreeMap> {
state
.into_iter()
.map(|(key, bitmap)| {
let remapped_bitmap =
RowAddrTreeMap::from_iter(bitmap.row_addrs().unwrap().filter_map(|addr| {
let addr_as_u64 = u64::from(addr);
mapping
.get(&addr_as_u64)
.copied()
.unwrap_or(Some(addr_as_u64))
}));
(key, remapped_bitmap)
})
.collect()
}
}
#[async_trait]
impl ScalarIndexPlugin for BitmapIndexPlugin {
    fn name(&self) -> &str {
        "Bitmap"
    }
    /// Validates the target field and builds the training request.  Bitmap
    /// indices require a flat (non-nested) column and row ids for training.
    fn new_training_request(
        &self,
        _params: &str,
        field: &Field,
    ) -> Result<Box<dyn TrainingRequest>> {
        if field.data_type().is_nested() {
            return Err(Error::invalid_input_source(
                "A bitmap index can only be created on a non-nested field.".into(),
            ));
        }
        let criteria = TrainingCriteria::new(TrainingOrdering::None).with_row_id();
        Ok(Box::new(DefaultTrainingRequest::new(criteria)))
    }
    fn provides_exact_answer(&self) -> bool {
        true
    }
    fn version(&self) -> u32 {
        BITMAP_INDEX_VERSION
    }
    fn new_query_parser(
        &self,
        index_name: String,
        _index_details: &prost_types::Any,
    ) -> Option<Box<dyn ScalarQueryParser>> {
        let parser = SargableQueryParser::new(index_name, false);
        Some(Box::new(parser))
    }
    /// Trains a new bitmap index over `data` and records the written files.
    async fn train_index(
        &self,
        data: SendableRecordBatchStream,
        index_store: &dyn IndexStore,
        _request: Box<dyn TrainingRequest>,
        fragment_ids: Option<Vec<u32>>,
        _progress: Arc<dyn crate::progress::IndexBuildProgress>,
    ) -> Result<CreatedIndex> {
        // Per-fragment training is not implemented for bitmap indices.
        if fragment_ids.is_some() {
            return Err(Error::invalid_input_source(
                "Bitmap index does not support fragment training".into(),
            ));
        }
        Self::train_bitmap_index(data, index_store).await?;
        let files = index_store.list_files_with_sizes().await?;
        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
                .unwrap(),
            index_version: BITMAP_INDEX_VERSION,
            files: Some(files),
        })
    }
    async fn load_index(
        &self,
        index_store: Arc<dyn IndexStore>,
        _index_details: &prost_types::Any,
        frag_reuse_index: Option<Arc<FragReuseIndex>>,
        cache: &LanceCache,
    ) -> Result<Arc<dyn ScalarIndex>> {
        let index = BitmapIndex::load(index_store, frag_reuse_index, cache).await?;
        Ok(index as Arc<dyn ScalarIndex>)
    }
    /// Reads the statistics stored in the lookup file's schema metadata, if
    /// any were written.
    async fn load_statistics(
        &self,
        index_store: Arc<dyn IndexStore>,
        _index_details: &prost_types::Any,
    ) -> Result<Option<serde_json::Value>> {
        let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
        match reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
            Some(raw) => {
                let stats = serde_json::from_str(raw).map_err(|e| {
                    Error::internal(format!("failed to parse bitmap statistics metadata: {e}"))
                })?;
                Ok(Some(stats))
            }
            None => Ok(None),
        }
    }
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::metrics::NoOpMetricsCollector;
use crate::scalar::lance_format::LanceIndexStore;
use arrow_array::{RecordBatch, StringArray, UInt64Array, record_batch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;
use lance_core::utils::mask::RowSetOps;
use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
use lance_io::object_store::ObjectStore;
use std::collections::HashMap;
    // Trains a small string index, then exercises Equals, Range (normal and
    // inverted), and IsIn queries, including a repeated query that should be
    // served from the bitmap cache.
    #[tokio::test]
    async fn test_bitmap_lazy_loading_and_cache() {
        let tmpdir = TempObjDir::default();
        let store = Arc::new(LanceIndexStore::new(
            Arc::new(ObjectStore::local()),
            tmpdir.clone(),
            Arc::new(LanceCache::no_cache()),
        ));
        // 15 rows over 4 distinct values, no nulls.
        let colors = vec![
            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
            "red", "red", "blue", "green", "yellow",
        ];
        let row_ids = (0u64..15u64).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", DataType::Utf8, false),
            Field::new("_rowid", DataType::UInt64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(colors.clone())),
                Arc::new(UInt64Array::from(row_ids.clone())),
            ],
        )
        .unwrap();
        let stream = stream::once(async move { Ok(batch) });
        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
            .await
            .unwrap();
        let cache = LanceCache::with_capacity(1024 * 1024);
        let index = BitmapIndex::load(store.clone(), None, &cache)
            .await
            .unwrap();
        assert_eq!(index.index_map.len(), 4); assert!(index.null_map.is_empty());
        // Equality lookup.
        let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        let expected_red_rows = vec![0u64, 3, 6, 10, 11];
        if let SearchResult::Exact(row_ids) = result {
            let mut actual: Vec<u64> = row_ids
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(|id| id.into())
                .collect();
            actual.sort();
            assert_eq!(actual, expected_red_rows);
        } else {
            panic!("Expected exact search result");
        }
        // Same query again — should hit the cache and return the same rows.
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        if let SearchResult::Exact(row_ids) = result {
            let mut actual: Vec<u64> = row_ids
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(|id| id.into())
                .collect();
            actual.sort();
            assert_eq!(actual, expected_red_rows);
        }
        // Inclusive range covering "blue" and "green".
        let query = SargableQuery::Range(
            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
        );
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
        if let SearchResult::Exact(row_ids) = result {
            let mut actual: Vec<u64> = row_ids
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(|id| id.into())
                .collect();
            actual.sort();
            assert_eq!(actual, expected_range_rows);
        }
        // Inverted range (lower > upper) must yield an empty result, not panic.
        let query = SargableQuery::Range(
            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
        );
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        if let SearchResult::Exact(row_ids) = result {
            assert!(row_ids.true_rows().is_empty());
        } else {
            panic!("Expected exact search result");
        }
        // IsIn over two values unions their bitmaps.
        let query = SargableQuery::IsIn(vec![
            ScalarValue::Utf8(Some("red".to_string())),
            ScalarValue::Utf8(Some("yellow".to_string())),
        ]);
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
        if let SearchResult::Exact(row_ids) = result {
            let mut actual: Vec<u64> = row_ids
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(|id| id.into())
                .collect();
            actual.sort();
            assert_eq!(actual, expected_in_rows);
        }
    }
    // Stress test: writes an index with 2.5M distinct keys and verifies it
    // round-trips.  Ignored by default because it is slow and memory-heavy.
    #[tokio::test]
    #[ignore]
    async fn test_big_bitmap_index() {
        use super::{BITMAP_LOOKUP_NAME, BitmapIndex};
        use crate::scalar::IndexStore;
        use crate::scalar::lance_format::LanceIndexStore;
        use arrow_schema::DataType;
        use datafusion_common::ScalarValue;
        use lance_core::cache::LanceCache;
        use lance_core::utils::mask::RowAddrTreeMap;
        use lance_io::object_store::ObjectStore;
        use std::collections::HashMap;
        use std::sync::Arc;
        let m: u32 = 2_500_000;
        let per_bitmap_size = 1000;
        // Every key maps to the same row range [0, per_bitmap_size).
        let mut state = HashMap::new();
        for i in 0..m {
            let bitmap = RowAddrTreeMap::from_iter(0..per_bitmap_size);
            let key = ScalarValue::UInt32(Some(i));
            state.insert(key, bitmap);
        }
        let tmpdir = TempObjDir::default();
        let test_store = LanceIndexStore::new(
            Arc::new(ObjectStore::local()),
            tmpdir.clone(),
            Arc::new(LanceCache::no_cache()),
        );
        let result =
            BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;
        assert!(
            result.is_ok(),
            "Failed to write bitmap index: {:?}",
            result.err()
        );
        let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
        assert!(
            index_file.is_ok(),
            "Failed to open index file: {:?}",
            index_file.err()
        );
        let index_file = index_file.unwrap();
        tracing::info!(
            "Index file contains {} rows in total",
            index_file.num_rows()
        );
        tracing::info!("Loading index from disk...");
        let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
            .await
            .expect("Failed to load bitmap index");
        assert_eq!(
            loaded_index.index_map.len(),
            m as usize,
            "Loaded index has incorrect number of keys (expected {}, got {})",
            m,
            loaded_index.index_map.len()
        );
        // Spot-check the first, middle, and last keys.
        let test_keys = [0, m / 2, m - 1]; for &key_val in &test_keys {
            let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
            let bitmap = loaded_index
                .load_bitmap(&key, None)
                .await
                .unwrap_or_else(|_| panic!("Key {} should exist", key_val));
            let row_addrs: Vec<u64> = bitmap.row_addrs().unwrap().map(u64::from).collect();
            assert_eq!(
                row_addrs.len(),
                per_bitmap_size as usize,
                "Bitmap for key {} has wrong size",
                key_val
            );
            for i in 0..5.min(per_bitmap_size) {
                assert!(
                    row_addrs.contains(&i),
                    "Bitmap for key {} should contain row_id {}",
                    key_val,
                    i
                );
            }
            for i in (per_bitmap_size - 5)..per_bitmap_size {
                assert!(
                    row_addrs.contains(&i),
                    "Bitmap for key {} should contain row_id {}",
                    key_val,
                    i
                );
            }
            let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
            assert_eq!(
                row_addrs, expected_range,
                "Bitmap for key {} doesn't contain expected values",
                key_val
            );
            tracing::info!(
                "✓ Verified bitmap for key {}: {} rows as expected",
                key_val,
                row_addrs.len()
            );
        }
        tracing::info!("Test successful! Index properly contains {} keys", m);
    }
    // Verifies that `prewarm` populates the session cache (cold before,
    // warm after) and that a second prewarm is a harmless no-op.
    #[tokio::test]
    async fn test_bitmap_prewarm() {
        let tmpdir = TempObjDir::default();
        let store = Arc::new(LanceIndexStore::new(
            Arc::new(ObjectStore::local()),
            tmpdir.clone(),
            Arc::new(LanceCache::no_cache()),
        ));
        let colors = vec![
            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
            "red", "red", "blue", "green", "yellow",
        ];
        let row_ids = (0u64..15u64).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", DataType::Utf8, false),
            Field::new("_rowid", DataType::UInt64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(colors.clone())),
                Arc::new(UInt64Array::from(row_ids.clone())),
            ],
        )
        .unwrap();
        let stream = stream::once(async move { Ok(batch) });
        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
            .await
            .unwrap();
        let cache = LanceCache::with_capacity(1024 * 1024);
        let index = BitmapIndex::load(store.clone(), None, &cache)
            .await
            .unwrap();
        let cache_key_red = BitmapKey {
            value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
        };
        let cache_key_blue = BitmapKey {
            value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
        };
        // Nothing is cached before prewarming.
        assert!(
            cache
                .get_with_key::<BitmapKey>(&cache_key_red)
                .await
                .is_none()
        );
        assert!(
            cache
                .get_with_key::<BitmapKey>(&cache_key_blue)
                .await
                .is_none()
        );
        index.prewarm().await.unwrap();
        // After prewarming every value's bitmap is in the cache.
        assert!(
            cache
                .get_with_key::<BitmapKey>(&cache_key_red)
                .await
                .is_some()
        );
        assert!(
            cache
                .get_with_key::<BitmapKey>(&cache_key_blue)
                .await
                .is_some()
        );
        let cached_red = cache
            .get_with_key::<BitmapKey>(&cache_key_red)
            .await
            .unwrap();
        let red_rows: Vec<u64> = cached_red.row_addrs().unwrap().map(u64::from).collect();
        assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);
        // A second prewarm must not corrupt the cached entries.
        index.prewarm().await.unwrap();
        let cached_red_2 = cache
            .get_with_key::<BitmapKey>(&cache_key_red)
            .await
            .unwrap();
        let red_rows_2: Vec<u64> = cached_red_2.row_addrs().unwrap().map(u64::from).collect();
        assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
    }
    // Builds an index containing nulls, remaps all row addresses from
    // fragments 1 and 2 into fragment 3, then verifies both the value
    // bitmaps and the null bitmap reflect the new addresses.
    #[tokio::test]
    async fn test_remap_bitmap_with_null() {
        use arrow_array::UInt32Array;
        let tmpdir = TempObjDir::default();
        let test_store = Arc::new(LanceIndexStore::new(
            Arc::new(ObjectStore::local()),
            tmpdir.clone(),
            Arc::new(LanceCache::no_cache()),
        ));
        // Two nulls, two 1s, two 2s spread over fragments 1 and 2.
        let values = vec![
            None, None, Some(1u32), Some(1u32), Some(2u32), Some(2u32), ];
        let row_ids: Vec<u64> = vec![
            RowAddress::new_from_parts(1, 0).into(),
            RowAddress::new_from_parts(1, 1).into(),
            RowAddress::new_from_parts(1, 2).into(),
            RowAddress::new_from_parts(2, 0).into(),
            RowAddress::new_from_parts(2, 1).into(),
            RowAddress::new_from_parts(2, 2).into(),
        ];
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", DataType::UInt32, true),
            Field::new("_rowid", DataType::UInt64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt32Array::from(values)),
                Arc::new(UInt64Array::from(row_ids)),
            ],
        )
        .unwrap();
        let stream = stream::once(async move { Ok(batch) });
        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
        BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
            .await
            .unwrap();
        let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
            .await
            .expect("Failed to load bitmap index");
        assert_eq!(index.index_map.len(), 2); assert!(!index.null_map.is_empty());
        // Map every address into fragment 3 (rows 0..6, in order).
        let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
        row_addr_map.insert(
            RowAddress::new_from_parts(1, 0).into(),
            Some(RowAddress::new_from_parts(3, 0).into()),
        );
        row_addr_map.insert(
            RowAddress::new_from_parts(1, 1).into(),
            Some(RowAddress::new_from_parts(3, 1).into()),
        );
        row_addr_map.insert(
            RowAddress::new_from_parts(1, 2).into(),
            Some(RowAddress::new_from_parts(3, 2).into()),
        );
        row_addr_map.insert(
            RowAddress::new_from_parts(2, 0).into(),
            Some(RowAddress::new_from_parts(3, 3).into()),
        );
        row_addr_map.insert(
            RowAddress::new_from_parts(2, 1).into(),
            Some(RowAddress::new_from_parts(3, 4).into()),
        );
        row_addr_map.insert(
            RowAddress::new_from_parts(2, 2).into(),
            Some(RowAddress::new_from_parts(3, 5).into()),
        );
        index
            .remap(&row_addr_map, test_store.as_ref())
            .await
            .unwrap();
        // Reload the remapped index from the same store.
        let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
            .await
            .expect("Failed to load remapped bitmap index");
        let expected_null_addrs: Vec<u64> = vec![
            RowAddress::new_from_parts(3, 0).into(),
            RowAddress::new_from_parts(3, 1).into(),
        ];
        let actual_null_addrs: Vec<u64> = reloaded_idx
            .null_map
            .row_addrs()
            .unwrap()
            .map(u64::from)
            .collect();
        assert_eq!(
            actual_null_addrs, expected_null_addrs,
            "Null bitmap not remapped correctly"
        );
        // Value 1 moved to fragment 3, rows 2 and 3.
        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
        let result = reloaded_idx
            .search(&query, &NoOpMetricsCollector)
            .await
            .unwrap();
        if let crate::scalar::SearchResult::Exact(row_ids) = result {
            let mut actual: Vec<u64> = row_ids
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(u64::from)
                .collect();
            actual.sort();
            let expected: Vec<u64> = vec![
                RowAddress::new_from_parts(3, 2).into(),
                RowAddress::new_from_parts(3, 3).into(),
            ];
            assert_eq!(actual, expected, "Value 1 bitmap not remapped correctly");
        }
        // Value 2 moved to fragment 3, rows 4 and 5.
        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
        let result = reloaded_idx
            .search(&query, &NoOpMetricsCollector)
            .await
            .unwrap();
        if let crate::scalar::SearchResult::Exact(row_ids) = result {
            let mut actual: Vec<u64> = row_ids
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(u64::from)
                .collect();
            actual.sort();
            let expected: Vec<u64> = vec![
                RowAddress::new_from_parts(3, 4).into(),
                RowAddress::new_from_parts(3, 5).into(),
            ];
            assert_eq!(actual, expected, "Value 2 bitmap not remapped correctly");
        }
        // IsNull search must also reflect the remapped null bitmap.
        let query = SargableQuery::IsNull();
        let result = reloaded_idx
            .search(&query, &NoOpMetricsCollector)
            .await
            .unwrap();
        if let crate::scalar::SearchResult::Exact(row_ids) = result {
            let mut actual: Vec<u64> = row_ids
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(u64::from)
                .collect();
            actual.sort();
            assert_eq!(
                actual, expected_null_addrs,
                "Null search results not correct"
            );
        }
    }
#[tokio::test]
async fn test_bitmap_null_handling_in_queries() {
    // Trains a bitmap index over three rows (values [0, 5, NULL]) and checks
    // that Equals, IsNull, and Range queries partition rows correctly into
    // "matching" rows vs. rows whose indexed value is NULL.
    let tmpdir = TempObjDir::default();
    let store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    // Row 0 -> 0, row 1 -> 5, row 2 -> NULL.
    let batch = record_batch!(
        ("value", Int64, [Some(0), Some(5), None]),
        ("_rowid", UInt64, [0, 1, 2])
    )
    .unwrap();
    let schema = batch.schema();
    let stream = stream::once(async move { Ok(batch) });
    let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
    BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
        .await
        .unwrap();

    let cache = LanceCache::with_capacity(1024 * 1024);
    let index = BitmapIndex::load(store.clone(), None, &cache)
        .await
        .unwrap();

    // Equality query: row 1 matches; row 2 (NULL) must be surfaced via null_rows().
    let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    match result {
        SearchResult::Exact(row_ids) => {
            let actual_rows: Vec<u64> = row_ids
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(u64::from)
                .collect();
            assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 5");
            let null_row_ids = row_ids.null_rows();
            // NOTE: null_rows() returns a set; a non-empty set signals that
            // some rows could not be evaluated because their value is NULL.
            assert!(
                !null_row_ids.is_empty(),
                "null_row_ids should be non-empty"
            );
            let null_rows: Vec<u64> =
                null_row_ids.row_addrs().unwrap().map(u64::from).collect();
            assert_eq!(null_rows, vec![2], "Should report row 2 as null");
        }
        _ => panic!("Expected Exact search result"),
    }

    // IsNull query: row 2 is the answer itself, so nothing should be
    // reported in the null set (nulls are the matches, not unknowns).
    let query = SargableQuery::IsNull();
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    match result {
        SearchResult::Exact(row_addrs) => {
            let actual_rows: Vec<u64> = row_addrs
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(u64::from)
                .collect();
            assert_eq!(
                actual_rows,
                vec![2],
                "IsNull should find row 2 where value is null"
            );
            let null_row_ids = row_addrs.null_rows();
            assert!(
                null_row_ids.is_empty(),
                "null_row_ids should be empty for IsNull query"
            );
        }
        _ => panic!("Expected Exact search result"),
    }

    // Range query [0, 3]: row 0 matches; row 2 (NULL) is again reported
    // as a null row since its membership in the range is unknown.
    let query = SargableQuery::Range(
        std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
        std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
    );
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    match result {
        SearchResult::Exact(row_addrs) => {
            let actual_rows: Vec<u64> = row_addrs
                .true_rows()
                .row_addrs()
                .unwrap()
                .map(u64::from)
                .collect();
            assert_eq!(actual_rows, vec![0], "Should find row 0 where value == 0");
            let null_row_ids = row_addrs.null_rows();
            assert!(
                !null_row_ids.is_empty(),
                "null_row_ids should be non-empty"
            );
            let null_rows: Vec<u64> =
                null_row_ids.row_addrs().unwrap().map(u64::from).collect();
            assert_eq!(null_rows, vec![2], "Should report row 2 as null");
        }
        _ => panic!("Expected Exact search result"),
    }
}
}