1use std::{
5 any::Any,
6 collections::{BTreeMap, HashMap},
7 fmt::Debug,
8 ops::Bound,
9 sync::Arc,
10};
11
12use crate::pbold;
13use arrow::array::BinaryBuilder;
14use arrow_array::{new_null_array, Array, BinaryArray, RecordBatch, UInt64Array};
15use arrow_schema::{DataType, Field, Schema};
16use async_trait::async_trait;
17use datafusion::physical_plan::SendableRecordBatchStream;
18use datafusion_common::ScalarValue;
19use deepsize::DeepSizeOf;
20use futures::{stream, StreamExt, TryStreamExt};
21use lance_core::{
22 cache::{CacheKey, LanceCache, WeakLanceCache},
23 error::LanceOptionExt,
24 utils::{mask::RowIdTreeMap, tokio::get_num_compute_intensive_cpus},
25 Error, Result, ROW_ID,
26};
27use roaring::RoaringBitmap;
28use serde::Serialize;
29use snafu::location;
30use tracing::instrument;
31
32use super::{
33 btree::OrderableScalarValue, BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult,
34};
35use super::{AnyQuery, IndexStore, ScalarIndex};
36use crate::{
37 frag_reuse::FragReuseIndex,
38 scalar::{
39 expression::SargableQueryParser,
40 registry::{
41 DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering,
42 TrainingRequest, VALUE_COLUMN_NAME,
43 },
44 CreatedIndex, UpdateCriteria,
45 },
46};
47use crate::{metrics::MetricsCollector, Index, IndexType};
48use crate::{scalar::expression::ScalarQueryParser, scalar::IndexReader};
49
/// Name of the index file holding one row per distinct value: the value in the
/// `keys` column and its serialized row-id bitmap in the `bitmaps` column.
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";

/// Cap on the serialized bitmap bytes accumulated into a single record batch
/// before it is written out: `i32::MAX` minus 1 MiB of headroom.
const MAX_BITMAP_ARRAY_LENGTH: usize = (i32::MAX as usize) - (1 << 20);

/// Number of lookup-file rows requested per read when scanning the whole file.
const MAX_ROWS_PER_CHUNK: usize = 2048;

/// Version number reported for indices created by this module.
const BITMAP_INDEX_VERSION: u32 = 0;
57
58#[derive(Clone)]
61struct LazyIndexReader {
62 index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
63 store: Arc<dyn IndexStore>,
64}
65
66impl std::fmt::Debug for LazyIndexReader {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("LazyIndexReader")
69 .field("store", &self.store)
70 .finish()
71 }
72}
73
74impl LazyIndexReader {
75 fn new(store: Arc<dyn IndexStore>) -> Self {
76 Self {
77 index_reader: Arc::new(tokio::sync::Mutex::new(None)),
78 store,
79 }
80 }
81
82 async fn get(&self) -> Result<Arc<dyn IndexReader>> {
83 let mut reader = self.index_reader.lock().await;
84 if reader.is_none() {
85 let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
86 *reader = Some(index_reader);
87 }
88 Ok(reader.as_ref().unwrap().clone())
89 }
90}
91
/// A bitmap index: for every distinct value, a bitmap of the row ids holding it.
///
/// Only the keys and the null bitmap are held in memory; the bitmap of each
/// non-null value is read from the lookup file on demand and stored in
/// `index_cache`.
#[derive(Clone, Debug)]
pub struct BitmapIndex {
    /// Maps each non-null value to the row offset of its entry in the lookup file.
    index_map: BTreeMap<OrderableScalarValue, usize>,

    /// Row ids whose value is null; read when the index is loaded.
    null_map: Arc<RowIdTreeMap>,

    /// Data type of the indexed values (the `keys` column of the lookup file).
    value_type: DataType,

    /// Store that contains the lookup file.
    store: Arc<dyn IndexStore>,

    /// Cache of per-value bitmaps, keyed by `BitmapKey`.
    index_cache: WeakLanceCache,

    /// When present, used to remap row ids of every bitmap read from disk.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,

    /// Lazily opened reader over the lookup file.
    lazy_reader: LazyIndexReader,
}
115
116#[derive(Debug, Clone)]
117pub struct BitmapKey {
118 value: OrderableScalarValue,
119}
120
121impl CacheKey for BitmapKey {
122 type ValueType = RowIdTreeMap;
123
124 fn key(&self) -> std::borrow::Cow<'_, str> {
125 format!("{}", self.value.0).into()
126 }
127}
128
129impl BitmapIndex {
130 fn new(
131 index_map: BTreeMap<OrderableScalarValue, usize>,
132 null_map: Arc<RowIdTreeMap>,
133 value_type: DataType,
134 store: Arc<dyn IndexStore>,
135 index_cache: WeakLanceCache,
136 frag_reuse_index: Option<Arc<FragReuseIndex>>,
137 ) -> Self {
138 let lazy_reader = LazyIndexReader::new(store.clone());
139 Self {
140 index_map,
141 null_map,
142 value_type,
143 store,
144 index_cache,
145 frag_reuse_index,
146 lazy_reader,
147 }
148 }
149
150 pub(crate) async fn load(
151 store: Arc<dyn IndexStore>,
152 frag_reuse_index: Option<Arc<FragReuseIndex>>,
153 index_cache: &LanceCache,
154 ) -> Result<Arc<Self>> {
155 let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
156 let total_rows = page_lookup_file.num_rows();
157
158 if total_rows == 0 {
159 let schema = page_lookup_file.schema();
160 let data_type = schema.fields[0].data_type();
161 return Ok(Arc::new(Self::new(
162 BTreeMap::new(),
163 Arc::new(RowIdTreeMap::default()),
164 data_type,
165 store,
166 WeakLanceCache::from(index_cache),
167 frag_reuse_index,
168 )));
169 }
170
171 let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
172 let mut null_map = Arc::new(RowIdTreeMap::default());
173 let mut value_type: Option<DataType> = None;
174 let mut null_location: Option<usize> = None;
175 let mut row_offset = 0;
176
177 for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
178 let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
179 let chunk = page_lookup_file
180 .read_range(start_row..end_row, Some(&["keys"]))
181 .await?;
182
183 if chunk.num_rows() == 0 {
184 continue;
185 }
186
187 if value_type.is_none() {
188 value_type = Some(chunk.schema().field(0).data_type().clone());
189 }
190
191 let dict_keys = chunk.column(0);
192
193 for idx in 0..chunk.num_rows() {
194 let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
195
196 if key.0.is_null() {
197 null_location = Some(row_offset);
198 } else {
199 index_map.insert(key, row_offset);
200 }
201
202 row_offset += 1;
203 }
204 }
205
206 if let Some(null_loc) = null_location {
207 let batch = page_lookup_file
208 .read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
209 .await?;
210
211 let binary_bitmaps = batch
212 .column(0)
213 .as_any()
214 .downcast_ref::<BinaryArray>()
215 .ok_or_else(|| Error::Internal {
216 message: "Invalid bitmap column type".to_string(),
217 location: location!(),
218 })?;
219 let bitmap_bytes = binary_bitmaps.value(0);
220 let mut bitmap = RowIdTreeMap::deserialize_from(bitmap_bytes).unwrap();
221
222 if let Some(fri) = &frag_reuse_index {
224 bitmap = fri.remap_row_ids_tree_map(&bitmap);
225 }
226
227 null_map = Arc::new(bitmap);
228 }
229
230 let final_value_type = value_type.expect_ok()?;
231
232 Ok(Arc::new(Self::new(
233 index_map,
234 null_map,
235 final_value_type,
236 store,
237 WeakLanceCache::from(index_cache),
238 frag_reuse_index,
239 )))
240 }
241
242 async fn load_bitmap(
243 &self,
244 key: &OrderableScalarValue,
245 metrics: Option<&dyn MetricsCollector>,
246 ) -> Result<Arc<RowIdTreeMap>> {
247 if key.0.is_null() {
248 return Ok(self.null_map.clone());
249 }
250
251 let cache_key = BitmapKey { value: key.clone() };
252
253 if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
254 return Ok(cached);
255 }
256
257 if let Some(metrics) = metrics {
259 metrics.record_part_load();
260 }
261
262 let row_offset = match self.index_map.get(key) {
263 Some(loc) => *loc,
264 None => return Ok(Arc::new(RowIdTreeMap::default())),
265 };
266
267 let page_lookup_file = self.lazy_reader.get().await?;
268 let batch = page_lookup_file
269 .read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
270 .await?;
271
272 let binary_bitmaps = batch
273 .column(0)
274 .as_any()
275 .downcast_ref::<BinaryArray>()
276 .ok_or_else(|| Error::Internal {
277 message: "Invalid bitmap column type".to_string(),
278 location: location!(),
279 })?;
280 let bitmap_bytes = binary_bitmaps.value(0); let mut bitmap = RowIdTreeMap::deserialize_from(bitmap_bytes).unwrap();
282
283 if let Some(fri) = &self.frag_reuse_index {
284 bitmap = fri.remap_row_ids_tree_map(&bitmap);
285 }
286
287 self.index_cache
288 .insert_with_key(&cache_key, Arc::new(bitmap.clone()))
289 .await;
290
291 Ok(Arc::new(bitmap))
292 }
293}
294
295impl DeepSizeOf for BitmapIndex {
296 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
297 let mut total_size = 0;
298
299 total_size += self.index_map.deep_size_of_children(context);
300 total_size += self.store.deep_size_of_children(context);
301
302 total_size
303 }
304}
305
/// Statistics about a bitmap index, serialized to JSON by `statistics`.
#[derive(Serialize)]
struct BitmapStatistics {
    /// Number of non-null keys, plus one when the null bitmap is non-empty.
    num_bitmaps: usize,
}
310
311#[async_trait]
312impl Index for BitmapIndex {
313 fn as_any(&self) -> &dyn Any {
314 self
315 }
316
317 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
318 self
319 }
320
321 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
322 Err(Error::NotSupported {
323 source: "BitmapIndex is not a vector index".into(),
324 location: location!(),
325 })
326 }
327
328 async fn prewarm(&self) -> Result<()> {
329 let page_lookup_file = self.lazy_reader.get().await?;
330 let total_rows = page_lookup_file.num_rows();
331
332 if total_rows == 0 {
333 return Ok(());
334 }
335
336 for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
337 let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
338 let chunk = page_lookup_file
339 .read_range(start_row..end_row, None)
340 .await?;
341
342 if chunk.num_rows() == 0 {
343 continue;
344 }
345
346 let dict_keys = chunk.column(0);
347 let binary_bitmaps = chunk.column(1);
348 let bitmap_binary_array = binary_bitmaps
349 .as_any()
350 .downcast_ref::<BinaryArray>()
351 .unwrap();
352
353 for idx in 0..chunk.num_rows() {
354 let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
355
356 if key.0.is_null() {
357 continue;
358 }
359
360 let bitmap_bytes = bitmap_binary_array.value(idx);
361 let mut bitmap = RowIdTreeMap::deserialize_from(bitmap_bytes).unwrap();
362
363 if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
364 bitmap = frag_reuse_index_ref.remap_row_ids_tree_map(&bitmap);
365 }
366
367 let cache_key = BitmapKey { value: key };
368 self.index_cache
369 .insert_with_key(&cache_key, Arc::new(bitmap))
370 .await;
371 }
372 }
373
374 Ok(())
375 }
376
377 fn index_type(&self) -> IndexType {
378 IndexType::Bitmap
379 }
380
381 fn statistics(&self) -> Result<serde_json::Value> {
382 let stats = BitmapStatistics {
383 num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
384 };
385 serde_json::to_value(stats).map_err(|e| Error::Internal {
386 message: format!("failed to serialize bitmap index statistics: {}", e),
387 location: location!(),
388 })
389 }
390
391 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
392 unimplemented!()
393 }
394}
395
396#[async_trait]
397impl ScalarIndex for BitmapIndex {
398 #[instrument(name = "bitmap_search", level = "debug", skip_all)]
399 async fn search(
400 &self,
401 query: &dyn AnyQuery,
402 metrics: &dyn MetricsCollector,
403 ) -> Result<SearchResult> {
404 let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
405
406 let row_ids = match query {
407 SargableQuery::Equals(val) => {
408 metrics.record_comparisons(1);
409 if val.is_null() {
410 (*self.null_map).clone()
411 } else {
412 let key = OrderableScalarValue(val.clone());
413 let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
414 (*bitmap).clone()
415 }
416 }
417 SargableQuery::Range(start, end) => {
418 let range_start = match start {
419 Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
420 Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
421 Bound::Unbounded => Bound::Unbounded,
422 };
423
424 let range_end = match end {
425 Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
426 Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
427 Bound::Unbounded => Bound::Unbounded,
428 };
429
430 let keys: Vec<_> = self
431 .index_map
432 .range((range_start, range_end))
433 .map(|(k, _v)| k.clone())
434 .collect();
435
436 metrics.record_comparisons(keys.len());
437
438 if keys.is_empty() {
439 RowIdTreeMap::default()
440 } else {
441 let bitmaps: Vec<_> = stream::iter(
442 keys.into_iter()
443 .map(|key| async move { self.load_bitmap(&key, None).await }),
444 )
445 .buffer_unordered(get_num_compute_intensive_cpus())
446 .try_collect()
447 .await?;
448
449 let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
450 RowIdTreeMap::union_all(&bitmap_refs)
451 }
452 }
453 SargableQuery::IsIn(values) => {
454 metrics.record_comparisons(values.len());
455
456 let mut has_null = false;
458 let keys: Vec<_> = values
459 .iter()
460 .filter_map(|val| {
461 if val.is_null() {
462 has_null = true;
463 None
464 } else {
465 let key = OrderableScalarValue(val.clone());
466 if self.index_map.contains_key(&key) {
467 Some(key)
468 } else {
469 None
470 }
471 }
472 })
473 .collect();
474
475 if keys.is_empty() && (!has_null || self.null_map.is_empty()) {
476 RowIdTreeMap::default()
477 } else {
478 let mut bitmaps: Vec<_> = stream::iter(
480 keys.into_iter()
481 .map(|key| async move { self.load_bitmap(&key, None).await }),
482 )
483 .buffer_unordered(get_num_compute_intensive_cpus())
484 .try_collect()
485 .await?;
486
487 if has_null && !self.null_map.is_empty() {
489 bitmaps.push(self.null_map.clone());
490 }
491
492 if bitmaps.is_empty() {
493 RowIdTreeMap::default()
494 } else {
495 let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
497 RowIdTreeMap::union_all(&bitmap_refs)
498 }
499 }
500 }
501 SargableQuery::IsNull() => {
502 metrics.record_comparisons(1);
503 (*self.null_map).clone()
504 }
505 SargableQuery::FullTextSearch(_) => {
506 return Err(Error::NotSupported {
507 source: "full text search is not supported for bitmap indexes".into(),
508 location: location!(),
509 });
510 }
511 };
512
513 Ok(SearchResult::Exact(row_ids))
514 }
515
516 fn can_remap(&self) -> bool {
517 true
518 }
519
520 async fn remap(
522 &self,
523 mapping: &HashMap<u64, Option<u64>>,
524 dest_store: &dyn IndexStore,
525 ) -> Result<CreatedIndex> {
526 let mut state = HashMap::new();
527
528 for key in self.index_map.keys() {
529 let bitmap = self.load_bitmap(key, None).await?;
530 let remapped_bitmap =
531 RowIdTreeMap::from_iter(bitmap.row_ids().unwrap().filter_map(|addr| {
532 let addr_as_u64 = u64::from(addr);
533 mapping
534 .get(&addr_as_u64)
535 .copied()
536 .unwrap_or(Some(addr_as_u64))
537 }));
538 state.insert(key.0.clone(), remapped_bitmap);
539 }
540
541 if !self.null_map.is_empty() {
542 let remapped_null =
543 RowIdTreeMap::from_iter(self.null_map.row_ids().unwrap().filter_map(|addr| {
544 let addr_as_u64 = u64::from(addr);
545 mapping
546 .get(&addr_as_u64)
547 .copied()
548 .unwrap_or(Some(addr_as_u64))
549 }));
550 state.insert(ScalarValue::try_from(&self.value_type)?, remapped_null);
551 }
552
553 BitmapIndexPlugin::write_bitmap_index(state, dest_store, &self.value_type).await?;
554
555 Ok(CreatedIndex {
556 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
557 .unwrap(),
558 index_version: BITMAP_INDEX_VERSION,
559 })
560 }
561
562 async fn update(
564 &self,
565 new_data: SendableRecordBatchStream,
566 dest_store: &dyn IndexStore,
567 ) -> Result<CreatedIndex> {
568 let mut state = HashMap::new();
569
570 for key in self.index_map.keys() {
572 let bitmap = self.load_bitmap(key, None).await?;
573 state.insert(key.0.clone(), (*bitmap).clone());
574 }
575
576 if !self.null_map.is_empty() {
577 let ex_null = new_null_array(&self.value_type, 1);
578 let ex_null = ScalarValue::try_from_array(ex_null.as_ref(), 0)?;
579 state.insert(ex_null, (*self.null_map).clone());
580 }
581
582 BitmapIndexPlugin::do_train_bitmap_index(new_data, state, dest_store).await?;
583
584 Ok(CreatedIndex {
585 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
586 .unwrap(),
587 index_version: BITMAP_INDEX_VERSION,
588 })
589 }
590
591 fn update_criteria(&self) -> UpdateCriteria {
592 UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::None).with_row_id())
593 }
594
595 fn derive_index_params(&self) -> Result<ScalarIndexParams> {
596 Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
597 }
598}
599
600#[derive(Debug, Default)]
601pub struct BitmapIndexPlugin;
602
603impl BitmapIndexPlugin {
604 fn get_batch_from_arrays(
605 keys: Arc<dyn Array>,
606 binary_bitmaps: Arc<dyn Array>,
607 ) -> Result<RecordBatch> {
608 let schema = Arc::new(Schema::new(vec![
609 Field::new("keys", keys.data_type().clone(), true),
610 Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
611 ]));
612
613 let columns = vec![keys, binary_bitmaps];
614
615 Ok(RecordBatch::try_new(schema, columns)?)
616 }
617
618 async fn write_bitmap_index(
619 state: HashMap<ScalarValue, RowIdTreeMap>,
620 index_store: &dyn IndexStore,
621 value_type: &DataType,
622 ) -> Result<()> {
623 let schema = Arc::new(Schema::new(vec![
624 Field::new("keys", value_type.clone(), true),
625 Field::new("bitmaps", DataType::Binary, true),
626 ]));
627
628 let mut bitmap_index_file = index_store
629 .new_index_file(BITMAP_LOOKUP_NAME, schema)
630 .await?;
631
632 let mut cur_keys = Vec::new();
633 let mut cur_bitmaps = Vec::new();
634 let mut cur_bytes = 0;
635
636 for (key, bitmap) in state.into_iter() {
637 let mut bytes = Vec::new();
638 bitmap.serialize_into(&mut bytes).unwrap();
639 let bitmap_size = bytes.len();
640
641 if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
642 let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
643 let mut binary_builder = BinaryBuilder::new();
644 for b in &cur_bitmaps {
645 binary_builder.append_value(b);
646 }
647 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
648
649 let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
650 bitmap_index_file.write_record_batch(record_batch).await?;
651
652 cur_keys.clear();
653 cur_bitmaps.clear();
654 cur_bytes = 0;
655 }
656
657 cur_keys.push(key);
658 cur_bitmaps.push(bytes);
659 cur_bytes += bitmap_size;
660 }
661
662 if !cur_keys.is_empty() {
664 let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
665 let mut binary_builder = BinaryBuilder::new();
666 for b in &cur_bitmaps {
667 binary_builder.append_value(b);
668 }
669 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
670
671 let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
672 bitmap_index_file.write_record_batch(record_batch).await?;
673 }
674
675 bitmap_index_file.finish().await?;
677
678 Ok(())
679 }
680
681 async fn do_train_bitmap_index(
682 mut data_source: SendableRecordBatchStream,
683 mut state: HashMap<ScalarValue, RowIdTreeMap>,
684 index_store: &dyn IndexStore,
685 ) -> Result<()> {
686 let value_type = data_source.schema().field(0).data_type().clone();
687 while let Some(batch) = data_source.try_next().await? {
688 let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
689 let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
690 debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
691
692 let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
693
694 for i in 0..values.len() {
695 let row_id = row_id_column.value(i);
696 let key = ScalarValue::try_from_array(values.as_ref(), i)?;
697 state.entry(key.clone()).or_default().insert(row_id);
698 }
699 }
700
701 Self::write_bitmap_index(state, index_store, &value_type).await
702 }
703
704 pub async fn train_bitmap_index(
705 data: SendableRecordBatchStream,
706 index_store: &dyn IndexStore,
707 ) -> Result<()> {
708 let dictionary: HashMap<ScalarValue, RowIdTreeMap> = HashMap::new();
710
711 Self::do_train_bitmap_index(data, dictionary, index_store).await
712 }
713}
714
715#[async_trait]
716impl ScalarIndexPlugin for BitmapIndexPlugin {
717 fn name(&self) -> &str {
718 "Bitmap"
719 }
720
721 fn new_training_request(
722 &self,
723 _params: &str,
724 field: &Field,
725 ) -> Result<Box<dyn TrainingRequest>> {
726 if field.data_type().is_nested() {
727 return Err(Error::InvalidInput {
728 source: "A bitmap index can only be created on a non-nested field.".into(),
729 location: location!(),
730 });
731 }
732 Ok(Box::new(DefaultTrainingRequest::new(
733 TrainingCriteria::new(TrainingOrdering::None).with_row_id(),
734 )))
735 }
736
737 fn provides_exact_answer(&self) -> bool {
738 true
739 }
740
741 fn version(&self) -> u32 {
742 BITMAP_INDEX_VERSION
743 }
744
745 fn new_query_parser(
746 &self,
747 index_name: String,
748 _index_details: &prost_types::Any,
749 ) -> Option<Box<dyn ScalarQueryParser>> {
750 Some(Box::new(SargableQueryParser::new(index_name, false)))
751 }
752
753 async fn train_index(
754 &self,
755 data: SendableRecordBatchStream,
756 index_store: &dyn IndexStore,
757 _request: Box<dyn TrainingRequest>,
758 fragment_ids: Option<Vec<u32>>,
759 ) -> Result<CreatedIndex> {
760 if fragment_ids.is_some() {
761 return Err(Error::InvalidInput {
762 source: "Bitmap index does not support fragment training".into(),
763 location: location!(),
764 });
765 }
766
767 Self::train_bitmap_index(data, index_store).await?;
768 Ok(CreatedIndex {
769 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
770 .unwrap(),
771 index_version: BITMAP_INDEX_VERSION,
772 })
773 }
774
775 async fn load_index(
777 &self,
778 index_store: Arc<dyn IndexStore>,
779 _index_details: &prost_types::Any,
780 frag_reuse_index: Option<Arc<FragReuseIndex>>,
781 cache: &LanceCache,
782 ) -> Result<Arc<dyn ScalarIndex>> {
783 Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
784 }
785}
786
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::metrics::NoOpMetricsCollector;
    use crate::scalar::lance_format::LanceIndexStore;
    use arrow_array::{RecordBatch, StringArray, UInt64Array};
    use arrow_schema::{Field, Schema};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use futures::stream;
    use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
    use lance_io::object_store::ObjectStore;

    /// Returns the row ids of an exact search result in ascending order.
    ///
    /// Panics on any other kind of result, so a test can never pass by
    /// silently skipping its assertions.
    fn sorted_exact_row_ids(result: SearchResult) -> Vec<u64> {
        match result {
            SearchResult::Exact(row_ids) => {
                let mut ids: Vec<u64> = row_ids.row_ids().unwrap().map(u64::from).collect();
                ids.sort();
                ids
            }
            _ => panic!("Expected exact search result"),
        }
    }

    /// Training data shared by the color tests: 15 rows with row ids 0..15
    /// spread over the values red, blue, green and yellow.
    fn color_training_stream() -> SendableRecordBatchStream {
        let colors = vec![
            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
            "red", "red", "blue", "green", "yellow",
        ];
        let row_ids = (0u64..15u64).collect::<Vec<_>>();

        let schema = Arc::new(Schema::new(vec![
            Field::new("value", DataType::Utf8, false),
            Field::new("_rowid", DataType::UInt64, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(colors)),
                Arc::new(UInt64Array::from(row_ids)),
            ],
        )
        .unwrap();

        let stream = stream::once(async move { Ok(batch) });
        Box::pin(RecordBatchStreamAdapter::new(schema, stream))
    }

    #[tokio::test]
    async fn test_bitmap_lazy_loading_and_cache() {
        let tmpdir = TempObjDir::default();
        let store = Arc::new(LanceIndexStore::new(
            Arc::new(ObjectStore::local()),
            tmpdir.clone(),
            Arc::new(LanceCache::no_cache()),
        ));

        BitmapIndexPlugin::train_bitmap_index(color_training_stream(), store.as_ref())
            .await
            .unwrap();

        let cache = LanceCache::with_capacity(1024 * 1024);
        let index = BitmapIndex::load(store.clone(), None, &cache)
            .await
            .unwrap();

        assert_eq!(index.index_map.len(), 4);
        assert!(index.null_map.is_empty());

        // First lookup reads the bitmap from the file.
        let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
        let expected_red_rows = vec![0u64, 3, 6, 10, 11];
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        assert_eq!(sorted_exact_row_ids(result), expected_red_rows);

        // Repeating the query must return the same rows.
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        assert_eq!(sorted_exact_row_ids(result), expected_red_rows);

        let query = SargableQuery::Range(
            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
        );
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
        assert_eq!(sorted_exact_row_ids(result), expected_range_rows);

        let query = SargableQuery::IsIn(vec![
            ScalarValue::Utf8(Some("red".to_string())),
            ScalarValue::Utf8(Some("yellow".to_string())),
        ]);
        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
        let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
        assert_eq!(sorted_exact_row_ids(result), expected_in_rows);
    }

    #[tokio::test]
    #[ignore]
    async fn test_big_bitmap_index() {
        // Many keys with identical bitmaps, so the serialized data is split
        // across several record batches when written.
        let m: u32 = 2_500_000;
        let per_bitmap_size: u64 = 1000;

        let mut state = HashMap::new();
        for i in 0..m {
            let bitmap = RowIdTreeMap::from_iter(0..per_bitmap_size);

            let key = ScalarValue::UInt32(Some(i));
            state.insert(key, bitmap);
        }

        let tmpdir = TempObjDir::default();
        let test_store = LanceIndexStore::new(
            Arc::new(ObjectStore::local()),
            tmpdir.clone(),
            Arc::new(LanceCache::no_cache()),
        );

        let result =
            BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;

        assert!(
            result.is_ok(),
            "Failed to write bitmap index: {:?}",
            result.err()
        );

        let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
        assert!(
            index_file.is_ok(),
            "Failed to open index file: {:?}",
            index_file.err()
        );
        let index_file = index_file.unwrap();

        tracing::info!(
            "Index file contains {} rows in total",
            index_file.num_rows()
        );

        tracing::info!("Loading index from disk...");
        let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
            .await
            .expect("Failed to load bitmap index");

        assert_eq!(
            loaded_index.index_map.len(),
            m as usize,
            "Loaded index has incorrect number of keys (expected {}, got {})",
            m,
            loaded_index.index_map.len()
        );

        // Spot-check the first, middle and last key.
        let test_keys = [0, m / 2, m - 1];
        for &key_val in &test_keys {
            let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
            let bitmap = loaded_index
                .load_bitmap(&key, None)
                .await
                .unwrap_or_else(|_| panic!("Key {} should exist", key_val));

            let row_ids: Vec<u64> = bitmap.row_ids().unwrap().map(u64::from).collect();

            assert_eq!(
                row_ids.len(),
                per_bitmap_size as usize,
                "Bitmap for key {} has wrong size",
                key_val
            );

            for i in 0..5.min(per_bitmap_size) {
                assert!(
                    row_ids.contains(&i),
                    "Bitmap for key {} should contain row_id {}",
                    key_val,
                    i
                );
            }

            for i in (per_bitmap_size - 5)..per_bitmap_size {
                assert!(
                    row_ids.contains(&i),
                    "Bitmap for key {} should contain row_id {}",
                    key_val,
                    i
                );
            }

            let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
            assert_eq!(
                row_ids, expected_range,
                "Bitmap for key {} doesn't contain expected values",
                key_val
            );

            tracing::info!(
                "✓ Verified bitmap for key {}: {} rows as expected",
                key_val,
                row_ids.len()
            );
        }

        tracing::info!("Test successful! Index properly contains {} keys", m);
    }

    #[tokio::test]
    async fn test_bitmap_prewarm() {
        let tmpdir = TempObjDir::default();
        let store = Arc::new(LanceIndexStore::new(
            Arc::new(ObjectStore::local()),
            tmpdir.clone(),
            Arc::new(LanceCache::no_cache()),
        ));

        BitmapIndexPlugin::train_bitmap_index(color_training_stream(), store.as_ref())
            .await
            .unwrap();

        let cache = LanceCache::with_capacity(1024 * 1024);
        let index = BitmapIndex::load(store.clone(), None, &cache)
            .await
            .unwrap();

        let cache_key_red = BitmapKey {
            value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
        };
        let cache_key_blue = BitmapKey {
            value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
        };

        // Nothing is cached right after loading.
        assert!(cache
            .get_with_key::<BitmapKey>(&cache_key_red)
            .await
            .is_none());
        assert!(cache
            .get_with_key::<BitmapKey>(&cache_key_blue)
            .await
            .is_none());

        index.prewarm().await.unwrap();

        // Prewarming populates the cache for every non-null key.
        assert!(cache
            .get_with_key::<BitmapKey>(&cache_key_red)
            .await
            .is_some());
        assert!(cache
            .get_with_key::<BitmapKey>(&cache_key_blue)
            .await
            .is_some());

        let cached_red = cache
            .get_with_key::<BitmapKey>(&cache_key_red)
            .await
            .unwrap();
        let red_rows: Vec<u64> = cached_red.row_ids().unwrap().map(u64::from).collect();
        assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);

        // Prewarming a second time must leave the cached content unchanged.
        index.prewarm().await.unwrap();

        let cached_red_2 = cache
            .get_with_key::<BitmapKey>(&cache_key_red)
            .await
            .unwrap();
        let red_rows_2: Vec<u64> = cached_red_2.row_ids().unwrap().map(u64::from).collect();
        assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
    }

    #[tokio::test]
    async fn test_remap_bitmap_with_null() {
        use arrow_array::UInt32Array;

        let addr = |fragment: u32, offset: u32| -> u64 {
            u64::from(RowAddress::new_from_parts(fragment, offset))
        };

        let tmpdir = TempObjDir::default();
        let test_store = Arc::new(LanceIndexStore::new(
            Arc::new(ObjectStore::local()),
            tmpdir.clone(),
            Arc::new(LanceCache::no_cache()),
        ));

        // Two nulls, two rows with value 1 and two rows with value 2, spread
        // over fragments 1 and 2.
        let values = vec![
            None,
            None,
            Some(1u32),
            Some(1u32),
            Some(2u32),
            Some(2u32),
        ];

        let row_ids: Vec<u64> = vec![
            addr(1, 0),
            addr(1, 1),
            addr(1, 2),
            addr(2, 0),
            addr(2, 1),
            addr(2, 2),
        ];

        let schema = Arc::new(Schema::new(vec![
            Field::new("value", DataType::UInt32, true),
            Field::new("_rowid", DataType::UInt64, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt32Array::from(values)),
                Arc::new(UInt64Array::from(row_ids)),
            ],
        )
        .unwrap();

        let stream = stream::once(async move { Ok(batch) });
        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));

        BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
            .await
            .unwrap();

        let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
            .await
            .expect("Failed to load bitmap index");

        assert_eq!(index.index_map.len(), 2);
        assert!(!index.null_map.is_empty());

        // Move all six rows into fragment 3, keeping their relative order.
        let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
        row_addr_map.insert(addr(1, 0), Some(addr(3, 0)));
        row_addr_map.insert(addr(1, 1), Some(addr(3, 1)));
        row_addr_map.insert(addr(1, 2), Some(addr(3, 2)));
        row_addr_map.insert(addr(2, 0), Some(addr(3, 3)));
        row_addr_map.insert(addr(2, 1), Some(addr(3, 4)));
        row_addr_map.insert(addr(2, 2), Some(addr(3, 5)));

        index
            .remap(&row_addr_map, test_store.as_ref())
            .await
            .unwrap();

        let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
            .await
            .expect("Failed to load remapped bitmap index");

        let expected_null_addrs: Vec<u64> = vec![addr(3, 0), addr(3, 1)];
        let actual_null_addrs: Vec<u64> = reloaded_idx
            .null_map
            .row_ids()
            .unwrap()
            .map(u64::from)
            .collect();
        assert_eq!(
            actual_null_addrs, expected_null_addrs,
            "Null bitmap not remapped correctly"
        );

        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
        let result = reloaded_idx
            .search(&query, &NoOpMetricsCollector)
            .await
            .unwrap();
        let expected: Vec<u64> = vec![addr(3, 2), addr(3, 3)];
        assert_eq!(
            sorted_exact_row_ids(result),
            expected,
            "Value 1 bitmap not remapped correctly"
        );

        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
        let result = reloaded_idx
            .search(&query, &NoOpMetricsCollector)
            .await
            .unwrap();
        let expected: Vec<u64> = vec![addr(3, 4), addr(3, 5)];
        assert_eq!(
            sorted_exact_row_ids(result),
            expected,
            "Value 2 bitmap not remapped correctly"
        );

        let query = SargableQuery::IsNull();
        let result = reloaded_idx
            .search(&query, &NoOpMetricsCollector)
            .await
            .unwrap();
        assert_eq!(
            sorted_exact_row_ids(result),
            expected_null_addrs,
            "Null search results not correct"
        );
    }
}