use crate::scalar::bloomfilter::sbbf::{Sbbf, SbbfBuilder};
use crate::scalar::expression::{BloomFilterQueryParser, ScalarQueryParser};
use crate::scalar::registry::{
ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest,
};
use crate::scalar::{
BloomFilterQuery, BuiltinIndexType, CreatedIndex, ScalarIndexParams, UpdateCriteria,
};
use crate::{Any, pb};
use arrow_array::{Array, UInt64Array};
mod as_bytes;
pub mod sbbf;
use arrow_schema::{DataType, Field};
use serde::{Deserialize, Serialize};
use std::sync::LazyLock;
use datafusion::execution::SendableRecordBatchStream;
use std::{collections::HashMap, sync::Arc};
use crate::scalar::FragReuseIndex;
use crate::scalar::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult};
use crate::vector::VectorIndex;
use crate::{Index, IndexType};
use arrow_array::{ArrayRef, RecordBatch};
use async_trait::async_trait;
use deepsize::DeepSizeOf;
use lance_core::Error;
use lance_core::Result;
use lance_core::cache::LanceCache;
use roaring::RoaringBitmap;
use super::zoned::{ZoneBound, ZoneProcessor, ZoneTrainer, rebuild_zones, search_zones};
/// Name of the file (inside the index directory) holding the serialized index.
const BLOOMFILTER_FILENAME: &str = "bloomfilter.lance";
/// Schema-metadata key storing the expected number of items per zone filter.
const BLOOMFILTER_ITEM_META_KEY: &str = "bloomfilter_item";
/// Schema-metadata key storing the target false-positive probability.
const BLOOMFILTER_PROBABILITY_META_KEY: &str = "bloomfilter_probability";
/// On-disk format version of the bloom filter index.
const BLOOMFILTER_INDEX_VERSION: u32 = 0;
/// Per-zone statistics: the zone's location plus a bloom filter over its values.
#[derive(Debug, Clone)]
struct BloomFilterStatistics {
    // Fragment id, start offset, and length identifying the covered rows.
    bound: ZoneBound,
    // True if at least one null value was observed in the zone.
    has_null: bool,
    // Split-block bloom filter over the zone's non-null values.
    bloom_filter: Sbbf,
}

impl DeepSizeOf for BloomFilterStatistics {
    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
        // Approximate the filter's heap usage by its serialized size.
        // NOTE(review): `to_bytes()` appears to allocate a buffer just to
        // measure it — confirm whether Sbbf exposes a cheaper size accessor.
        self.bloom_filter.to_bytes().len()
    }
}

impl AsRef<ZoneBound> for BloomFilterStatistics {
    // Lets the generic zone machinery (e.g. `search_zones`) read the bound.
    fn as_ref(&self) -> &ZoneBound {
        &self.bound
    }
}
/// A scalar index keeping one bloom filter per zone of rows, so zones that
/// definitely do not contain a queried value can be pruned from a scan.
#[derive(Debug, Clone)]
pub struct BloomFilterIndex {
    // One entry per zone, in serialized row order.
    zones: Vec<BloomFilterStatistics>,
    // Builder parameter: expected number of items per zone filter.
    number_of_items: u64,
    // Builder parameter: target false-positive probability per filter.
    probability: f64,
}

impl DeepSizeOf for BloomFilterIndex {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // The two scalar fields are inline; only the zone vector owns heap data.
        self.zones.deep_size_of_children(context)
    }
}
impl BloomFilterIndex {
/// Loads a serialized bloom filter index from `store`, reading every zone
/// row into memory and recovering the builder parameters from the file
/// schema metadata.
async fn load(
    store: Arc<dyn IndexStore>,
    _fri: Option<Arc<FragReuseIndex>>,
    _index_cache: &LanceCache,
) -> Result<Arc<Self>> {
    // Pull the entire serialized index (one row per zone) into memory.
    let index_file = store.open_index_file(BLOOMFILTER_FILENAME).await?;
    let total_rows = index_file.num_rows();
    let bloom_data = index_file.read_range(0..total_rows, None).await?;
    // Builder parameters live in the schema metadata; fall back to the
    // process-wide defaults when a key is missing or unparseable.
    let file_schema = index_file.schema();
    let number_of_items = file_schema
        .metadata
        .get(BLOOMFILTER_ITEM_META_KEY)
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(*DEFAULT_NUMBER_OF_ITEMS);
    let probability = file_schema
        .metadata
        .get(BLOOMFILTER_PROBABILITY_META_KEY)
        .and_then(|raw| raw.parse::<f64>().ok())
        .unwrap_or(*DEFAULT_PROBABILITY);
    let index = Self::try_from_serialized(bloom_data, number_of_items, probability)?;
    Ok(Arc::new(index))
}
fn try_from_serialized(
data: RecordBatch,
number_of_items: u64,
probability: f64,
) -> Result<Self> {
if data.num_rows() == 0 {
return Ok(Self {
zones: Vec::new(),
number_of_items,
probability,
});
}
let fragment_id_col = data
.column_by_name("fragment_id")
.ok_or_else(|| Error::invalid_input("BloomFilterIndex: missing 'fragment_id' column"))?
.as_any()
.downcast_ref::<arrow_array::UInt64Array>()
.ok_or_else(|| {
Error::invalid_input("BloomFilterIndex: 'fragment_id' column is not UInt64")
})?;
let zone_start_col = data
.column_by_name("zone_start")
.ok_or_else(|| Error::invalid_input("BloomFilterIndex: missing 'zone_start' column"))?
.as_any()
.downcast_ref::<arrow_array::UInt64Array>()
.ok_or_else(|| {
Error::invalid_input("BloomFilterIndex: 'zone_start' column is not UInt64")
})?;
let zone_length_col = data
.column_by_name("zone_length")
.ok_or_else(|| Error::invalid_input("BloomFilterIndex: missing 'zone_length' column"))?
.as_any()
.downcast_ref::<arrow_array::UInt64Array>()
.ok_or_else(|| {
Error::invalid_input("BloomFilterIndex: 'zone_length' column is not UInt64")
})?;
let bloom_filter_data_col = data
.column_by_name("bloom_filter_data")
.ok_or_else(|| {
Error::invalid_input("BloomFilterIndex: missing 'bloom_filter_data' column")
})?
.as_any()
.downcast_ref::<arrow_array::BinaryArray>()
.ok_or_else(|| {
Error::invalid_input("BloomFilterIndex: 'bloom_filter_data' column is not Binary")
})?;
let has_null_col = data
.column_by_name("has_null")
.ok_or_else(|| Error::invalid_input("BloomFilterIndex: missing 'has_null' column"))?
.as_any()
.downcast_ref::<arrow_array::BooleanArray>()
.ok_or_else(|| {
Error::invalid_input("BloomFilterIndex: 'has_null' column is not Boolean")
})?;
let num_blocks = data.num_rows();
let mut blocks = Vec::with_capacity(num_blocks);
for i in 0..num_blocks {
let bloom_filter_bytes = if bloom_filter_data_col.is_valid(i) {
bloom_filter_data_col.value(i).to_vec()
} else {
Vec::new()
};
let bloom_filter = Sbbf::new(&bloom_filter_bytes).map_err(|e| {
Error::invalid_input(format!("Failed to deserialize bloom filter: {:?}", e))
})?;
blocks.push(BloomFilterStatistics {
bound: ZoneBound {
fragment_id: fragment_id_col.value(i),
start: zone_start_col.value(i),
length: zone_length_col.value(i) as usize,
},
has_null: has_null_col.value(i),
bloom_filter,
});
}
Ok(Self {
zones: blocks,
number_of_items,
probability,
})
}
fn evaluate_block_against_query(
&self,
block: &BloomFilterStatistics,
query: &BloomFilterQuery,
) -> Result<bool> {
let sbbf = &block.bloom_filter;
match query {
BloomFilterQuery::IsNull() => {
Ok(block.has_null)
}
BloomFilterQuery::Equals(target) => {
if target.is_null() {
return Ok(block.has_null);
}
match target {
datafusion_common::ScalarValue::Int8(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Int16(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Int32(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Int64(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::UInt8(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::UInt16(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::UInt32(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::UInt64(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Float32(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Float64(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Utf8(Some(val)) => Ok(sbbf.check(val.as_str())),
datafusion_common::ScalarValue::LargeUtf8(Some(val)) => {
Ok(sbbf.check(val.as_str()))
}
datafusion_common::ScalarValue::Binary(Some(val)) => {
Ok(sbbf.check(val.as_slice()))
}
datafusion_common::ScalarValue::LargeBinary(Some(val)) => {
Ok(sbbf.check(val.as_slice()))
}
datafusion_common::ScalarValue::Date32(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Date64(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Time32Second(Some(val)) => Ok(sbbf.check(val)),
datafusion_common::ScalarValue::Time32Millisecond(Some(val)) => {
Ok(sbbf.check(val))
}
datafusion_common::ScalarValue::Time64Microsecond(Some(val)) => {
Ok(sbbf.check(val))
}
datafusion_common::ScalarValue::Time64Nanosecond(Some(val)) => {
Ok(sbbf.check(val))
}
datafusion_common::ScalarValue::TimestampSecond(Some(val), _) => {
Ok(sbbf.check(val))
}
datafusion_common::ScalarValue::TimestampMillisecond(Some(val), _) => {
Ok(sbbf.check(val))
}
datafusion_common::ScalarValue::TimestampMicrosecond(Some(val), _) => {
Ok(sbbf.check(val))
}
datafusion_common::ScalarValue::TimestampNanosecond(Some(val), _) => {
Ok(sbbf.check(val))
}
_ => Err(Error::invalid_input_source(
format!("Unsupported data type in bloom filter query: {:?}", target).into(),
)),
}
}
BloomFilterQuery::IsIn(values) => {
for value in values {
if value.is_null() {
if block.has_null {
return Ok(true);
}
continue;
}
let found = match value {
datafusion_common::ScalarValue::Int8(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Int16(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Int32(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Int64(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::UInt8(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::UInt16(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::UInt32(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::UInt64(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Float32(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Float64(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Utf8(Some(val)) => sbbf.check(val.as_str()),
datafusion_common::ScalarValue::LargeUtf8(Some(val)) => {
sbbf.check(val.as_str())
}
datafusion_common::ScalarValue::Binary(Some(val)) => {
sbbf.check(val.as_slice())
}
datafusion_common::ScalarValue::LargeBinary(Some(val)) => {
sbbf.check(val.as_slice())
}
datafusion_common::ScalarValue::Date32(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Date64(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Time32Second(Some(val)) => sbbf.check(val),
datafusion_common::ScalarValue::Time32Millisecond(Some(val)) => {
sbbf.check(val)
}
datafusion_common::ScalarValue::Time64Microsecond(Some(val)) => {
sbbf.check(val)
}
datafusion_common::ScalarValue::Time64Nanosecond(Some(val)) => {
sbbf.check(val)
}
datafusion_common::ScalarValue::TimestampSecond(Some(val), _) => {
sbbf.check(val)
}
datafusion_common::ScalarValue::TimestampMillisecond(Some(val), _) => {
sbbf.check(val)
}
datafusion_common::ScalarValue::TimestampMicrosecond(Some(val), _) => {
sbbf.check(val)
}
datafusion_common::ScalarValue::TimestampNanosecond(Some(val), _) => {
sbbf.check(val)
}
_ => {
return Err(Error::invalid_input_source(
format!("Unsupported data type in bloom filter query: {:?}", value)
.into(),
));
}
};
if found {
return Ok(true);
}
}
Ok(false) }
}
}
}
#[async_trait]
impl Index for BloomFilterIndex {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
        self
    }

    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn VectorIndex>> {
        Err(Error::invalid_input_source(
            "BloomFilter is not a vector index".into(),
        ))
    }

    async fn prewarm(&self) -> Result<()> {
        // Nothing to warm: all zones are fully materialized at load time.
        Ok(())
    }

    /// Summary statistics exposed through the index listing APIs.
    fn statistics(&self) -> Result<serde_json::Value> {
        Ok(serde_json::json!({
            "type": "BloomFilter",
            "num_blocks": self.zones.len(),
            "number_of_items": self.number_of_items,
            "probability": self.probability,
        }))
    }

    fn index_type(&self) -> IndexType {
        IndexType::BloomFilter
    }

    /// Collects the distinct fragment ids recorded across all zones.
    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
        Ok(self
            .zones
            .iter()
            .map(|zone| zone.bound.fragment_id as u32)
            .collect())
    }
}
#[async_trait]
impl ScalarIndex for BloomFilterIndex {
    /// Evaluates `query` against every zone, returning the (possibly
    /// overestimated) set of row addresses that may match.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult> {
        // The planner only routes BloomFilterQuery values to this index, so
        // the downcast failing would be a caller bug.
        let query = query.as_any().downcast_ref::<BloomFilterQuery>().unwrap();
        search_zones(&self.zones, metrics, |block| {
            self.evaluate_block_against_query(block, query)
        })
    }

    fn can_remap(&self) -> bool {
        // Zones are addressed by (fragment, offset); remapping row ids is
        // not possible, so the index must be retrained instead.
        false
    }

    async fn remap(
        &self,
        _mapping: &HashMap<u64, Option<u64>>,
        _dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex> {
        Err(Error::invalid_input_source(
            "BloomFilter does not support remap".into(),
        ))
    }

    /// Builds zones for `new_data`, merges them with the existing zones, and
    /// writes the combined index to `dest_store`.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
        _old_data_filter: Option<super::OldIndexDataFilter>,
    ) -> Result<CreatedIndex> {
        // Reuse the parameters the index was originally built with.
        let params = BloomFilterIndexBuilderParams {
            number_of_items: self.number_of_items,
            probability: self.probability,
        };
        let processor = BloomFilterProcessor::new(params.clone())?;
        let trainer = ZoneTrainer::new(processor, params.number_of_items)?;
        let updated_blocks = rebuild_zones(&self.zones, trainer, new_data).await?;
        // Inject the merged zones into a fresh builder and serialize.
        let mut builder = BloomFilterIndexBuilder::try_new(params)?;
        builder.blocks = updated_blocks;
        builder.write_index(dest_store).await?;
        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pb::BloomFilterIndexDetails::default())
                .unwrap(),
            index_version: BLOOMFILTER_INDEX_VERSION,
            files: Some(dest_store.list_files_with_sizes().await?),
        })
    }

    fn update_criteria(&self) -> UpdateCriteria {
        // Updates only need the new rows, ordered by address, with row
        // addresses attached.
        UpdateCriteria::only_new_data(
            TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(),
        )
    }

    /// Reconstructs the builder parameters so an equivalent index can be
    /// recreated elsewhere.
    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
        let params = serde_json::to_value(BloomFilterIndexBuilderParams {
            number_of_items: self.number_of_items,
            probability: self.probability,
        })?;
        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::BloomFilter).with_params(&params))
    }
}
/// serde default for `BloomFilterIndexBuilderParams::number_of_items`.
fn default_number_of_items() -> u64 {
    *DEFAULT_NUMBER_OF_ITEMS
}

/// serde default for `BloomFilterIndexBuilderParams::probability`.
fn default_probability() -> f64 {
    *DEFAULT_PROBABILITY
}
/// Default expected number of items per zone bloom filter; overridable via
/// the `LANCE_BLOOMFILTER_DEFAULT_NUMBER_OF_ITEMS` environment variable.
static DEFAULT_NUMBER_OF_ITEMS: LazyLock<u64> = LazyLock::new(|| {
    std::env::var("LANCE_BLOOMFILTER_DEFAULT_NUMBER_OF_ITEMS")
        .unwrap_or_else(|_| "8192".to_string())
        .parse()
        // Fixed: the panic message previously misspelled the variable name
        // ("Lance_BLOOMFILTER_..." instead of "LANCE_BLOOMFILTER_...").
        .expect("failed to parse LANCE_BLOOMFILTER_DEFAULT_NUMBER_OF_ITEMS")
});
/// Default false-positive probability for zone bloom filters; overridable
/// via `LANCE_BLOOMFILTER_DEFAULT_PROBABILITY` (must lie within [0, 1]).
static DEFAULT_PROBABILITY: LazyLock<f64> = LazyLock::new(|| {
    std::env::var("LANCE_BLOOMFILTER_DEFAULT_PROBABILITY")
        .unwrap_or_else(|_| "0.00057".to_string())
        .parse::<f64>()
        // `inspect` replaces the previous `map`-that-returned-its-input,
        // which needed #[allow(clippy::manual_inspect)] to pass clippy.
        .inspect(|prob| {
            assert!(
                (0.0..=1.0).contains(prob),
                "LANCE_BLOOMFILTER_DEFAULT_PROBABILITY must be between 0 and 1, got {}",
                prob
            );
        })
        .expect("failed to parse LANCE_BLOOMFILTER_DEFAULT_PROBABILITY")
});
/// User-tunable parameters controlling zone size and filter accuracy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BloomFilterIndexBuilderParams {
    // Expected number of items per zone filter; also passed to ZoneTrainer
    // as the zone size.
    #[serde(default = "default_number_of_items")]
    number_of_items: u64,
    // Target false-positive probability for each zone's filter.
    #[serde(default = "default_probability")]
    probability: f64,
}

impl Default for BloomFilterIndexBuilderParams {
    fn default() -> Self {
        // Defaults come from the (env-overridable) process-wide settings.
        Self {
            number_of_items: *DEFAULT_NUMBER_OF_ITEMS,
            probability: *DEFAULT_PROBABILITY,
        }
    }
}

impl BloomFilterIndexBuilderParams {
    /// Test-only convenience constructor.
    #[cfg(test)]
    fn new(number_of_items: u64, probability: f64) -> Self {
        Self {
            number_of_items,
            probability,
        }
    }
}

/// Accumulates per-zone bloom filter statistics during training and
/// serializes them to an index file.
pub struct BloomFilterIndexBuilder {
    params: BloomFilterIndexBuilderParams,
    // One entry per completed zone, in input order.
    blocks: Vec<BloomFilterStatistics>,
}
impl BloomFilterIndexBuilder {
/// Creates an empty builder configured with `params`.
pub fn try_new(params: BloomFilterIndexBuilderParams) -> Result<Self> {
    Ok(Self {
        params,
        blocks: Vec::new(),
    })
}

/// Consumes `batches_source`, producing one bloom filter per zone of
/// `number_of_items` rows via the generic zone trainer.
pub async fn train(&mut self, batches_source: SendableRecordBatchStream) -> Result<()> {
    let processor = BloomFilterProcessor::new(self.params.clone())?;
    let trainer = ZoneTrainer::new(processor, self.params.number_of_items)?;
    self.blocks = trainer.train(batches_source).await?;
    Ok(())
}
/// Serializes the accumulated zone statistics into a record batch with one
/// row per zone: fragment id, zone start/length, null flag, and the
/// filter's serialized bytes.
fn bloomfilter_stats_as_batch(&self) -> Result<RecordBatch> {
    let frag_ids =
        UInt64Array::from_iter_values(self.blocks.iter().map(|b| b.bound.fragment_id));
    let starts = UInt64Array::from_iter_values(self.blocks.iter().map(|b| b.bound.start));
    let lengths =
        UInt64Array::from_iter_values(self.blocks.iter().map(|b| b.bound.length as u64));
    let null_flags: Vec<bool> = self.blocks.iter().map(|b| b.has_null).collect();
    let has_nulls = arrow_array::BooleanArray::from(null_flags);
    let filter_bytes: ArrayRef = if self.blocks.is_empty() {
        // Zero zones: emit an empty binary column.
        Arc::new(arrow_array::BinaryArray::new_null(0))
    } else {
        // Keep the serialized buffers alive while we build slice views.
        let serialized: Vec<Vec<u8>> = self
            .blocks
            .iter()
            .map(|b| b.bloom_filter.to_bytes())
            .collect();
        let views: Vec<Option<&[u8]>> =
            serialized.iter().map(|buf| Some(buf.as_slice())).collect();
        Arc::new(arrow_array::BinaryArray::from_opt_vec(views))
    };
    let schema = Arc::new(arrow_schema::Schema::new(vec![
        Field::new("fragment_id", DataType::UInt64, false),
        Field::new("zone_start", DataType::UInt64, false),
        Field::new("zone_length", DataType::UInt64, false),
        Field::new("has_null", DataType::Boolean, false),
        Field::new("bloom_filter_data", DataType::Binary, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(frag_ids) as ArrayRef,
        Arc::new(starts) as ArrayRef,
        Arc::new(lengths) as ArrayRef,
        Arc::new(has_nulls) as ArrayRef,
        filter_bytes,
    ];
    Ok(RecordBatch::try_new(schema, columns)?)
}
/// Writes the serialized zone statistics to `index_store`, stashing the
/// builder parameters in the schema metadata so they survive a round trip.
pub async fn write_index(self, index_store: &dyn IndexStore) -> Result<()> {
    let record_batch = self.bloomfilter_stats_as_batch()?;
    let mut file_schema = record_batch.schema().as_ref().clone();
    file_schema.metadata.extend([
        (
            BLOOMFILTER_ITEM_META_KEY.to_string(),
            self.params.number_of_items.to_string(),
        ),
        (
            BLOOMFILTER_PROBABILITY_META_KEY.to_string(),
            self.params.probability.to_string(),
        ),
    ]);
    let mut index_file = index_store
        .new_index_file(BLOOMFILTER_FILENAME, Arc::new(file_schema))
        .await?;
    index_file.write_record_batch(record_batch).await?;
    index_file.finish().await?;
    Ok(())
}
}
/// Streaming state for building one zone's bloom filter at a time.
struct BloomFilterProcessor {
    params: BloomFilterIndexBuilderParams,
    // Filter for the zone currently being accumulated. Set to `Some` by
    // `reset` (called from `new`), so it is always present after construction.
    sbbf: Option<Sbbf>,
    // Whether the current zone has seen a null value so far.
    cur_zone_has_null: bool,
}
impl BloomFilterProcessor {
/// Creates a processor with a freshly allocated filter for the first zone.
fn new(params: BloomFilterIndexBuilderParams) -> Result<Self> {
    let mut processor = Self {
        params,
        sbbf: None,
        cur_zone_has_null: false,
    };
    // `reset` builds the first zone's filter.
    processor.reset()?;
    Ok(processor)
}

/// Builds an empty split-block bloom filter sized for the configured item
/// count and false-positive probability.
fn build_filter(params: &BloomFilterIndexBuilderParams) -> Result<Sbbf> {
    SbbfBuilder::new()
        .expected_items(params.number_of_items)
        .false_positive_probability(params.probability)
        .build()
        .map_err(|e| {
            Error::invalid_input_source(format!("Failed to build SBBF: {:?}", e).into())
        })
}
/// Inserts every non-null value of a primitive array into the filter,
/// reporting whether any null was encountered.
fn process_primitive_array<T>(sbbf: &mut Sbbf, array: &arrow_array::PrimitiveArray<T>) -> bool
where
    T: arrow_array::ArrowPrimitiveType,
    T::Native: as_bytes::AsBytes,
{
    let mut has_null = false;
    for value in array.iter() {
        if let Some(v) = value {
            sbbf.insert(&v);
        } else {
            has_null = true;
        }
    }
    has_null
}
/// Inserts every non-null string into the filter, reporting whether any
/// null was encountered.
fn process_string_array(sbbf: &mut Sbbf, array: &arrow_array::StringArray) -> bool {
    let mut has_null = false;
    for value in array.iter() {
        if let Some(s) = value {
            sbbf.insert(s);
        } else {
            has_null = true;
        }
    }
    has_null
}
/// Large-string counterpart of `process_string_array`.
fn process_large_string_array(sbbf: &mut Sbbf, array: &arrow_array::LargeStringArray) -> bool {
    let mut has_null = false;
    for value in array.iter() {
        if let Some(s) = value {
            sbbf.insert(s);
        } else {
            has_null = true;
        }
    }
    has_null
}
/// Inserts every non-null binary value into the filter, reporting whether
/// any null was encountered.
fn process_binary_array(sbbf: &mut Sbbf, array: &arrow_array::BinaryArray) -> bool {
    let mut has_null = false;
    for value in array.iter() {
        if let Some(bytes) = value {
            sbbf.insert(bytes);
        } else {
            has_null = true;
        }
    }
    has_null
}
/// Large-binary counterpart of `process_binary_array`.
fn process_large_binary_array(sbbf: &mut Sbbf, array: &arrow_array::LargeBinaryArray) -> bool {
    let mut has_null = false;
    for value in array.iter() {
        if let Some(bytes) = value {
            sbbf.insert(bytes);
        } else {
            has_null = true;
        }
    }
    has_null
}
}
impl ZoneProcessor for BloomFilterProcessor {
type ZoneStatistics = BloomFilterStatistics;
fn process_chunk(&mut self, array: &ArrayRef) -> Result<()> {
let sbbf = self.sbbf.as_mut().ok_or_else(|| {
Error::invalid_input("BloomFilterProcessor did not initialize bloom filter")
})?;
let has_null = match array.data_type() {
DataType::Int8 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Int8Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::Int16 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Int16Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::Int32 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::Int64 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::UInt8 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::UInt8Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::UInt16 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::UInt16Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::UInt32 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::UInt32Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::UInt64 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::UInt64Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::Float32 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Float32Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::Float64 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Float64Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::Date32 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Date32Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::Time32(time_unit) => match time_unit {
arrow_schema::TimeUnit::Second => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Time32SecondArray>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
arrow_schema::TimeUnit::Millisecond => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Time32MillisecondArray>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
_ => {
return Err(Error::invalid_input_source(
format!("Unsupported Time32 unit: {:?}", time_unit).into(),
));
}
},
DataType::Date64 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Date64Array>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
DataType::Time64(time_unit) => match time_unit {
arrow_schema::TimeUnit::Microsecond => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Time64MicrosecondArray>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
arrow_schema::TimeUnit::Nanosecond => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::Time64NanosecondArray>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
_ => {
return Err(Error::invalid_input_source(
format!("Unsupported Time64 unit: {:?}", time_unit).into(),
));
}
},
DataType::Timestamp(time_unit, _) => match time_unit {
arrow_schema::TimeUnit::Second => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::TimestampSecondArray>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
arrow_schema::TimeUnit::Millisecond => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::TimestampMillisecondArray>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
arrow_schema::TimeUnit::Microsecond => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
arrow_schema::TimeUnit::Nanosecond => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::TimestampNanosecondArray>()
.unwrap();
Self::process_primitive_array(sbbf, typed_array)
}
},
DataType::Utf8 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
Self::process_string_array(sbbf, typed_array)
}
DataType::LargeUtf8 => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::LargeStringArray>()
.unwrap();
Self::process_large_string_array(sbbf, typed_array)
}
DataType::Binary => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::BinaryArray>()
.unwrap();
Self::process_binary_array(sbbf, typed_array)
}
DataType::LargeBinary => {
let typed_array = array
.as_any()
.downcast_ref::<arrow_array::LargeBinaryArray>()
.unwrap();
Self::process_large_binary_array(sbbf, typed_array)
}
_ => {
return Err(Error::invalid_input_source(
format!(
"Bloom filter does not support data type: {:?}",
array.data_type()
)
.into(),
));
}
};
self.cur_zone_has_null = self.cur_zone_has_null || has_null;
Ok(())
}
/// Snapshots the current filter and null flag as the finished zone's
/// statistics. Does not reset state itself.
/// NOTE(review): assumes the zone framework calls `reset` between zones —
/// confirm against `ZoneTrainer`.
fn finish_zone(&mut self, bound: ZoneBound) -> Result<Self::ZoneStatistics> {
    let bloom_filter = self.sbbf.as_ref().ok_or_else(|| {
        Error::invalid_input("BloomFilterProcessor did not initialize bloom filter")
    })?;
    Ok(BloomFilterStatistics {
        bound,
        has_null: self.cur_zone_has_null,
        bloom_filter: bloom_filter.clone(),
    })
}

/// Starts a new zone: fresh empty filter and cleared null flag.
fn reset(&mut self) -> Result<()> {
    self.sbbf = Some(Self::build_filter(&self.params)?);
    self.cur_zone_has_null = false;
    Ok(())
}
}
/// Registry plugin that trains, loads, and answers planning questions for
/// bloom filter indices.
#[derive(Debug, Default)]
pub struct BloomFilterIndexPlugin;

impl BloomFilterIndexPlugin {
    /// Trains a bloom filter index over `batches_source` (using default
    /// parameters when `options` is `None`) and writes it to `index_store`.
    async fn train_bloomfilter_index(
        batches_source: SendableRecordBatchStream,
        index_store: &dyn IndexStore,
        options: Option<BloomFilterIndexBuilderParams>,
    ) -> Result<()> {
        let mut builder = BloomFilterIndexBuilder::try_new(options.unwrap_or_default())?;
        builder.train(batches_source).await?;
        builder.write_index(index_store).await?;
        Ok(())
    }
}
#[async_trait]
impl ScalarIndexPlugin for BloomFilterIndexPlugin {
/// The plugin's registered name.
fn name(&self) -> &str {
    "BloomFilter"
}
/// Validates that `field` has a supported, non-nested scalar type and
/// parses `params` (JSON) into builder parameters.
fn new_training_request(
    &self,
    params: &str,
    field: &Field,
) -> Result<Box<dyn TrainingRequest>> {
    // Bloom filters hash individual scalar values, so nested types are out.
    if field.data_type().is_nested() {
        return Err(Error::invalid_input_source(
            "A bloom filter index can only be created on a non-nested field.".into(),
        ));
    }
    let supported = matches!(
        field.data_type(),
        DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Float32
            | DataType::Float64
            | DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Binary
            | DataType::LargeBinary
            | DataType::Date32
            | DataType::Date64
            | DataType::Time32(_)
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
    );
    if !supported {
        return Err(Error::invalid_input_source(format!(
            "Bloom filter index does not support data type: {:?}. Supported types: Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, Utf8, LargeUtf8, Binary, LargeBinary, Date32, Date64, Time32, Time64, Timestamp",
            field.data_type()
        ).into()));
    }
    // Keys missing from the JSON fall back to the serde defaults.
    let params = serde_json::from_str::<BloomFilterIndexBuilderParams>(params)?;
    Ok(Box::new(BloomFilterIndexTrainingRequest::new(params)))
}
/// Trains a new bloom filter index from `data` and writes it to
/// `index_store`.
///
/// # Errors
/// Fails if `fragment_ids` is provided (per-fragment training is not
/// supported) or if `request` was not created by `new_training_request`.
async fn train_index(
    &self,
    data: SendableRecordBatchStream,
    index_store: &dyn IndexStore,
    request: Box<dyn TrainingRequest>,
    fragment_ids: Option<Vec<u32>>,
    _progress: Arc<dyn crate::progress::IndexBuildProgress>,
) -> Result<CreatedIndex> {
    if fragment_ids.is_some() {
        return Err(Error::invalid_input_source(
            "BloomFilter index does not support fragment training".into(),
        ));
    }
    // Recover the concrete request type to access the builder parameters.
    let request = (request as Box<dyn std::any::Any>)
        .downcast::<BloomFilterIndexTrainingRequest>()
        .map_err(|_| {
            Error::invalid_input_source(
                "must provide training request created by new_training_request".into(),
            )
        })?;
    Self::train_bloomfilter_index(data, index_store, Some(request.params)).await?;
    Ok(CreatedIndex {
        index_details: prost_types::Any::from_msg(&pb::BloomFilterIndexDetails::default())
            .unwrap(),
        index_version: BLOOMFILTER_INDEX_VERSION,
        files: Some(index_store.list_files_with_sizes().await?),
    })
}
/// Bloom filter results can contain false positives, so the returned rows
/// always need rechecking against the actual data.
fn provides_exact_answer(&self) -> bool {
    false
}

fn version(&self) -> u32 {
    BLOOMFILTER_INDEX_VERSION
}

fn new_query_parser(
    &self,
    index_name: String,
    _index_details: &prost_types::Any,
) -> Option<Box<dyn ScalarQueryParser>> {
    // NOTE(review): the meaning of the boolean argument is not visible in
    // this file — confirm against `BloomFilterQueryParser::new`.
    Some(Box::new(BloomFilterQueryParser::new(index_name, true)))
}
/// Loads a previously written bloom filter index from `index_store`.
async fn load_index(
    &self,
    index_store: Arc<dyn IndexStore>,
    _index_details: &prost_types::Any,
    frag_reuse_index: Option<Arc<FragReuseIndex>>,
    cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
    Ok(
        BloomFilterIndex::load(index_store, frag_reuse_index, cache).await?
            as Arc<dyn ScalarIndex>,
    )
}
/// No pre-computed statistics are stored separately for this index type.
async fn load_statistics(
    &self,
    _index_store: Arc<dyn IndexStore>,
    _index_details: &prost_types::Any,
) -> Result<Option<serde_json::Value>> {
    Ok(None)
}
}
/// Training request pairing builder parameters with the data criteria the
/// trainer needs (rows ordered by address, with row addresses attached).
#[derive(Debug)]
pub struct BloomFilterIndexTrainingRequest {
    // Parameters forwarded to the index builder.
    pub params: BloomFilterIndexBuilderParams,
    // Data requirements the training stream must satisfy.
    pub criteria: TrainingCriteria,
}

impl BloomFilterIndexTrainingRequest {
    /// Creates a request for `params`, requiring address-ordered input with
    /// row addresses included.
    pub fn new(params: BloomFilterIndexBuilderParams) -> Self {
        Self {
            params,
            criteria: TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(),
        }
    }
}

impl TrainingRequest for BloomFilterIndexTrainingRequest {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn criteria(&self) -> &TrainingCriteria {
        &self.criteria
    }
}
#[cfg(test)]
mod tests {
use crate::scalar::registry::VALUE_COLUMN_NAME;
use std::sync::Arc;
use crate::scalar::bloomfilter::BloomFilterIndexPlugin;
use arrow_array::{RecordBatch, UInt64Array, record_batch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_common::ScalarValue;
use futures::{StreamExt, stream};
use lance_core::{
ROW_ADDR,
cache::LanceCache,
utils::{mask::RowAddrTreeMap, tempfile::TempObjDir},
};
use lance_io::object_store::ObjectStore;
use crate::scalar::{
BloomFilterQuery, ScalarIndex, SearchResult,
bloomfilter::{BloomFilterIndex, BloomFilterIndexBuilderParams},
lance_format::LanceIndexStore,
};
use crate::Index; use crate::metrics::NoOpMetricsCollector;
use roaring::RoaringBitmap;
fn add_row_addr(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
let schema = stream.schema();
let schema_with_row_addr = Arc::new(Schema::new(vec![
schema.field(0).clone(),
Field::new(ROW_ADDR, DataType::UInt64, false),
]));
let schema = schema_with_row_addr.clone();
let stream = stream.enumerate().map(move |(frag_id, batch)| {
let batch = batch.unwrap();
let row_addr = Arc::new(UInt64Array::from_iter_values(
(0..batch.num_rows() as u64).map(|off| off + ((frag_id as u64) << 32)),
));
Ok(RecordBatch::try_new(
schema_with_row_addr.clone(),
vec![batch.column(0).clone(), row_addr],
)?)
});
Box::pin(RecordBatchStreamAdapter::new(schema, stream))
}
#[tokio::test]
async fn test_empty_bloomfilter_index() {
let tmpdir = TempObjDir::default();
let test_store = Arc::new(LanceIndexStore::new(
Arc::new(ObjectStore::local()),
tmpdir.clone(),
Arc::new(LanceCache::no_cache()),
));
let data = arrow_array::Int32Array::from(Vec::<i32>::new());
let schema = Arc::new(Schema::new(vec![Field::new(
VALUE_COLUMN_NAME,
DataType::Int32,
false,
)]));
let data = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
schema,
stream::once(std::future::ready(Ok(data))),
));
let data_stream = add_row_addr(data_stream);
BloomFilterIndexPlugin::train_bloomfilter_index(data_stream, test_store.as_ref(), None)
.await
.unwrap();
log::debug!("Successfully wrote the index file");
let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
.await
.expect("Failed to load BloomFilterIndex");
assert_eq!(index.zones.len(), 0);
assert_eq!(index.number_of_items, 8192);
assert_eq!(index.probability, 0.00057);
let query = BloomFilterQuery::Equals(ScalarValue::Int32(None));
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
}
#[tokio::test]
async fn test_basic_bloomfilter_index() {
    // Train an index over a single 100-row Int32 fragment with one zone per
    // 100 rows, then verify zone metadata and equality lookups.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::Int32,
        false,
    )]));
    let values = arrow_array::Int32Array::from_iter_values(0..100);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let source: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(std::future::ready(Ok(batch))),
    ));
    let source = add_row_addr(source);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        source,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(100, 0.01)),
    )
    .await
    .unwrap();
    log::debug!("Successfully wrote the index file");

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load BloomFilterIndex");

    // All 100 rows fit into a single zone of fragment 0.
    assert_eq!(index.zones.len(), 1);
    assert_eq!(index.number_of_items, 100);
    assert_eq!(index.probability, 0.01);
    assert_eq!(index.zones[0].bound.fragment_id, 0u64);
    assert_eq!(index.zones[0].bound.start, 0u64);
    assert_eq!(index.zones[0].bound.length, 100);

    // A present value matches the whole zone (bloom filters are inexact).
    let query = BloomFilterQuery::Equals(ScalarValue::Int32(Some(50)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..100);
    assert_eq!(result, SearchResult::at_most(expected));

    // An absent value should prune every zone.
    let query = BloomFilterQuery::Equals(ScalarValue::Int32(Some(500)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));

    assert_eq!(
        index.calculate_included_frags().await.unwrap(),
        RoaringBitmap::from_iter(0..1)
    );
}
#[tokio::test]
async fn test_multiple_fragments_bloomfilter() {
    // Two 100-row Int64 fragments with a 50-row zone size should produce
    // two zones per fragment; zone bounds restart at 0 for each fragment.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::Int64,
        false,
    )]));
    let fragment0_batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(arrow_array::Int64Array::from_iter_values(0..100))],
    )
    .unwrap();
    let fragment1_batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(arrow_array::Int64Array::from_iter_values(100..200))],
    )
    .unwrap();
    let source: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema.clone(),
        stream::iter(vec![Ok(fragment0_batch.clone()), Ok(fragment1_batch.clone())]),
    ));
    let source = add_row_addr(source);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        source,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(50, 0.05)),
    )
    .await
    .unwrap();

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load BloomFilterIndex");

    // Expected (fragment_id, start) for each of the four 50-row zones.
    let expected_bounds = [(0u64, 0u64), (0, 50), (1, 0), (1, 50)];
    assert_eq!(index.zones.len(), expected_bounds.len());
    for (zone, (fragment_id, start)) in index.zones.iter().zip(expected_bounds) {
        assert_eq!(zone.bound.fragment_id, fragment_id);
        assert_eq!(zone.bound.start, start);
        assert_eq!(zone.bound.length, 50);
    }

    // Value 150 lives in the second zone of fragment 1; row addresses encode
    // the fragment id in the upper 32 bits.
    let query = BloomFilterQuery::Equals(ScalarValue::Int64(Some(150)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range((1u64 << 32) + 50..((1u64 << 32) + 100));
    assert_eq!(result, SearchResult::at_most(expected));

    assert_eq!(
        index.calculate_included_frags().await.unwrap(),
        RoaringBitmap::from_iter(0..2)
    );
}
#[tokio::test]
async fn test_nan_bloomfilter_index() {
    // NaN handling: every fifth-ish value (i % 5 == 2) is NaN, so every
    // 100-row zone contains NaN and a NaN probe must match all zones.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    let values: Vec<f32> = (0..500)
        .map(|i| if i % 5 == 2 { f32::NAN } else { i as f32 })
        .collect();
    let float_data = arrow_array::Float32Array::from(values);
    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::Float32,
        true,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(float_data)]).unwrap();
    let source: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(std::future::ready(Ok(batch))),
    ));
    let source = add_row_addr(source);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        source,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(100, 0.01)),
    )
    .await
    .unwrap();

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load BloomFilterIndex");
    assert_eq!(index.zones.len(), 5);

    // NaN occurs in every zone, so the whole range is a candidate.
    let query = BloomFilterQuery::Equals(ScalarValue::Float32(Some(f32::NAN)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..500);
    assert_eq!(result, SearchResult::at_most(expected));

    // Regular values only match their own zone.
    let query = BloomFilterQuery::Equals(ScalarValue::Float32(Some(5.0)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..100);
    assert_eq!(result, SearchResult::at_most(expected));

    let query = BloomFilterQuery::Equals(ScalarValue::Float32(Some(250.0)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(200..300);
    assert_eq!(result, SearchResult::at_most(expected));

    // Absent value prunes everything.
    let query = BloomFilterQuery::Equals(ScalarValue::Float32(Some(10000.0)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));

    // IsIn including NaN widens to all zones.
    let query = BloomFilterQuery::IsIn(vec![
        ScalarValue::Float32(Some(f32::NAN)),
        ScalarValue::Float32(Some(5.0)),
        ScalarValue::Float32(Some(150.0)),
    ]);
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..500);
    assert_eq!(result, SearchResult::at_most(expected));
}
#[tokio::test]
async fn test_complex_bloomfilter_index() {
let tmpdir = TempObjDir::default();
let test_store = Arc::new(LanceIndexStore::new(
Arc::new(ObjectStore::local()),
tmpdir.clone(),
Arc::new(LanceCache::no_cache()),
));
let data_size = 10000;
let data = arrow_array::Int64Array::from_iter_values(0..data_size as i64);
let schema = Arc::new(Schema::new(vec![Field::new(
VALUE_COLUMN_NAME,
DataType::Int64,
false,
)]));
let data = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
schema,
stream::once(std::future::ready(Ok(data))),
));
let data_stream = add_row_addr(data_stream);
BloomFilterIndexPlugin::train_bloomfilter_index(
data_stream,
test_store.as_ref(),
Some(BloomFilterIndexBuilderParams::new(1000, 0.001)), )
.await
.unwrap();
let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
.await
.expect("Failed to load BloomFilterIndex");
assert_eq!(index.zones.len(), 10);
assert_eq!(index.number_of_items, 1000);
assert_eq!(index.probability, 0.001);
for (i, block) in index.zones.iter().enumerate() {
assert_eq!(block.bound.fragment_id, 0u64);
assert_eq!(block.bound.start, (i * 1000) as u64);
assert_eq!(block.bound.length, 1000);
assert!(!block.bloom_filter.to_bytes().is_empty());
}
let query = BloomFilterQuery::Equals(ScalarValue::Int64(Some(2500))); let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
let mut expected = RowAddrTreeMap::new();
expected.insert_range(2000..3000);
assert_eq!(result, SearchResult::at_most(expected));
let query = BloomFilterQuery::Equals(ScalarValue::Int64(Some(50000)));
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
let query = BloomFilterQuery::IsIn(vec![
ScalarValue::Int64(Some(500)), ScalarValue::Int64(Some(2500)), ScalarValue::Int64(Some(7500)), ScalarValue::Int64(Some(50000)), ]);
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
let mut expected = RowAddrTreeMap::new();
expected.insert_range(0..1000); expected.insert_range(2000..3000); expected.insert_range(7000..8000); assert_eq!(result, SearchResult::at_most(expected));
assert_eq!(
index.calculate_included_frags().await.unwrap(),
RoaringBitmap::from_iter(0..1)
);
}
#[tokio::test]
async fn test_string_bloomfilter_index() {
    // Utf8 support: 200 formatted strings split across two 100-row zones.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    let string_values: Vec<String> = (0..200).map(|i| format!("value_{:03}", i)).collect();
    let string_data = arrow_array::StringArray::from_iter_values(string_values.iter());
    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::Utf8,
        false,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(string_data)]).unwrap();
    let source: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(std::future::ready(Ok(batch))),
    ));
    let source = add_row_addr(source);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        source,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(100, 0.01)),
    )
    .await
    .unwrap();

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load BloomFilterIndex");
    assert_eq!(index.zones.len(), 2);

    // Values in the first zone.
    let query = BloomFilterQuery::Equals(ScalarValue::Utf8(Some("value_050".to_string())));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..100);
    assert_eq!(result, SearchResult::at_most(expected));

    // Values in the second zone.
    let query = BloomFilterQuery::Equals(ScalarValue::Utf8(Some("value_150".to_string())));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(100..200);
    assert_eq!(result, SearchResult::at_most(expected));

    // Absent strings prune everything.
    let query =
        BloomFilterQuery::Equals(ScalarValue::Utf8(Some("nonexistent_value".to_string())));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));

    // IsIn with hits in both zones covers the full range.
    let query = BloomFilterQuery::IsIn(vec![
        ScalarValue::Utf8(Some("value_025".to_string())),
        ScalarValue::Utf8(Some("value_175".to_string())),
        ScalarValue::Utf8(Some("nonexistent".to_string())),
    ]);
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..200);
    assert_eq!(result, SearchResult::at_most(expected));
}
#[tokio::test]
async fn test_binary_bloomfilter_index() {
let tmpdir = TempObjDir::default();
let test_store = Arc::new(LanceIndexStore::new(
Arc::new(ObjectStore::local()),
tmpdir.clone(),
Arc::new(LanceCache::no_cache()),
));
let binary_values: Vec<Vec<u8>> = (0..100)
.map(|i| vec![i as u8, (i + 1) as u8, (i + 2) as u8])
.collect();
let binary_data = arrow_array::BinaryArray::from_iter_values(binary_values.iter());
let schema = Arc::new(Schema::new(vec![Field::new(
VALUE_COLUMN_NAME,
DataType::Binary,
false,
)]));
let data = RecordBatch::try_new(schema.clone(), vec![Arc::new(binary_data)]).unwrap();
let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
schema,
stream::once(std::future::ready(Ok(data))),
));
let data_stream = add_row_addr(data_stream);
BloomFilterIndexPlugin::train_bloomfilter_index(
data_stream,
test_store.as_ref(),
Some(BloomFilterIndexBuilderParams::new(50, 0.05)),
)
.await
.unwrap();
let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
.await
.expect("Failed to load BloomFilterIndex");
assert_eq!(index.zones.len(), 2);
let query = BloomFilterQuery::Equals(ScalarValue::Binary(Some(vec![25, 26, 27])));
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
let mut expected = RowAddrTreeMap::new();
expected.insert_range(0..50);
assert_eq!(result, SearchResult::at_most(expected));
let query = BloomFilterQuery::Equals(ScalarValue::Binary(Some(vec![75, 76, 77])));
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
let mut expected = RowAddrTreeMap::new();
expected.insert_range(50..100);
assert_eq!(result, SearchResult::at_most(expected));
let query = BloomFilterQuery::Equals(ScalarValue::Binary(Some(vec![255, 254, 253])));
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
}
#[tokio::test]
async fn test_large_data_types_bloomfilter_index() {
    // LargeUtf8 support (64-bit offsets): 100 strings, two 50-row zones.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    let large_string_values: Vec<String> =
        (0..100).map(|i| format!("large_value_{:05}", i)).collect();
    let large_string_data =
        arrow_array::LargeStringArray::from_iter_values(large_string_values.iter());
    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::LargeUtf8,
        false,
    )]));
    let batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(large_string_data)]).unwrap();
    let source: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(std::future::ready(Ok(batch))),
    ));
    let source = add_row_addr(source);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        source,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(50, 0.05)),
    )
    .await
    .unwrap();

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load BloomFilterIndex");
    assert_eq!(index.zones.len(), 2);

    // A value from the first zone matches only that zone.
    let query = BloomFilterQuery::Equals(ScalarValue::LargeUtf8(Some(
        "large_value_00025".to_string(),
    )));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..50);
    assert_eq!(result, SearchResult::at_most(expected));

    // Absent values prune all zones.
    let query = BloomFilterQuery::Equals(ScalarValue::LargeUtf8(Some(
        "nonexistent_large_value".to_string(),
    )));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
}
#[tokio::test]
async fn test_date32_bloomfilter_index() {
    // Date32 support: 100 day-offset values split across two 50-row zones.
    //
    // NOTE: this test was previously named `test_timestamp_bloomfilter_index`,
    // but it exclusively exercises `Date32` data and queries (the actual
    // timestamp coverage lives in `test_timestamp_types_bloomfilter_index`),
    // so it has been renamed to match what it tests.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    let date32_values: Vec<i32> = (0..100).collect();
    let date32_data = arrow_array::Date32Array::from(date32_values.clone());
    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::Date32,
        false,
    )]));
    let data = RecordBatch::try_new(schema.clone(), vec![Arc::new(date32_data)]).unwrap();
    let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(std::future::ready(Ok(data))),
    ));
    let data_stream = add_row_addr(data_stream);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        data_stream,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(50, 0.01)),
    )
    .await
    .unwrap();

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load Date32 BloomFilterIndex");
    assert_eq!(index.zones.len(), 2);

    // Day 25 lives in the first zone.
    let query = BloomFilterQuery::Equals(ScalarValue::Date32(Some(25)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..50);
    assert_eq!(result, SearchResult::at_most(expected));

    // Day 75 lives in the second zone.
    let query = BloomFilterQuery::Equals(ScalarValue::Date32(Some(75)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(50..100);
    assert_eq!(result, SearchResult::at_most(expected));

    // An absent day prunes all zones.
    let query = BloomFilterQuery::Equals(ScalarValue::Date32(Some(500)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
}
#[tokio::test]
async fn test_timestamp_types_bloomfilter_index() {
    // TimestampNanosecond support: 100 consecutive nanosecond timestamps
    // starting at 1_000_000_000, split across two 50-row zones.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    let timestamp_values: Vec<i64> = (0..100).map(|i| 1_000_000_000i64 + (i as i64)).collect();
    let timestamp_data = arrow_array::TimestampNanosecondArray::from(timestamp_values.clone());
    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
        false,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(timestamp_data)]).unwrap();
    let source: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(std::future::ready(Ok(batch))),
    ));
    let source = add_row_addr(source);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        source,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(50, 0.01)),
    )
    .await
    .unwrap();

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load Timestamp BloomFilterIndex");
    assert_eq!(index.zones.len(), 2);

    // Row 25 is in the first zone.
    let query = BloomFilterQuery::Equals(ScalarValue::TimestampNanosecond(
        Some(timestamp_values[25]),
        None,
    ));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..50);
    assert_eq!(result, SearchResult::at_most(expected));

    // Row 75 is in the second zone.
    let query = BloomFilterQuery::Equals(ScalarValue::TimestampNanosecond(
        Some(timestamp_values[75]),
        None,
    ));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(50..100);
    assert_eq!(result, SearchResult::at_most(expected));

    // A timestamp below the data range prunes all zones.
    let query =
        BloomFilterQuery::Equals(ScalarValue::TimestampNanosecond(Some(999_999_999i64), None));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));

    // IsIn with hits in both zones covers the whole range.
    let query = BloomFilterQuery::IsIn(vec![
        ScalarValue::TimestampNanosecond(Some(timestamp_values[10]), None),
        ScalarValue::TimestampNanosecond(Some(timestamp_values[85]), None),
        ScalarValue::TimestampNanosecond(Some(999_999_999i64), None),
    ]);
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..100);
    assert_eq!(result, SearchResult::at_most(expected));
}
#[tokio::test]
async fn test_time_types_bloomfilter_index() {
    // Time64 (microsecond) support: 100 hourly offsets, four 25-row zones.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    // One hour (in microseconds) per row.
    let time_values: Vec<i64> = (0..100).map(|i| (i as i64) * 3_600_000_000).collect();
    let time_data = arrow_array::Time64MicrosecondArray::from(time_values.clone());
    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::Time64(arrow_schema::TimeUnit::Microsecond),
        false,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_data)]).unwrap();
    let source: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(std::future::ready(Ok(batch))),
    ));
    let source = add_row_addr(source);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        source,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(25, 0.05)),
    )
    .await
    .unwrap();

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load Time64 BloomFilterIndex");
    assert_eq!(index.zones.len(), 4);

    // Row 10 is in the first 25-row zone.
    let query = BloomFilterQuery::Equals(ScalarValue::Time64Microsecond(Some(time_values[10])));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..25);
    assert_eq!(result, SearchResult::at_most(expected));

    // A value between rows (not a multiple of an hour) prunes all zones.
    let query = BloomFilterQuery::Equals(ScalarValue::Time64Microsecond(Some(999_999_999i64)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
}
#[tokio::test]
async fn test_bloomfilter_supported_operations() {
    // Exercises the three supported query forms (Equals, IsNull, IsIn) over
    // 1000 Int32 rows in four 250-row zones.
    let tmpdir = TempObjDir::default();
    let test_store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        tmpdir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    let values = arrow_array::Int32Array::from_iter_values(0..1000);
    let schema = Arc::new(Schema::new(vec![Field::new(
        VALUE_COLUMN_NAME,
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let source: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(std::future::ready(Ok(batch))),
    ));
    let source = add_row_addr(source);

    BloomFilterIndexPlugin::train_bloomfilter_index(
        source,
        test_store.as_ref(),
        Some(BloomFilterIndexBuilderParams::new(250, 0.01)),
    )
    .await
    .unwrap();

    let index = BloomFilterIndex::load(test_store.clone(), None, &LanceCache::no_cache())
        .await
        .expect("Failed to load BloomFilterIndex");
    assert_eq!(index.zones.len(), 4);

    // Equals: 500 lives in the third zone (rows 500..750).
    let query = BloomFilterQuery::Equals(ScalarValue::Int32(Some(500)));
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(500..750);
    assert_eq!(result, SearchResult::at_most(expected));

    // IsNull: the column has no nulls, so nothing matches.
    let query = BloomFilterQuery::IsNull();
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));

    // IsIn: union of the zones containing 100 and 600.
    let query = BloomFilterQuery::IsIn(vec![
        ScalarValue::Int32(Some(100)),
        ScalarValue::Int32(Some(600)),
    ]);
    let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
    let mut expected = RowAddrTreeMap::new();
    expected.insert_range(0..250);
    expected.insert_range(500..750);
    assert_eq!(result, SearchResult::at_most(expected));
}
#[tokio::test]
async fn test_bloomfilter_null_handling_in_queries() {
let tmpdir = TempObjDir::default();
let store = Arc::new(LanceIndexStore::new(
Arc::new(ObjectStore::local()),
tmpdir.clone(),
Arc::new(LanceCache::no_cache()),
));
let batch = record_batch!(
(VALUE_COLUMN_NAME, Int64, [Some(0), Some(5), None]),
(ROW_ADDR, UInt64, [0, 1, 2])
)
.unwrap();
let schema = batch.schema();
let stream = stream::once(async move { Ok(batch) });
let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
BloomFilterIndexPlugin::train_bloomfilter_index(stream, store.as_ref(), None)
.await
.unwrap();
let cache = LanceCache::with_capacity(1024 * 1024);
let index = BloomFilterIndex::load(store.clone(), None, &cache)
.await
.unwrap();
let query = BloomFilterQuery::Equals(ScalarValue::Int64(Some(5)));
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
match result {
SearchResult::AtMost(row_addrs) => {
let all_rows: Vec<u64> = row_addrs
.true_rows()
.row_addrs()
.unwrap()
.map(u64::from)
.collect();
assert_eq!(
all_rows,
vec![0, 1, 2],
"Should return all rows (including nulls) since BloomFilter is inexact"
);
}
_ => panic!("Expected AtMost search result from bloomfilter"),
}
let query = BloomFilterQuery::IsIn(vec![
ScalarValue::Int64(Some(0)),
ScalarValue::Int64(Some(10)),
]);
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
match result {
SearchResult::AtMost(row_addrs) => {
let all_rows: Vec<u64> = row_addrs
.true_rows()
.row_addrs()
.unwrap()
.map(u64::from)
.collect();
assert_eq!(
all_rows,
vec![0, 1, 2],
"Should return all rows in zone as possible matches"
);
}
_ => panic!("Expected AtMost search result from bloomfilter"),
}
}
}