1use std::{
5 any::Any,
6 collections::{BTreeMap, HashMap},
7 fmt::Debug,
8 ops::Bound,
9 sync::Arc,
10};
11
12use arrow::array::BinaryBuilder;
13use arrow_array::{new_null_array, Array, BinaryArray, RecordBatch, UInt64Array};
14use arrow_schema::{DataType, Field, Schema};
15use async_trait::async_trait;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::ScalarValue;
18use deepsize::DeepSizeOf;
19use futures::{stream, StreamExt, TryStreamExt};
20use lance_core::utils::mask::RowSetOps;
21use lance_core::{
22 cache::{CacheKey, LanceCache, WeakLanceCache},
23 error::LanceOptionExt,
24 utils::{
25 mask::{NullableRowAddrSet, RowAddrTreeMap},
26 tokio::get_num_compute_intensive_cpus,
27 },
28 Error, Result, ROW_ID,
29};
30use roaring::RoaringBitmap;
31use serde::Serialize;
32use snafu::location;
33use tracing::instrument;
34
35use super::{
36 btree::OrderableScalarValue, BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult,
37};
38use super::{AnyQuery, IndexStore, ScalarIndex};
39use crate::pbold;
40use crate::{
41 frag_reuse::FragReuseIndex,
42 scalar::{
43 expression::SargableQueryParser,
44 registry::{
45 DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering,
46 TrainingRequest, VALUE_COLUMN_NAME,
47 },
48 CreatedIndex, UpdateCriteria,
49 },
50};
51use crate::{metrics::MetricsCollector, Index, IndexType};
52use crate::{scalar::expression::ScalarQueryParser, scalar::IndexReader};
53
/// Name of the file storing the bitmap page lookup table.
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
/// Schema-metadata key under which serialized index statistics are stored.
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";

/// Maximum total size (in bytes) of serialized bitmaps buffered per record
/// batch.  Kept safely below `i32::MAX` because Arrow's `BinaryArray` uses
/// 32-bit offsets.
const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024;
/// Number of lookup-table rows read per chunk when scanning the index file.
const MAX_ROWS_PER_CHUNK: usize = 2 * 1024;

/// On-disk format version of the bitmap index.
const BITMAP_INDEX_VERSION: u32 = 0;
62
/// Lazily opens (then memoizes) the reader for the bitmap lookup file so the
/// file is only opened when a bitmap actually needs to be read from disk.
#[derive(Clone)]
struct LazyIndexReader {
    // Holds the opened reader once `get()` has been called; `None` until then.
    index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
    // Store from which the lookup file is opened.
    store: Arc<dyn IndexStore>,
}
70
71impl std::fmt::Debug for LazyIndexReader {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("LazyIndexReader")
74 .field("store", &self.store)
75 .finish()
76 }
77}
78
79impl LazyIndexReader {
80 fn new(store: Arc<dyn IndexStore>) -> Self {
81 Self {
82 index_reader: Arc::new(tokio::sync::Mutex::new(None)),
83 store,
84 }
85 }
86
87 async fn get(&self) -> Result<Arc<dyn IndexReader>> {
88 let mut reader = self.index_reader.lock().await;
89 if reader.is_none() {
90 let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
91 *reader = Some(index_reader);
92 }
93 Ok(reader.as_ref().unwrap().clone())
94 }
95}
96
/// A scalar index that stores one row-address bitmap per distinct value.
///
/// Only the value -> row-offset lookup table (and the NULL bitmap) is kept in
/// memory; per-value bitmaps are read lazily from the index file and cached.
#[derive(Clone, Debug)]
pub struct BitmapIndex {
    // Maps each non-null indexed value to the row offset (in the lookup file)
    // at which its serialized bitmap is stored.
    index_map: BTreeMap<OrderableScalarValue, usize>,

    // Row addresses whose indexed value is NULL (materialized at load time).
    null_map: Arc<RowAddrTreeMap>,

    // Arrow data type of the indexed column.
    value_type: DataType,

    // Store holding the index files.
    store: Arc<dyn IndexStore>,

    // Weak handle to the cache used for per-value bitmap caching.
    index_cache: WeakLanceCache,

    // Optional remapping of row addresses after fragment reuse/compaction.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,

    // Lazily-opened reader for the bitmap lookup file.
    lazy_reader: LazyIndexReader,
}
120
/// Cache key identifying a single value's bitmap in the index cache.
#[derive(Debug, Clone)]
pub struct BitmapKey {
    // The (non-null) indexed value whose bitmap is cached.
    value: OrderableScalarValue,
}
125
126impl CacheKey for BitmapKey {
127 type ValueType = RowAddrTreeMap;
128
129 fn key(&self) -> std::borrow::Cow<'_, str> {
130 format!("{}", self.value.0).into()
131 }
132}
133
134impl BitmapIndex {
135 fn new(
136 index_map: BTreeMap<OrderableScalarValue, usize>,
137 null_map: Arc<RowAddrTreeMap>,
138 value_type: DataType,
139 store: Arc<dyn IndexStore>,
140 index_cache: WeakLanceCache,
141 frag_reuse_index: Option<Arc<FragReuseIndex>>,
142 ) -> Self {
143 let lazy_reader = LazyIndexReader::new(store.clone());
144 Self {
145 index_map,
146 null_map,
147 value_type,
148 store,
149 index_cache,
150 frag_reuse_index,
151 lazy_reader,
152 }
153 }
154
155 pub(crate) async fn load(
156 store: Arc<dyn IndexStore>,
157 frag_reuse_index: Option<Arc<FragReuseIndex>>,
158 index_cache: &LanceCache,
159 ) -> Result<Arc<Self>> {
160 let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
161 let total_rows = page_lookup_file.num_rows();
162
163 if total_rows == 0 {
164 let schema = page_lookup_file.schema();
165 let data_type = schema.fields[0].data_type();
166 return Ok(Arc::new(Self::new(
167 BTreeMap::new(),
168 Arc::new(RowAddrTreeMap::default()),
169 data_type,
170 store,
171 WeakLanceCache::from(index_cache),
172 frag_reuse_index,
173 )));
174 }
175
176 let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
177 let mut null_map = Arc::new(RowAddrTreeMap::default());
178 let mut value_type: Option<DataType> = None;
179 let mut null_location: Option<usize> = None;
180 let mut row_offset = 0;
181
182 for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
183 let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
184 let chunk = page_lookup_file
185 .read_range(start_row..end_row, Some(&["keys"]))
186 .await?;
187
188 if chunk.num_rows() == 0 {
189 continue;
190 }
191
192 if value_type.is_none() {
193 value_type = Some(chunk.schema().field(0).data_type().clone());
194 }
195
196 let dict_keys = chunk.column(0);
197
198 for idx in 0..chunk.num_rows() {
199 let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
200
201 if key.0.is_null() {
202 null_location = Some(row_offset);
203 } else {
204 index_map.insert(key, row_offset);
205 }
206
207 row_offset += 1;
208 }
209 }
210
211 if let Some(null_loc) = null_location {
212 let batch = page_lookup_file
213 .read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
214 .await?;
215
216 let binary_bitmaps = batch
217 .column(0)
218 .as_any()
219 .downcast_ref::<BinaryArray>()
220 .ok_or_else(|| Error::Internal {
221 message: "Invalid bitmap column type".to_string(),
222 location: location!(),
223 })?;
224 let bitmap_bytes = binary_bitmaps.value(0);
225 let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
226
227 if let Some(fri) = &frag_reuse_index {
229 bitmap = fri.remap_row_addrs_tree_map(&bitmap);
230 }
231
232 null_map = Arc::new(bitmap);
233 }
234
235 let final_value_type = value_type.expect_ok()?;
236
237 Ok(Arc::new(Self::new(
238 index_map,
239 null_map,
240 final_value_type,
241 store,
242 WeakLanceCache::from(index_cache),
243 frag_reuse_index,
244 )))
245 }
246
247 async fn load_bitmap(
248 &self,
249 key: &OrderableScalarValue,
250 metrics: Option<&dyn MetricsCollector>,
251 ) -> Result<Arc<RowAddrTreeMap>> {
252 if key.0.is_null() {
253 return Ok(self.null_map.clone());
254 }
255
256 let cache_key = BitmapKey { value: key.clone() };
257
258 if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
259 return Ok(cached);
260 }
261
262 if let Some(metrics) = metrics {
264 metrics.record_part_load();
265 }
266
267 let row_offset = match self.index_map.get(key) {
268 Some(loc) => *loc,
269 None => return Ok(Arc::new(RowAddrTreeMap::default())),
270 };
271
272 let page_lookup_file = self.lazy_reader.get().await?;
273 let batch = page_lookup_file
274 .read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
275 .await?;
276
277 let binary_bitmaps = batch
278 .column(0)
279 .as_any()
280 .downcast_ref::<BinaryArray>()
281 .ok_or_else(|| Error::Internal {
282 message: "Invalid bitmap column type".to_string(),
283 location: location!(),
284 })?;
285 let bitmap_bytes = binary_bitmaps.value(0); let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
287
288 if let Some(fri) = &self.frag_reuse_index {
289 bitmap = fri.remap_row_addrs_tree_map(&bitmap);
290 }
291
292 self.index_cache
293 .insert_with_key(&cache_key, Arc::new(bitmap.clone()))
294 .await;
295
296 Ok(Arc::new(bitmap))
297 }
298}
299
300impl DeepSizeOf for BitmapIndex {
301 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
302 let mut total_size = 0;
303
304 total_size += self.index_map.deep_size_of_children(context);
305 total_size += self.store.deep_size_of_children(context);
306
307 total_size
308 }
309}
310
/// Statistics serialized into the index-file metadata and returned by the
/// `statistics()` method.
#[derive(Serialize)]
struct BitmapStatistics {
    // Number of bitmaps stored, counting the NULL bitmap when present.
    num_bitmaps: usize,
}
315
316#[async_trait]
317impl Index for BitmapIndex {
318 fn as_any(&self) -> &dyn Any {
319 self
320 }
321
322 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
323 self
324 }
325
326 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
327 Err(Error::NotSupported {
328 source: "BitmapIndex is not a vector index".into(),
329 location: location!(),
330 })
331 }
332
333 async fn prewarm(&self) -> Result<()> {
334 let page_lookup_file = self.lazy_reader.get().await?;
335 let total_rows = page_lookup_file.num_rows();
336
337 if total_rows == 0 {
338 return Ok(());
339 }
340
341 for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
342 let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
343 let chunk = page_lookup_file
344 .read_range(start_row..end_row, None)
345 .await?;
346
347 if chunk.num_rows() == 0 {
348 continue;
349 }
350
351 let dict_keys = chunk.column(0);
352 let binary_bitmaps = chunk.column(1);
353 let bitmap_binary_array = binary_bitmaps
354 .as_any()
355 .downcast_ref::<BinaryArray>()
356 .unwrap();
357
358 for idx in 0..chunk.num_rows() {
359 let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
360
361 if key.0.is_null() {
362 continue;
363 }
364
365 let bitmap_bytes = bitmap_binary_array.value(idx);
366 let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
367
368 if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
369 bitmap = frag_reuse_index_ref.remap_row_addrs_tree_map(&bitmap);
370 }
371
372 let cache_key = BitmapKey { value: key };
373 self.index_cache
374 .insert_with_key(&cache_key, Arc::new(bitmap))
375 .await;
376 }
377 }
378
379 Ok(())
380 }
381
382 fn index_type(&self) -> IndexType {
383 IndexType::Bitmap
384 }
385
386 fn statistics(&self) -> Result<serde_json::Value> {
387 let stats = BitmapStatistics {
388 num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
389 };
390 serde_json::to_value(stats).map_err(|e| Error::Internal {
391 message: format!("failed to serialize bitmap index statistics: {}", e),
392 location: location!(),
393 })
394 }
395
396 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
397 unimplemented!()
398 }
399}
400
#[async_trait]
impl ScalarIndex for BitmapIndex {
    /// Answers a sargable query exactly by combining the per-value bitmaps.
    ///
    /// The match arms each produce `(matching_rows, null_rows)`; `null_rows`
    /// is `Some` only when the query's semantics require distinguishing rows
    /// whose value is NULL from rows that simply don't match.
    #[instrument(name = "bitmap_search", level = "debug", skip_all)]
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult> {
        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();

        let (row_ids, null_row_ids) = match query {
            SargableQuery::Equals(val) => {
                metrics.record_comparisons(1);
                if val.is_null() {
                    // `x = NULL` matches exactly the null rows.
                    ((*self.null_map).clone(), None)
                } else {
                    let key = OrderableScalarValue(val.clone());
                    let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
                    let null_rows = if !self.null_map.is_empty() {
                        Some((*self.null_map).clone())
                    } else {
                        None
                    };
                    ((*bitmap).clone(), null_rows)
                }
            }
            SargableQuery::Range(start, end) => {
                // Wrap the bounds so they can be used with the ordered map.
                let range_start = match start {
                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
                    Bound::Unbounded => Bound::Unbounded,
                };

                let range_end = match end {
                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
                    Bound::Unbounded => Bound::Unbounded,
                };

                let keys: Vec<_> = self
                    .index_map
                    .range((range_start, range_end))
                    .map(|(k, _v)| k.clone())
                    .collect();

                metrics.record_comparisons(keys.len());

                let result = if keys.is_empty() {
                    RowAddrTreeMap::default()
                } else {
                    // Load the matching bitmaps concurrently, then union.
                    let bitmaps: Vec<_> = stream::iter(
                        keys.into_iter()
                            .map(|key| async move { self.load_bitmap(&key, None).await }),
                    )
                    .buffer_unordered(get_num_compute_intensive_cpus())
                    .try_collect()
                    .await?;

                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
                    RowAddrTreeMap::union_all(&bitmap_refs)
                };

                let null_rows = if !self.null_map.is_empty() {
                    Some((*self.null_map).clone())
                } else {
                    None
                };
                (result, null_rows)
            }
            SargableQuery::IsIn(values) => {
                metrics.record_comparisons(values.len());

                // Split the probe set into indexed (non-null) keys and a
                // flag recording whether NULL itself was requested.
                let mut has_null = false;
                let keys: Vec<_> = values
                    .iter()
                    .filter_map(|val| {
                        if val.is_null() {
                            has_null = true;
                            None
                        } else {
                            let key = OrderableScalarValue(val.clone());
                            if self.index_map.contains_key(&key) {
                                Some(key)
                            } else {
                                None
                            }
                        }
                    })
                    .collect();

                let mut bitmaps: Vec<_> = stream::iter(
                    keys.into_iter()
                        .map(|key| async move { self.load_bitmap(&key, None).await }),
                )
                .buffer_unordered(get_num_compute_intensive_cpus())
                .try_collect()
                .await?;

                // NULL was explicitly listed, so null rows count as matches.
                if has_null && !self.null_map.is_empty() {
                    bitmaps.push(self.null_map.clone());
                }

                let result = if bitmaps.is_empty() {
                    RowAddrTreeMap::default()
                } else {
                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
                    RowAddrTreeMap::union_all(&bitmap_refs)
                };

                // When NULL was not in the probe set, null rows are reported
                // separately (neither matched nor unmatched).
                let null_rows = if !has_null && !self.null_map.is_empty() {
                    Some((*self.null_map).clone())
                } else {
                    None
                };
                (result, null_rows)
            }
            SargableQuery::IsNull() => {
                metrics.record_comparisons(1);
                ((*self.null_map).clone(), None)
            }
            SargableQuery::FullTextSearch(_) => {
                return Err(Error::NotSupported {
                    source: "full text search is not supported for bitmap indexes".into(),
                    location: location!(),
                });
            }
        };

        let selection = NullableRowAddrSet::new(row_ids, null_row_ids.unwrap_or_default());
        Ok(SearchResult::Exact(selection))
    }

    fn can_remap(&self) -> bool {
        true
    }

    /// Rewrites every bitmap through `mapping` (dropping rows mapped to
    /// `None`) and writes the result as a fresh index into `dest_store`.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex> {
        let mut state = HashMap::new();

        for key in self.index_map.keys() {
            let bitmap = self.load_bitmap(key, None).await?;
            // NOTE(review): `row_addrs()` is assumed to be `Some` for
            // bitmaps loaded from this index — confirm against RowAddrTreeMap.
            // Addresses absent from `mapping` are kept unchanged.
            let remapped_bitmap =
                RowAddrTreeMap::from_iter(bitmap.row_addrs().unwrap().filter_map(|addr| {
                    let addr_as_u64 = u64::from(addr);
                    mapping
                        .get(&addr_as_u64)
                        .copied()
                        .unwrap_or(Some(addr_as_u64))
                }));
            state.insert(key.0.clone(), remapped_bitmap);
        }

        if !self.null_map.is_empty() {
            let remapped_null =
                RowAddrTreeMap::from_iter(self.null_map.row_addrs().unwrap().filter_map(|addr| {
                    let addr_as_u64 = u64::from(addr);
                    mapping
                        .get(&addr_as_u64)
                        .copied()
                        .unwrap_or(Some(addr_as_u64))
                }));
            // A typed null ScalarValue keys the NULL bitmap in the new file.
            state.insert(ScalarValue::try_from(&self.value_type)?, remapped_null);
        }

        BitmapIndexPlugin::write_bitmap_index(state, dest_store, &self.value_type).await?;

        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
                .unwrap(),
            index_version: BITMAP_INDEX_VERSION,
        })
    }

    /// Merges `new_data` into a copy of the current state and writes the
    /// combined index into `dest_store`.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex> {
        let mut state = HashMap::new();

        // Seed the training state with all existing bitmaps.
        for key in self.index_map.keys() {
            let bitmap = self.load_bitmap(key, None).await?;
            state.insert(key.0.clone(), (*bitmap).clone());
        }

        if !self.null_map.is_empty() {
            // Build a typed null ScalarValue to key the NULL bitmap.
            let ex_null = new_null_array(&self.value_type, 1);
            let ex_null = ScalarValue::try_from_array(ex_null.as_ref(), 0)?;
            state.insert(ex_null, (*self.null_map).clone());
        }

        BitmapIndexPlugin::do_train_bitmap_index(new_data, state, dest_store).await?;

        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
                .unwrap(),
            index_version: BITMAP_INDEX_VERSION,
        })
    }

    /// Updates only need the new rows (with row ids); no ordering required.
    fn update_criteria(&self) -> UpdateCriteria {
        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::None).with_row_id())
    }

    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
    }
}
624
/// Plugin implementing training, loading, and query parsing for bitmap
/// indexes.
#[derive(Debug, Default)]
pub struct BitmapIndexPlugin;
627
628impl BitmapIndexPlugin {
629 fn get_batch_from_arrays(
630 keys: Arc<dyn Array>,
631 binary_bitmaps: Arc<dyn Array>,
632 ) -> Result<RecordBatch> {
633 let schema = Arc::new(Schema::new(vec![
634 Field::new("keys", keys.data_type().clone(), true),
635 Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
636 ]));
637
638 let columns = vec![keys, binary_bitmaps];
639
640 Ok(RecordBatch::try_new(schema, columns)?)
641 }
642
643 async fn write_bitmap_index(
644 state: HashMap<ScalarValue, RowAddrTreeMap>,
645 index_store: &dyn IndexStore,
646 value_type: &DataType,
647 ) -> Result<()> {
648 let num_bitmaps = state.len();
649 let schema = Arc::new(Schema::new(vec![
650 Field::new("keys", value_type.clone(), true),
651 Field::new("bitmaps", DataType::Binary, true),
652 ]));
653
654 let mut bitmap_index_file = index_store
655 .new_index_file(BITMAP_LOOKUP_NAME, schema)
656 .await?;
657
658 let mut cur_keys = Vec::new();
659 let mut cur_bitmaps = Vec::new();
660 let mut cur_bytes = 0;
661
662 for (key, bitmap) in state.into_iter() {
663 let mut bytes = Vec::new();
664 bitmap.serialize_into(&mut bytes).unwrap();
665 let bitmap_size = bytes.len();
666
667 if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
668 let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
669 let mut binary_builder = BinaryBuilder::new();
670 for b in &cur_bitmaps {
671 binary_builder.append_value(b);
672 }
673 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
674
675 let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
676 bitmap_index_file.write_record_batch(record_batch).await?;
677
678 cur_keys.clear();
679 cur_bitmaps.clear();
680 cur_bytes = 0;
681 }
682
683 cur_keys.push(key);
684 cur_bitmaps.push(bytes);
685 cur_bytes += bitmap_size;
686 }
687
688 if !cur_keys.is_empty() {
690 let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
691 let mut binary_builder = BinaryBuilder::new();
692 for b in &cur_bitmaps {
693 binary_builder.append_value(b);
694 }
695 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
696
697 let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
698 bitmap_index_file.write_record_batch(record_batch).await?;
699 }
700
701 let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps }).map_err(|e| {
703 Error::Internal {
704 message: format!("failed to serialize bitmap statistics: {e}"),
705 location: location!(),
706 }
707 })?;
708 let mut metadata = HashMap::new();
709 metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
710
711 bitmap_index_file.finish_with_metadata(metadata).await?;
712
713 Ok(())
714 }
715
716 async fn do_train_bitmap_index(
717 mut data_source: SendableRecordBatchStream,
718 mut state: HashMap<ScalarValue, RowAddrTreeMap>,
719 index_store: &dyn IndexStore,
720 ) -> Result<()> {
721 let value_type = data_source.schema().field(0).data_type().clone();
722 while let Some(batch) = data_source.try_next().await? {
723 let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
724 let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
725 debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
726
727 let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
728
729 for i in 0..values.len() {
730 let row_id = row_id_column.value(i);
731 let key = ScalarValue::try_from_array(values.as_ref(), i)?;
732 state.entry(key.clone()).or_default().insert(row_id);
733 }
734 }
735
736 Self::write_bitmap_index(state, index_store, &value_type).await
737 }
738
739 pub async fn train_bitmap_index(
740 data: SendableRecordBatchStream,
741 index_store: &dyn IndexStore,
742 ) -> Result<()> {
743 let dictionary: HashMap<ScalarValue, RowAddrTreeMap> = HashMap::new();
745
746 Self::do_train_bitmap_index(data, dictionary, index_store).await
747 }
748}
749
#[async_trait]
impl ScalarIndexPlugin for BitmapIndexPlugin {
    fn name(&self) -> &str {
        "Bitmap"
    }

    /// Validates the field and returns a training request needing row ids
    /// but no particular ordering.  Nested fields are rejected.
    fn new_training_request(
        &self,
        _params: &str,
        field: &Field,
    ) -> Result<Box<dyn TrainingRequest>> {
        if field.data_type().is_nested() {
            return Err(Error::InvalidInput {
                source: "A bitmap index can only be created on a non-nested field.".into(),
                location: location!(),
            });
        }
        Ok(Box::new(DefaultTrainingRequest::new(
            TrainingCriteria::new(TrainingOrdering::None).with_row_id(),
        )))
    }

    // Bitmap search results are exact (no recheck of matched rows needed).
    fn provides_exact_answer(&self) -> bool {
        true
    }

    fn version(&self) -> u32 {
        BITMAP_INDEX_VERSION
    }

    /// Bitmap indexes answer standard sargable queries; the `false` flag
    /// matches the non-tokenized parser configuration used elsewhere.
    fn new_query_parser(
        &self,
        index_name: String,
        _index_details: &prost_types::Any,
    ) -> Option<Box<dyn ScalarQueryParser>> {
        Some(Box::new(SargableQueryParser::new(index_name, false)))
    }

    /// Trains a new bitmap index over `data`.  Per-fragment training is not
    /// supported and is rejected up front.
    async fn train_index(
        &self,
        data: SendableRecordBatchStream,
        index_store: &dyn IndexStore,
        _request: Box<dyn TrainingRequest>,
        fragment_ids: Option<Vec<u32>>,
    ) -> Result<CreatedIndex> {
        if fragment_ids.is_some() {
            return Err(Error::InvalidInput {
                source: "Bitmap index does not support fragment training".into(),
                location: location!(),
            });
        }

        Self::train_bitmap_index(data, index_store).await?;
        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
                .unwrap(),
            index_version: BITMAP_INDEX_VERSION,
        })
    }

    /// Loads a previously-written bitmap index from `index_store`.
    async fn load_index(
        &self,
        index_store: Arc<dyn IndexStore>,
        _index_details: &prost_types::Any,
        frag_reuse_index: Option<Arc<FragReuseIndex>>,
        cache: &LanceCache,
    ) -> Result<Arc<dyn ScalarIndex>> {
        Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
    }

    /// Reads the statistics JSON stored in the lookup file's schema
    /// metadata; returns `None` when the metadata key is absent.
    async fn load_statistics(
        &self,
        index_store: Arc<dyn IndexStore>,
        _index_details: &prost_types::Any,
    ) -> Result<Option<serde_json::Value>> {
        let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
        if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
            let stats = serde_json::from_str(value).map_err(|e| Error::Internal {
                message: format!("failed to parse bitmap statistics metadata: {e}"),
                location: location!(),
            })?;
            Ok(Some(stats))
        } else {
            Ok(None)
        }
    }
}
838
839#[cfg(test)]
840pub mod tests {
841 use super::*;
842 use crate::metrics::NoOpMetricsCollector;
843 use crate::scalar::lance_format::LanceIndexStore;
844 use arrow_array::{record_batch, RecordBatch, StringArray, UInt64Array};
845 use arrow_schema::{DataType, Field, Schema};
846 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
847 use futures::stream;
848 use lance_core::utils::mask::RowSetOps;
849 use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
850 use lance_io::object_store::ObjectStore;
851 use std::collections::HashMap;
852
853 #[tokio::test]
854 async fn test_bitmap_lazy_loading_and_cache() {
855 let tmpdir = TempObjDir::default();
857 let store = Arc::new(LanceIndexStore::new(
858 Arc::new(ObjectStore::local()),
859 tmpdir.clone(),
860 Arc::new(LanceCache::no_cache()),
861 ));
862
863 let colors = vec![
865 "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
866 "red", "red", "blue", "green", "yellow",
867 ];
868
869 let row_ids = (0u64..15u64).collect::<Vec<_>>();
870
871 let schema = Arc::new(Schema::new(vec![
872 Field::new("value", DataType::Utf8, false),
873 Field::new("_rowid", DataType::UInt64, false),
874 ]));
875
876 let batch = RecordBatch::try_new(
877 schema.clone(),
878 vec![
879 Arc::new(StringArray::from(colors.clone())),
880 Arc::new(UInt64Array::from(row_ids.clone())),
881 ],
882 )
883 .unwrap();
884
885 let stream = stream::once(async move { Ok(batch) });
886 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
887
888 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
890 .await
891 .unwrap();
892
893 let cache = LanceCache::with_capacity(1024 * 1024); let index = BitmapIndex::load(store.clone(), None, &cache)
898 .await
899 .unwrap();
900
901 assert_eq!(index.index_map.len(), 4); assert!(index.null_map.is_empty()); let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
906 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
907
908 let expected_red_rows = vec![0u64, 3, 6, 10, 11];
910 if let SearchResult::Exact(row_ids) = result {
911 let mut actual: Vec<u64> = row_ids
912 .true_rows()
913 .row_addrs()
914 .unwrap()
915 .map(|id| id.into())
916 .collect();
917 actual.sort();
918 assert_eq!(actual, expected_red_rows);
919 } else {
920 panic!("Expected exact search result");
921 }
922
923 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
925 if let SearchResult::Exact(row_ids) = result {
926 let mut actual: Vec<u64> = row_ids
927 .true_rows()
928 .row_addrs()
929 .unwrap()
930 .map(|id| id.into())
931 .collect();
932 actual.sort();
933 assert_eq!(actual, expected_red_rows);
934 }
935
936 let query = SargableQuery::Range(
938 std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
939 std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
940 );
941 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
942
943 let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
944 if let SearchResult::Exact(row_ids) = result {
945 let mut actual: Vec<u64> = row_ids
946 .true_rows()
947 .row_addrs()
948 .unwrap()
949 .map(|id| id.into())
950 .collect();
951 actual.sort();
952 assert_eq!(actual, expected_range_rows);
953 }
954
955 let query = SargableQuery::IsIn(vec![
957 ScalarValue::Utf8(Some("red".to_string())),
958 ScalarValue::Utf8(Some("yellow".to_string())),
959 ]);
960 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
961
962 let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
963 if let SearchResult::Exact(row_ids) = result {
964 let mut actual: Vec<u64> = row_ids
965 .true_rows()
966 .row_addrs()
967 .unwrap()
968 .map(|id| id.into())
969 .collect();
970 actual.sort();
971 assert_eq!(actual, expected_in_rows);
972 }
973 }
974
975 #[tokio::test]
976 #[ignore]
977 async fn test_big_bitmap_index() {
978 use super::{BitmapIndex, BITMAP_LOOKUP_NAME};
981 use crate::scalar::lance_format::LanceIndexStore;
982 use crate::scalar::IndexStore;
983 use arrow_schema::DataType;
984 use datafusion_common::ScalarValue;
985 use lance_core::cache::LanceCache;
986 use lance_core::utils::mask::RowAddrTreeMap;
987 use lance_io::object_store::ObjectStore;
988 use std::collections::HashMap;
989 use std::sync::Arc;
990
991 let m: u32 = 2_500_000;
997 let per_bitmap_size = 1000; let mut state = HashMap::new();
1000 for i in 0..m {
1001 let bitmap = RowAddrTreeMap::from_iter(0..per_bitmap_size);
1003
1004 let key = ScalarValue::UInt32(Some(i));
1005 state.insert(key, bitmap);
1006 }
1007
1008 let tmpdir = TempObjDir::default();
1010 let test_store = LanceIndexStore::new(
1011 Arc::new(ObjectStore::local()),
1012 tmpdir.clone(),
1013 Arc::new(LanceCache::no_cache()),
1014 );
1015
1016 let result =
1019 BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;
1020
1021 assert!(
1022 result.is_ok(),
1023 "Failed to write bitmap index: {:?}",
1024 result.err()
1025 );
1026
1027 let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
1029 assert!(
1030 index_file.is_ok(),
1031 "Failed to open index file: {:?}",
1032 index_file.err()
1033 );
1034 let index_file = index_file.unwrap();
1035
1036 tracing::info!(
1038 "Index file contains {} rows in total",
1039 index_file.num_rows()
1040 );
1041
1042 tracing::info!("Loading index from disk...");
1044 let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
1045 .await
1046 .expect("Failed to load bitmap index");
1047
1048 assert_eq!(
1050 loaded_index.index_map.len(),
1051 m as usize,
1052 "Loaded index has incorrect number of keys (expected {}, got {})",
1053 m,
1054 loaded_index.index_map.len()
1055 );
1056
1057 let test_keys = [0, m / 2, m - 1]; for &key_val in &test_keys {
1060 let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
1061 let bitmap = loaded_index
1063 .load_bitmap(&key, None)
1064 .await
1065 .unwrap_or_else(|_| panic!("Key {} should exist", key_val));
1066
1067 let row_addrs: Vec<u64> = bitmap.row_addrs().unwrap().map(u64::from).collect();
1069
1070 assert_eq!(
1072 row_addrs.len(),
1073 per_bitmap_size as usize,
1074 "Bitmap for key {} has wrong size",
1075 key_val
1076 );
1077
1078 for i in 0..5.min(per_bitmap_size) {
1080 assert!(
1081 row_addrs.contains(&i),
1082 "Bitmap for key {} should contain row_id {}",
1083 key_val,
1084 i
1085 );
1086 }
1087
1088 for i in (per_bitmap_size - 5)..per_bitmap_size {
1089 assert!(
1090 row_addrs.contains(&i),
1091 "Bitmap for key {} should contain row_id {}",
1092 key_val,
1093 i
1094 );
1095 }
1096
1097 let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
1099 assert_eq!(
1100 row_addrs, expected_range,
1101 "Bitmap for key {} doesn't contain expected values",
1102 key_val
1103 );
1104
1105 tracing::info!(
1106 "✓ Verified bitmap for key {}: {} rows as expected",
1107 key_val,
1108 row_addrs.len()
1109 );
1110 }
1111
1112 tracing::info!("Test successful! Index properly contains {} keys", m);
1113 }
1114
1115 #[tokio::test]
1116 async fn test_bitmap_prewarm() {
1117 let tmpdir = TempObjDir::default();
1119 let store = Arc::new(LanceIndexStore::new(
1120 Arc::new(ObjectStore::local()),
1121 tmpdir.clone(),
1122 Arc::new(LanceCache::no_cache()),
1123 ));
1124
1125 let colors = vec![
1127 "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1128 "red", "red", "blue", "green", "yellow",
1129 ];
1130
1131 let row_ids = (0u64..15u64).collect::<Vec<_>>();
1132
1133 let schema = Arc::new(Schema::new(vec![
1134 Field::new("value", DataType::Utf8, false),
1135 Field::new("_rowid", DataType::UInt64, false),
1136 ]));
1137
1138 let batch = RecordBatch::try_new(
1139 schema.clone(),
1140 vec![
1141 Arc::new(StringArray::from(colors.clone())),
1142 Arc::new(UInt64Array::from(row_ids.clone())),
1143 ],
1144 )
1145 .unwrap();
1146
1147 let stream = stream::once(async move { Ok(batch) });
1148 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1149
1150 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1152 .await
1153 .unwrap();
1154
1155 let cache = LanceCache::with_capacity(1024 * 1024); let index = BitmapIndex::load(store.clone(), None, &cache)
1160 .await
1161 .unwrap();
1162
1163 let cache_key_red = BitmapKey {
1165 value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
1166 };
1167 let cache_key_blue = BitmapKey {
1168 value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
1169 };
1170
1171 assert!(cache
1172 .get_with_key::<BitmapKey>(&cache_key_red)
1173 .await
1174 .is_none());
1175 assert!(cache
1176 .get_with_key::<BitmapKey>(&cache_key_blue)
1177 .await
1178 .is_none());
1179
1180 index.prewarm().await.unwrap();
1182
1183 assert!(cache
1185 .get_with_key::<BitmapKey>(&cache_key_red)
1186 .await
1187 .is_some());
1188 assert!(cache
1189 .get_with_key::<BitmapKey>(&cache_key_blue)
1190 .await
1191 .is_some());
1192
1193 let cached_red = cache
1195 .get_with_key::<BitmapKey>(&cache_key_red)
1196 .await
1197 .unwrap();
1198 let red_rows: Vec<u64> = cached_red.row_addrs().unwrap().map(u64::from).collect();
1199 assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);
1200
1201 index.prewarm().await.unwrap();
1203
1204 let cached_red_2 = cache
1206 .get_with_key::<BitmapKey>(&cache_key_red)
1207 .await
1208 .unwrap();
1209 let red_rows_2: Vec<u64> = cached_red_2.row_addrs().unwrap().map(u64::from).collect();
1210 assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
1211 }
1212
1213 #[tokio::test]
1214 async fn test_remap_bitmap_with_null() {
1215 use arrow_array::UInt32Array;
1216
1217 let tmpdir = TempObjDir::default();
1219 let test_store = Arc::new(LanceIndexStore::new(
1220 Arc::new(ObjectStore::local()),
1221 tmpdir.clone(),
1222 Arc::new(LanceCache::no_cache()),
1223 ));
1224
1225 let values = vec![
1230 None, None, Some(1u32), Some(1u32), Some(2u32), Some(2u32), ];
1237
1238 let row_ids: Vec<u64> = vec![
1240 RowAddress::new_from_parts(1, 0).into(),
1241 RowAddress::new_from_parts(1, 1).into(),
1242 RowAddress::new_from_parts(1, 2).into(),
1243 RowAddress::new_from_parts(2, 0).into(),
1244 RowAddress::new_from_parts(2, 1).into(),
1245 RowAddress::new_from_parts(2, 2).into(),
1246 ];
1247
1248 let schema = Arc::new(Schema::new(vec![
1249 Field::new("value", DataType::UInt32, true),
1250 Field::new("_rowid", DataType::UInt64, false),
1251 ]));
1252
1253 let batch = RecordBatch::try_new(
1254 schema.clone(),
1255 vec![
1256 Arc::new(UInt32Array::from(values)),
1257 Arc::new(UInt64Array::from(row_ids)),
1258 ],
1259 )
1260 .unwrap();
1261
1262 let stream = stream::once(async move { Ok(batch) });
1263 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1264
1265 BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
1267 .await
1268 .unwrap();
1269
1270 let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1272 .await
1273 .expect("Failed to load bitmap index");
1274
1275 assert_eq!(index.index_map.len(), 2); assert!(!index.null_map.is_empty()); let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
1281 row_addr_map.insert(
1282 RowAddress::new_from_parts(1, 0).into(),
1283 Some(RowAddress::new_from_parts(3, 0).into()),
1284 );
1285 row_addr_map.insert(
1286 RowAddress::new_from_parts(1, 1).into(),
1287 Some(RowAddress::new_from_parts(3, 1).into()),
1288 );
1289 row_addr_map.insert(
1290 RowAddress::new_from_parts(1, 2).into(),
1291 Some(RowAddress::new_from_parts(3, 2).into()),
1292 );
1293 row_addr_map.insert(
1294 RowAddress::new_from_parts(2, 0).into(),
1295 Some(RowAddress::new_from_parts(3, 3).into()),
1296 );
1297 row_addr_map.insert(
1298 RowAddress::new_from_parts(2, 1).into(),
1299 Some(RowAddress::new_from_parts(3, 4).into()),
1300 );
1301 row_addr_map.insert(
1302 RowAddress::new_from_parts(2, 2).into(),
1303 Some(RowAddress::new_from_parts(3, 5).into()),
1304 );
1305
1306 index
1308 .remap(&row_addr_map, test_store.as_ref())
1309 .await
1310 .unwrap();
1311
1312 let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
1314 .await
1315 .expect("Failed to load remapped bitmap index");
1316
1317 let expected_null_addrs: Vec<u64> = vec![
1319 RowAddress::new_from_parts(3, 0).into(),
1320 RowAddress::new_from_parts(3, 1).into(),
1321 ];
1322 let actual_null_addrs: Vec<u64> = reloaded_idx
1323 .null_map
1324 .row_addrs()
1325 .unwrap()
1326 .map(u64::from)
1327 .collect();
1328 assert_eq!(
1329 actual_null_addrs, expected_null_addrs,
1330 "Null bitmap not remapped correctly"
1331 );
1332
1333 let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
1335 let result = reloaded_idx
1336 .search(&query, &NoOpMetricsCollector)
1337 .await
1338 .unwrap();
1339 if let crate::scalar::SearchResult::Exact(row_ids) = result {
1340 let mut actual: Vec<u64> = row_ids
1341 .true_rows()
1342 .row_addrs()
1343 .unwrap()
1344 .map(u64::from)
1345 .collect();
1346 actual.sort();
1347 let expected: Vec<u64> = vec![
1348 RowAddress::new_from_parts(3, 2).into(),
1349 RowAddress::new_from_parts(3, 3).into(),
1350 ];
1351 assert_eq!(actual, expected, "Value 1 bitmap not remapped correctly");
1352 }
1353
1354 let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
1356 let result = reloaded_idx
1357 .search(&query, &NoOpMetricsCollector)
1358 .await
1359 .unwrap();
1360 if let crate::scalar::SearchResult::Exact(row_ids) = result {
1361 let mut actual: Vec<u64> = row_ids
1362 .true_rows()
1363 .row_addrs()
1364 .unwrap()
1365 .map(u64::from)
1366 .collect();
1367 actual.sort();
1368 let expected: Vec<u64> = vec![
1369 RowAddress::new_from_parts(3, 4).into(),
1370 RowAddress::new_from_parts(3, 5).into(),
1371 ];
1372 assert_eq!(actual, expected, "Value 2 bitmap not remapped correctly");
1373 }
1374
1375 let query = SargableQuery::IsNull();
1377 let result = reloaded_idx
1378 .search(&query, &NoOpMetricsCollector)
1379 .await
1380 .unwrap();
1381 if let crate::scalar::SearchResult::Exact(row_ids) = result {
1382 let mut actual: Vec<u64> = row_ids
1383 .true_rows()
1384 .row_addrs()
1385 .unwrap()
1386 .map(u64::from)
1387 .collect();
1388 actual.sort();
1389 assert_eq!(
1390 actual, expected_null_addrs,
1391 "Null search results not correct"
1392 );
1393 }
1394 }
1395
1396 #[tokio::test]
1397 async fn test_bitmap_null_handling_in_queries() {
1398 let tmpdir = TempObjDir::default();
1400 let store = Arc::new(LanceIndexStore::new(
1401 Arc::new(ObjectStore::local()),
1402 tmpdir.clone(),
1403 Arc::new(LanceCache::no_cache()),
1404 ));
1405
1406 let batch = record_batch!(
1408 ("value", Int64, [Some(0), Some(5), None]),
1409 ("_rowid", UInt64, [0, 1, 2])
1410 )
1411 .unwrap();
1412 let schema = batch.schema();
1413 let stream = stream::once(async move { Ok(batch) });
1414 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1415
1416 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1418 .await
1419 .unwrap();
1420
1421 let cache = LanceCache::with_capacity(1024 * 1024);
1422 let index = BitmapIndex::load(store.clone(), None, &cache)
1423 .await
1424 .unwrap();
1425
1426 let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
1428 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1429
1430 match result {
1431 SearchResult::Exact(row_ids) => {
1432 let actual_rows: Vec<u64> = row_ids
1433 .true_rows()
1434 .row_addrs()
1435 .unwrap()
1436 .map(u64::from)
1437 .collect();
1438 assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 5");
1439
1440 let null_row_ids = row_ids.null_rows();
1441 assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
1443 let null_rows: Vec<u64> =
1444 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
1445 assert_eq!(null_rows, vec![2], "Should report row 2 as null");
1446 }
1447 _ => panic!("Expected Exact search result"),
1448 }
1449
1450 let query = SargableQuery::IsNull();
1452 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1453
1454 match result {
1455 SearchResult::Exact(row_addrs) => {
1456 let actual_rows: Vec<u64> = row_addrs
1457 .true_rows()
1458 .row_addrs()
1459 .unwrap()
1460 .map(u64::from)
1461 .collect();
1462 assert_eq!(
1463 actual_rows,
1464 vec![2],
1465 "IsNull should find row 2 where value is null"
1466 );
1467
1468 let null_row_ids = row_addrs.null_rows();
1469 assert!(
1471 null_row_ids.is_empty(),
1472 "null_row_ids should be None for IsNull query"
1473 );
1474 }
1475 _ => panic!("Expected Exact search result"),
1476 }
1477
1478 let query = SargableQuery::Range(
1480 std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
1481 std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
1482 );
1483 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1484
1485 match result {
1486 SearchResult::Exact(row_addrs) => {
1487 let actual_rows: Vec<u64> = row_addrs
1488 .true_rows()
1489 .row_addrs()
1490 .unwrap()
1491 .map(u64::from)
1492 .collect();
1493 assert_eq!(actual_rows, vec![0], "Should find row 0 where value == 0");
1494
1495 let null_row_ids = row_addrs.null_rows();
1497 assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
1498 let null_rows: Vec<u64> =
1499 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
1500 assert_eq!(null_rows, vec![2], "Should report row 2 as null");
1501 }
1502 _ => panic!("Expected Exact search result"),
1503 }
1504 }
1505}