1use std::{
5 any::Any,
6 collections::{BTreeMap, HashMap},
7 fmt::Debug,
8 ops::Bound,
9 sync::Arc,
10};
11
12use arrow::array::BinaryBuilder;
13use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array, new_null_array};
14use arrow_schema::{DataType, Field, Schema};
15use async_trait::async_trait;
16use bytes::Bytes;
17use datafusion::physical_plan::SendableRecordBatchStream;
18use datafusion_common::ScalarValue;
19use deepsize::DeepSizeOf;
20use futures::{StreamExt, TryStreamExt, stream};
21use lance_core::utils::mask::RowSetOps;
22use lance_core::{
23 Error, ROW_ID, Result,
24 cache::{CacheKey, LanceCache, WeakLanceCache},
25 error::LanceOptionExt,
26 utils::{
27 mask::{NullableRowAddrSet, RowAddrTreeMap},
28 tokio::get_num_compute_intensive_cpus,
29 },
30};
31use roaring::RoaringBitmap;
32use serde::Serialize;
33use tracing::instrument;
34
35use super::{AnyQuery, IndexStore, ScalarIndex};
36use super::{
37 BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult, btree::OrderableScalarValue,
38};
39use crate::pbold;
40use crate::{Index, IndexType, metrics::MetricsCollector};
41use crate::{
42 frag_reuse::FragReuseIndex,
43 scalar::{
44 CreatedIndex, UpdateCriteria,
45 expression::SargableQueryParser,
46 registry::{
47 DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering,
48 TrainingRequest, VALUE_COLUMN_NAME,
49 },
50 },
51};
52use crate::{scalar::IndexReader, scalar::expression::ScalarQueryParser};
53
/// Name of the Lance file that stores the bitmap index: one row per distinct key, with a
/// `keys` column and a `bitmaps` column holding the serialized row-address bitmap.
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";

/// Schema-metadata key under which the JSON-serialized index statistics are stored.
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";

/// Upper bound on the total serialized bitmap bytes written in one record batch.
///
/// Arrow's `BinaryArray` uses `i32` offsets, so a batch's payload must stay below
/// `i32::MAX` bytes; a 1 MiB margin is kept under that limit.
const MAX_BITMAP_ARRAY_LENGTH: usize = (i32::MAX as usize) - (1 << 20);

/// Number of lookup-file rows fetched per `read_range` call when scanning the file.
const MAX_ROWS_PER_CHUNK: usize = 2048;

/// Index format version reported for bitmap indexes.
const BITMAP_INDEX_VERSION: u32 = 0;
62
63#[derive(Clone)]
66struct LazyIndexReader {
67 index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
68 store: Arc<dyn IndexStore>,
69}
70
71impl std::fmt::Debug for LazyIndexReader {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("LazyIndexReader")
74 .field("store", &self.store)
75 .finish()
76 }
77}
78
79impl LazyIndexReader {
80 fn new(store: Arc<dyn IndexStore>) -> Self {
81 Self {
82 index_reader: Arc::new(tokio::sync::Mutex::new(None)),
83 store,
84 }
85 }
86
87 async fn get(&self) -> Result<Arc<dyn IndexReader>> {
88 let mut reader = self.index_reader.lock().await;
89 if reader.is_none() {
90 let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
91 *reader = Some(index_reader);
92 }
93 Ok(reader.as_ref().unwrap().clone())
94 }
95}
96
97#[derive(Clone, Debug)]
102pub struct BitmapIndex {
103 index_map: BTreeMap<OrderableScalarValue, usize>,
107
108 null_map: Arc<RowAddrTreeMap>,
109
110 value_type: DataType,
111
112 store: Arc<dyn IndexStore>,
113
114 index_cache: WeakLanceCache,
115
116 frag_reuse_index: Option<Arc<FragReuseIndex>>,
117
118 lazy_reader: LazyIndexReader,
119}
120
121#[derive(Debug, Clone)]
122pub struct BitmapKey {
123 value: OrderableScalarValue,
124}
125
126impl CacheKey for BitmapKey {
127 type ValueType = RowAddrTreeMap;
128
129 fn key(&self) -> std::borrow::Cow<'_, str> {
130 format!("{}", self.value.0).into()
131 }
132
133 fn type_name() -> &'static str {
134 "Bitmap"
135 }
136}
137
138impl BitmapIndex {
139 fn new(
140 index_map: BTreeMap<OrderableScalarValue, usize>,
141 null_map: Arc<RowAddrTreeMap>,
142 value_type: DataType,
143 store: Arc<dyn IndexStore>,
144 index_cache: WeakLanceCache,
145 frag_reuse_index: Option<Arc<FragReuseIndex>>,
146 ) -> Self {
147 let lazy_reader = LazyIndexReader::new(store.clone());
148 Self {
149 index_map,
150 null_map,
151 value_type,
152 store,
153 index_cache,
154 frag_reuse_index,
155 lazy_reader,
156 }
157 }
158
159 pub(crate) async fn load(
160 store: Arc<dyn IndexStore>,
161 frag_reuse_index: Option<Arc<FragReuseIndex>>,
162 index_cache: &LanceCache,
163 ) -> Result<Arc<Self>> {
164 let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
165 let total_rows = page_lookup_file.num_rows();
166
167 if total_rows == 0 {
168 let schema = page_lookup_file.schema();
169 let data_type = schema.fields[0].data_type();
170 return Ok(Arc::new(Self::new(
171 BTreeMap::new(),
172 Arc::new(RowAddrTreeMap::default()),
173 data_type,
174 store,
175 WeakLanceCache::from(index_cache),
176 frag_reuse_index,
177 )));
178 }
179
180 let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
181 let mut null_map = Arc::new(RowAddrTreeMap::default());
182 let mut value_type: Option<DataType> = None;
183 let mut null_location: Option<usize> = None;
184 let mut row_offset = 0;
185
186 for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
187 let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
188 let chunk = page_lookup_file
189 .read_range(start_row..end_row, Some(&["keys"]))
190 .await?;
191
192 if chunk.num_rows() == 0 {
193 continue;
194 }
195
196 if value_type.is_none() {
197 value_type = Some(chunk.schema().field(0).data_type().clone());
198 }
199
200 let dict_keys = chunk.column(0);
201
202 for idx in 0..chunk.num_rows() {
203 let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
204
205 if key.0.is_null() {
206 null_location = Some(row_offset);
207 } else {
208 index_map.insert(key, row_offset);
209 }
210
211 row_offset += 1;
212 }
213 }
214
215 if let Some(null_loc) = null_location {
216 let batch = page_lookup_file
217 .read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
218 .await?;
219
220 let binary_bitmaps = batch
221 .column(0)
222 .as_any()
223 .downcast_ref::<BinaryArray>()
224 .ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
225 let bitmap_bytes = binary_bitmaps.value(0);
226 let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
227
228 if let Some(fri) = &frag_reuse_index {
230 bitmap = fri.remap_row_addrs_tree_map(&bitmap);
231 }
232
233 null_map = Arc::new(bitmap);
234 }
235
236 let final_value_type = value_type.expect_ok()?;
237
238 Ok(Arc::new(Self::new(
239 index_map,
240 null_map,
241 final_value_type,
242 store,
243 WeakLanceCache::from(index_cache),
244 frag_reuse_index,
245 )))
246 }
247
248 async fn load_bitmap(
249 &self,
250 key: &OrderableScalarValue,
251 metrics: Option<&dyn MetricsCollector>,
252 ) -> Result<Arc<RowAddrTreeMap>> {
253 if key.0.is_null() {
254 return Ok(self.null_map.clone());
255 }
256
257 let cache_key = BitmapKey { value: key.clone() };
258
259 if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
260 return Ok(cached);
261 }
262
263 if let Some(metrics) = metrics {
265 metrics.record_part_load();
266 }
267
268 let row_offset = match self.index_map.get(key) {
269 Some(loc) => *loc,
270 None => return Ok(Arc::new(RowAddrTreeMap::default())),
271 };
272
273 let page_lookup_file = self.lazy_reader.get().await?;
274 let batch = page_lookup_file
275 .read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
276 .await?;
277
278 let binary_bitmaps = batch
279 .column(0)
280 .as_any()
281 .downcast_ref::<BinaryArray>()
282 .ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
283 let bitmap_bytes = binary_bitmaps.value(0); let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
285
286 if let Some(fri) = &self.frag_reuse_index {
287 bitmap = fri.remap_row_addrs_tree_map(&bitmap);
288 }
289
290 self.index_cache
291 .insert_with_key(&cache_key, Arc::new(bitmap.clone()))
292 .await;
293
294 Ok(Arc::new(bitmap))
295 }
296
297 pub(crate) fn value_type(&self) -> &DataType {
298 &self.value_type
299 }
300
301 pub(crate) async fn load_bitmap_index_state(
303 &self,
304 ) -> Result<HashMap<ScalarValue, RowAddrTreeMap>> {
305 let mut state = HashMap::new();
306
307 for key in self.index_map.keys() {
308 let bitmap = self.load_bitmap(key, None).await?;
309 state.insert(key.0.clone(), (*bitmap).clone());
310 }
311
312 if !self.null_map.is_empty() {
313 let existing_null = new_null_array(&self.value_type, 1);
314 let existing_null = ScalarValue::try_from_array(existing_null.as_ref(), 0)?;
315 state.insert(existing_null, (*self.null_map).clone());
316 }
317
318 Ok(state)
319 }
320}
321
322impl DeepSizeOf for BitmapIndex {
323 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
324 let mut total_size = 0;
325
326 total_size += self.index_map.deep_size_of_children(context);
327 total_size += self.store.deep_size_of_children(context);
328
329 total_size
330 }
331}
332
333#[derive(Serialize)]
334struct BitmapStatistics {
335 num_bitmaps: usize,
336}
337
338#[async_trait]
339impl Index for BitmapIndex {
340 fn as_any(&self) -> &dyn Any {
341 self
342 }
343
344 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
345 self
346 }
347
348 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
349 Err(Error::not_supported_source(
350 "BitmapIndex is not a vector index".into(),
351 ))
352 }
353
354 async fn prewarm(&self) -> Result<()> {
355 let page_lookup_file = self.lazy_reader.get().await?;
356 let total_rows = page_lookup_file.num_rows();
357
358 if total_rows == 0 {
359 return Ok(());
360 }
361
362 for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
363 let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
364 let chunk = page_lookup_file
365 .read_range(start_row..end_row, None)
366 .await?;
367
368 if chunk.num_rows() == 0 {
369 continue;
370 }
371
372 let dict_keys = chunk.column(0);
373 let binary_bitmaps = chunk.column(1);
374 let bitmap_binary_array = binary_bitmaps
375 .as_any()
376 .downcast_ref::<BinaryArray>()
377 .unwrap();
378
379 for idx in 0..chunk.num_rows() {
380 let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
381
382 if key.0.is_null() {
383 continue;
384 }
385
386 let bitmap_bytes = bitmap_binary_array.value(idx);
387 let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
388
389 if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
390 bitmap = frag_reuse_index_ref.remap_row_addrs_tree_map(&bitmap);
391 }
392
393 let cache_key = BitmapKey { value: key };
394 self.index_cache
395 .insert_with_key(&cache_key, Arc::new(bitmap))
396 .await;
397 }
398 }
399
400 Ok(())
401 }
402
403 fn index_type(&self) -> IndexType {
404 IndexType::Bitmap
405 }
406
407 fn statistics(&self) -> Result<serde_json::Value> {
408 let stats = BitmapStatistics {
409 num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
410 };
411 serde_json::to_value(stats).map_err(|e| {
412 Error::internal(format!(
413 "failed to serialize bitmap index statistics: {}",
414 e
415 ))
416 })
417 }
418
419 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
420 unimplemented!()
421 }
422}
423
424#[async_trait]
425impl ScalarIndex for BitmapIndex {
426 #[instrument(name = "bitmap_search", level = "debug", skip_all)]
427 async fn search(
428 &self,
429 query: &dyn AnyQuery,
430 metrics: &dyn MetricsCollector,
431 ) -> Result<SearchResult> {
432 let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
433
434 let (row_ids, null_row_ids) = match query {
435 SargableQuery::Equals(val) => {
436 metrics.record_comparisons(1);
437 if val.is_null() {
438 ((*self.null_map).clone(), None)
440 } else {
441 let key = OrderableScalarValue(val.clone());
442 let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
443 let null_rows = if !self.null_map.is_empty() {
444 Some((*self.null_map).clone())
445 } else {
446 None
447 };
448 ((*bitmap).clone(), null_rows)
449 }
450 }
451 SargableQuery::Range(start, end) => {
452 let range_start = match start {
453 Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
454 Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
455 Bound::Unbounded => Bound::Unbounded,
456 };
457
458 let range_end = match end {
459 Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
460 Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
461 Bound::Unbounded => Bound::Unbounded,
462 };
463
464 let empty_range = match (&range_start, &range_end) {
466 (Bound::Included(lower), Bound::Included(upper)) => lower > upper,
467 (Bound::Included(lower), Bound::Excluded(upper))
468 | (Bound::Excluded(lower), Bound::Included(upper))
469 | (Bound::Excluded(lower), Bound::Excluded(upper)) => lower >= upper,
470 _ => false,
471 };
472
473 let keys: Vec<_> = if empty_range {
474 Vec::new()
475 } else {
476 self.index_map
477 .range((range_start, range_end))
478 .map(|(k, _v)| k.clone())
479 .collect()
480 };
481
482 metrics.record_comparisons(keys.len());
483
484 let result = if keys.is_empty() {
485 RowAddrTreeMap::default()
486 } else {
487 let bitmaps: Vec<_> = stream::iter(
488 keys.into_iter()
489 .map(|key| async move { self.load_bitmap(&key, None).await }),
490 )
491 .buffer_unordered(get_num_compute_intensive_cpus())
492 .try_collect()
493 .await?;
494
495 let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
496 RowAddrTreeMap::union_all(&bitmap_refs)
497 };
498
499 let null_rows = if !self.null_map.is_empty() {
500 Some((*self.null_map).clone())
501 } else {
502 None
503 };
504 (result, null_rows)
505 }
506 SargableQuery::IsIn(values) => {
507 metrics.record_comparisons(values.len());
508
509 let mut has_null = false;
511 let keys: Vec<_> = values
512 .iter()
513 .filter_map(|val| {
514 if val.is_null() {
515 has_null = true;
516 None
517 } else {
518 let key = OrderableScalarValue(val.clone());
519 if self.index_map.contains_key(&key) {
520 Some(key)
521 } else {
522 None
523 }
524 }
525 })
526 .collect();
527
528 let mut bitmaps: Vec<_> = stream::iter(
530 keys.into_iter()
531 .map(|key| async move { self.load_bitmap(&key, None).await }),
532 )
533 .buffer_unordered(get_num_compute_intensive_cpus())
534 .try_collect()
535 .await?;
536
537 if has_null && !self.null_map.is_empty() {
539 bitmaps.push(self.null_map.clone());
540 }
541
542 let result = if bitmaps.is_empty() {
543 RowAddrTreeMap::default()
544 } else {
545 let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
547 RowAddrTreeMap::union_all(&bitmap_refs)
548 };
549
550 let null_rows = if !has_null && !self.null_map.is_empty() {
553 Some((*self.null_map).clone())
554 } else {
555 None
556 };
557 (result, null_rows)
558 }
559 SargableQuery::IsNull() => {
560 metrics.record_comparisons(1);
561 ((*self.null_map).clone(), None)
563 }
564 SargableQuery::FullTextSearch(_) => {
565 return Err(Error::not_supported_source(
566 "full text search is not supported for bitmap indexes".into(),
567 ));
568 }
569 SargableQuery::LikePrefix(_) => {
570 return Err(Error::not_supported_source(
571 "LIKE prefix queries are not supported for bitmap indexes".into(),
572 ));
573 }
574 };
575
576 let selection = NullableRowAddrSet::new(row_ids, null_row_ids.unwrap_or_default());
577 Ok(SearchResult::Exact(selection))
578 }
579
580 fn can_remap(&self) -> bool {
581 true
582 }
583
584 async fn remap(
586 &self,
587 mapping: &HashMap<u64, Option<u64>>,
588 dest_store: &dyn IndexStore,
589 ) -> Result<CreatedIndex> {
590 let state = self.load_bitmap_index_state().await?;
591 let remapped_state = BitmapIndexPlugin::remap_bitmap_state(state, mapping);
592 BitmapIndexPlugin::write_bitmap_index(remapped_state, dest_store, &self.value_type).await?;
593
594 Ok(CreatedIndex {
595 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
596 .unwrap(),
597 index_version: BITMAP_INDEX_VERSION,
598 files: Some(dest_store.list_files_with_sizes().await?),
599 })
600 }
601
602 async fn update(
604 &self,
605 new_data: SendableRecordBatchStream,
606 dest_store: &dyn IndexStore,
607 _old_data_filter: Option<super::OldIndexDataFilter>,
608 ) -> Result<CreatedIndex> {
609 BitmapIndexPlugin::streaming_build_and_write(new_data, Some(self), dest_store).await?;
610
611 Ok(CreatedIndex {
612 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
613 .unwrap(),
614 index_version: BITMAP_INDEX_VERSION,
615 files: Some(dest_store.list_files_with_sizes().await?),
616 })
617 }
618
619 fn update_criteria(&self) -> UpdateCriteria {
620 UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::Values).with_row_id())
621 }
622
623 fn derive_index_params(&self) -> Result<ScalarIndexParams> {
624 Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
625 }
626}
627
628struct BitmapBatchWriter {
631 file: Box<dyn super::IndexWriter>,
632 keys: Vec<ScalarValue>,
633 serialized: Vec<Vec<u8>>,
634 bytes: usize,
635 num_bitmaps: usize,
636}
637
638impl BitmapBatchWriter {
639 fn new(file: Box<dyn super::IndexWriter>) -> Self {
640 Self {
641 file,
642 keys: Vec::new(),
643 serialized: Vec::new(),
644 bytes: 0,
645 num_bitmaps: 0,
646 }
647 }
648
649 async fn emit(&mut self, key: ScalarValue, bitmap: &RowAddrTreeMap) -> Result<()> {
652 let mut buf = Vec::new();
653 bitmap.serialize_into(&mut buf).unwrap();
654 let size = buf.len();
655
656 if self.bytes + size > MAX_BITMAP_ARRAY_LENGTH {
657 self.flush().await?;
658 }
659
660 self.keys.push(key);
661 self.serialized.push(buf);
662 self.bytes += size;
663 self.num_bitmaps += 1;
664 Ok(())
665 }
666
667 async fn flush(&mut self) -> Result<()> {
669 if self.keys.is_empty() {
670 return Ok(());
671 }
672 let keys_array =
673 ScalarValue::iter_to_array(self.keys.drain(..).collect::<Vec<_>>().into_iter())
674 .unwrap();
675 let total_size: usize = self.serialized.iter().map(|b| b.len()).sum();
676 let mut binary_builder = BinaryBuilder::with_capacity(self.serialized.len(), total_size);
677 for b in self.serialized.drain(..) {
678 binary_builder.append_value(&b);
679 }
680 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
681 let batch = BitmapIndexPlugin::get_batch_from_arrays(keys_array, bitmaps_array)?;
682 self.file.write_record_batch(batch).await?;
683 self.bytes = 0;
684 Ok(())
685 }
686
687 async fn finish(mut self) -> Result<()> {
689 self.flush().await?;
690 let stats_json = serde_json::to_string(&BitmapStatistics {
691 num_bitmaps: self.num_bitmaps,
692 })
693 .map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
694 let mut metadata = HashMap::new();
695 metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
696 self.file.finish_with_metadata(metadata).await?;
697 Ok(())
698 }
699}
700
701#[derive(Debug, Default)]
702pub struct BitmapIndexPlugin;
703
704impl BitmapIndexPlugin {
705 fn get_batch_from_arrays(
706 keys: Arc<dyn Array>,
707 binary_bitmaps: Arc<dyn Array>,
708 ) -> Result<RecordBatch> {
709 let schema = Arc::new(Schema::new(vec![
710 Field::new("keys", keys.data_type().clone(), true),
711 Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
712 ]));
713
714 let columns = vec![keys, binary_bitmaps];
715
716 Ok(RecordBatch::try_new(schema, columns)?)
717 }
718
719 async fn write_bitmap_index(
720 state: HashMap<ScalarValue, RowAddrTreeMap>,
721 index_store: &dyn IndexStore,
722 value_type: &DataType,
723 ) -> Result<()> {
724 Self::write_bitmap_index_with_extras(
725 state,
726 index_store,
727 value_type,
728 HashMap::new(),
729 Vec::new(),
730 )
731 .await
732 }
733
734 pub(crate) async fn write_bitmap_index_with_extras(
736 state: HashMap<ScalarValue, RowAddrTreeMap>,
737 index_store: &dyn IndexStore,
738 value_type: &DataType,
739 mut metadata: HashMap<String, String>,
740 global_buffers: Vec<(String, Bytes)>,
741 ) -> Result<()> {
742 let num_bitmaps = state.len();
743 let schema = Arc::new(Schema::new(vec![
744 Field::new("keys", value_type.clone(), true),
745 Field::new("bitmaps", DataType::Binary, true),
746 ]));
747
748 let mut bitmap_index_file = index_store
749 .new_index_file(BITMAP_LOOKUP_NAME, schema)
750 .await?;
751
752 for (metadata_key, data) in global_buffers {
753 let buffer_idx = bitmap_index_file.add_global_buffer(data).await?;
754 metadata.insert(metadata_key, buffer_idx.to_string());
755 }
756
757 let mut cur_keys = Vec::new();
758 let mut cur_bitmaps = Vec::new();
759 let mut cur_bytes = 0;
760
761 for (key, bitmap) in state.into_iter() {
762 let mut bytes = Vec::new();
763 bitmap.serialize_into(&mut bytes).unwrap();
764 let bitmap_size = bytes.len();
765
766 if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
767 let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
768 let mut binary_builder = BinaryBuilder::new();
769 for b in &cur_bitmaps {
770 binary_builder.append_value(b);
771 }
772 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
773
774 let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
775 bitmap_index_file.write_record_batch(record_batch).await?;
776
777 cur_keys.clear();
778 cur_bitmaps.clear();
779 cur_bytes = 0;
780 }
781
782 cur_keys.push(key);
783 cur_bitmaps.push(bytes);
784 cur_bytes += bitmap_size;
785 }
786
787 if !cur_keys.is_empty() {
789 let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
790 let mut binary_builder = BinaryBuilder::new();
791 for b in &cur_bitmaps {
792 binary_builder.append_value(b);
793 }
794 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
795
796 let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
797 bitmap_index_file.write_record_batch(record_batch).await?;
798 }
799
800 let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps })
802 .map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
803 metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
804
805 bitmap_index_file.finish_with_metadata(metadata).await?;
806
807 Ok(())
808 }
809
810 pub(crate) async fn build_bitmap_index_state(
812 mut data_source: SendableRecordBatchStream,
813 mut state: HashMap<ScalarValue, RowAddrTreeMap>,
814 ) -> Result<(HashMap<ScalarValue, RowAddrTreeMap>, DataType)> {
815 let value_type = data_source.schema().field(0).data_type().clone();
816 while let Some(batch) = data_source.try_next().await? {
817 let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
818 let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
819 debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
820
821 let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
822
823 for i in 0..values.len() {
824 let row_id = row_id_column.value(i);
825 let key = ScalarValue::try_from_array(values.as_ref(), i)?;
826 state.entry(key.clone()).or_default().insert(row_id);
827 }
828 }
829
830 Ok((state, value_type))
831 }
832
833 pub async fn train_bitmap_index(
834 data: SendableRecordBatchStream,
835 index_store: &dyn IndexStore,
836 ) -> Result<()> {
837 Self::streaming_build_and_write(data, None, index_store).await
838 }
839
840 async fn streaming_build_and_write(
848 mut data_source: SendableRecordBatchStream,
849 old_index: Option<&BitmapIndex>,
850 index_store: &dyn IndexStore,
851 ) -> Result<()> {
852 let value_type = data_source.schema().field(0).data_type().clone();
853
854 let schema = Arc::new(Schema::new(vec![
855 Field::new("keys", value_type.clone(), true),
856 Field::new("bitmaps", DataType::Binary, true),
857 ]));
858
859 let index_file = index_store
860 .new_index_file(BITMAP_LOOKUP_NAME, schema)
861 .await?;
862 let mut writer = BitmapBatchWriter::new(index_file);
863
864 let old_keys: Vec<OrderableScalarValue> = old_index
867 .map(|idx| idx.index_map.keys().cloned().collect())
868 .unwrap_or_default();
869 let mut old_pos: usize = 0;
870
871 let mut current_key: Option<ScalarValue> = None;
873 let mut current_bitmap = RowAddrTreeMap::default();
874 let mut emitted_null = false;
877
878 while let Some(batch) = data_source.try_next().await? {
879 let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
880 let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
881 debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
882 let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
883
884 for i in 0..values.len() {
885 let row_id = row_id_column.value(i);
886 let key = ScalarValue::try_from_array(values.as_ref(), i)?;
887
888 match ¤t_key {
889 Some(cur) if *cur == key => {
890 current_bitmap.insert(row_id);
891 }
892 _ => {
893 if let Some(prev_key) = current_key.take() {
895 let mut prev_bitmap = std::mem::take(&mut current_bitmap);
896 Self::finish_run(
897 prev_key,
898 &mut prev_bitmap,
899 old_index,
900 &old_keys,
901 &mut old_pos,
902 &mut emitted_null,
903 &mut writer,
904 )
905 .await?;
906 }
907 current_key = Some(key);
908 current_bitmap = RowAddrTreeMap::default();
909 current_bitmap.insert(row_id);
910 }
911 }
912 }
913 }
914
915 if let Some(last_key) = current_key.take() {
917 let mut last_bitmap = std::mem::take(&mut current_bitmap);
918 Self::finish_run(
919 last_key,
920 &mut last_bitmap,
921 old_index,
922 &old_keys,
923 &mut old_pos,
924 &mut emitted_null,
925 &mut writer,
926 )
927 .await?;
928 }
929
930 if let Some(idx) = old_index {
932 while old_pos < old_keys.len() {
933 let old_bitmap = idx.load_bitmap(&old_keys[old_pos], None).await?;
934 writer
935 .emit(old_keys[old_pos].0.clone(), &old_bitmap)
936 .await?;
937 old_pos += 1;
938 }
939 }
940
941 if !emitted_null
943 && let Some(idx) = old_index
944 && !idx.null_map.is_empty()
945 {
946 let null_key = new_null_array(&value_type, 1);
947 let null_key = ScalarValue::try_from_array(null_key.as_ref(), 0)?;
948 writer.emit(null_key, &idx.null_map).await?;
949 }
950
951 writer.finish().await?;
952
953 Ok(())
954 }
955
956 async fn finish_run(
960 key: ScalarValue,
961 bitmap: &mut RowAddrTreeMap,
962 old_index: Option<&BitmapIndex>,
963 old_keys: &[OrderableScalarValue],
964 old_pos: &mut usize,
965 emitted_null: &mut bool,
966 writer: &mut BitmapBatchWriter,
967 ) -> Result<()> {
968 if key.is_null() {
969 if let Some(idx) = old_index
971 && !idx.null_map.is_empty()
972 {
973 *bitmap |= &*idx.null_map;
974 }
975 *emitted_null = true;
976 writer.emit(key, bitmap).await?;
977 } else if let Some(idx) = old_index {
978 let orderable = OrderableScalarValue(key.clone());
979
980 while *old_pos < old_keys.len() && old_keys[*old_pos] < orderable {
982 let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
983 writer
984 .emit(old_keys[*old_pos].0.clone(), &old_bitmap)
985 .await?;
986 *old_pos += 1;
987 }
988
989 if *old_pos < old_keys.len() && old_keys[*old_pos] == orderable {
991 let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
992 *bitmap |= &*old_bitmap;
993 *old_pos += 1;
994 }
995
996 writer.emit(key, bitmap).await?;
997 } else {
998 writer.emit(key, bitmap).await?;
999 }
1000 Ok(())
1001 }
1002
1003 pub(crate) fn remap_bitmap_state(
1005 state: HashMap<ScalarValue, RowAddrTreeMap>,
1006 mapping: &HashMap<u64, Option<u64>>,
1007 ) -> HashMap<ScalarValue, RowAddrTreeMap> {
1008 state
1009 .into_iter()
1010 .map(|(key, bitmap)| {
1011 let remapped_bitmap =
1012 RowAddrTreeMap::from_iter(bitmap.row_addrs().unwrap().filter_map(|addr| {
1013 let addr_as_u64 = u64::from(addr);
1014 mapping
1015 .get(&addr_as_u64)
1016 .copied()
1017 .unwrap_or(Some(addr_as_u64))
1018 }));
1019 (key, remapped_bitmap)
1020 })
1021 .collect()
1022 }
1023}
1024
1025#[async_trait]
1026impl ScalarIndexPlugin for BitmapIndexPlugin {
1027 fn name(&self) -> &str {
1028 "Bitmap"
1029 }
1030
1031 fn new_training_request(
1032 &self,
1033 _params: &str,
1034 field: &Field,
1035 ) -> Result<Box<dyn TrainingRequest>> {
1036 if field.data_type().is_nested() {
1037 return Err(Error::invalid_input_source(
1038 "A bitmap index can only be created on a non-nested field.".into(),
1039 ));
1040 }
1041 Ok(Box::new(DefaultTrainingRequest::new(
1042 TrainingCriteria::new(TrainingOrdering::Values).with_row_id(),
1043 )))
1044 }
1045
1046 fn provides_exact_answer(&self) -> bool {
1047 true
1048 }
1049
1050 fn version(&self) -> u32 {
1051 BITMAP_INDEX_VERSION
1052 }
1053
1054 fn new_query_parser(
1055 &self,
1056 index_name: String,
1057 _index_details: &prost_types::Any,
1058 ) -> Option<Box<dyn ScalarQueryParser>> {
1059 Some(Box::new(SargableQueryParser::new(index_name, false)))
1060 }
1061
1062 async fn train_index(
1063 &self,
1064 data: SendableRecordBatchStream,
1065 index_store: &dyn IndexStore,
1066 _request: Box<dyn TrainingRequest>,
1067 fragment_ids: Option<Vec<u32>>,
1068 _progress: Arc<dyn crate::progress::IndexBuildProgress>,
1069 ) -> Result<CreatedIndex> {
1070 if fragment_ids.is_some() {
1071 return Err(Error::invalid_input_source(
1072 "Bitmap index does not support fragment training".into(),
1073 ));
1074 }
1075
1076 Self::train_bitmap_index(data, index_store).await?;
1077 Ok(CreatedIndex {
1078 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
1079 .unwrap(),
1080 index_version: BITMAP_INDEX_VERSION,
1081 files: Some(index_store.list_files_with_sizes().await?),
1082 })
1083 }
1084
1085 async fn load_index(
1087 &self,
1088 index_store: Arc<dyn IndexStore>,
1089 _index_details: &prost_types::Any,
1090 frag_reuse_index: Option<Arc<FragReuseIndex>>,
1091 cache: &LanceCache,
1092 ) -> Result<Arc<dyn ScalarIndex>> {
1093 Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
1094 }
1095
1096 async fn load_statistics(
1097 &self,
1098 index_store: Arc<dyn IndexStore>,
1099 _index_details: &prost_types::Any,
1100 ) -> Result<Option<serde_json::Value>> {
1101 let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
1102 if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
1103 let stats = serde_json::from_str(value).map_err(|e| {
1104 Error::internal(format!("failed to parse bitmap statistics metadata: {e}"))
1105 })?;
1106 Ok(Some(stats))
1107 } else {
1108 Ok(None)
1109 }
1110 }
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115 use super::*;
1116 use crate::metrics::NoOpMetricsCollector;
1117 use crate::scalar::lance_format::LanceIndexStore;
1118 use arrow_array::{RecordBatch, StringArray, UInt64Array, record_batch};
1119 use arrow_schema::{DataType, Field, Schema};
1120
1121 fn sort_batch_by_value(batch: &RecordBatch) -> RecordBatch {
1124 use arrow::compute::SortOptions;
1125 let values = batch.column(0);
1126 let row_ids = batch.column(1);
1127 let options = SortOptions {
1128 descending: false,
1129 nulls_first: true,
1130 };
1131 let indices = arrow::compute::sort_to_indices(values, Some(options), None).unwrap();
1132 let sorted_values = arrow::compute::take(values.as_ref(), &indices, None).unwrap();
1133 let sorted_row_ids = arrow::compute::take(row_ids.as_ref(), &indices, None).unwrap();
1134 RecordBatch::try_new(batch.schema(), vec![sorted_values, sorted_row_ids]).unwrap()
1135 }
1136 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1137 use futures::stream;
1138 use lance_core::utils::mask::RowSetOps;
1139 use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
1140 use lance_io::object_store::ObjectStore;
1141 use std::collections::HashMap;
1142
1143 #[tokio::test]
1144 async fn test_bitmap_lazy_loading_and_cache() {
1145 let tmpdir = TempObjDir::default();
1147 let store = Arc::new(LanceIndexStore::new(
1148 Arc::new(ObjectStore::local()),
1149 tmpdir.clone(),
1150 Arc::new(LanceCache::no_cache()),
1151 ));
1152
1153 let colors = vec![
1155 "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1156 "red", "red", "blue", "green", "yellow",
1157 ];
1158
1159 let row_ids = (0u64..15u64).collect::<Vec<_>>();
1160
1161 let schema = Arc::new(Schema::new(vec![
1162 Field::new("value", DataType::Utf8, false),
1163 Field::new("_rowid", DataType::UInt64, false),
1164 ]));
1165
1166 let batch = RecordBatch::try_new(
1167 schema.clone(),
1168 vec![
1169 Arc::new(StringArray::from(colors.clone())),
1170 Arc::new(UInt64Array::from(row_ids.clone())),
1171 ],
1172 )
1173 .unwrap();
1174
1175 let batch = sort_batch_by_value(&batch);
1176 let stream = stream::once(async move { Ok(batch) });
1177 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1178
1179 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1181 .await
1182 .unwrap();
1183
1184 let cache = LanceCache::with_capacity(1024 * 1024); let index = BitmapIndex::load(store.clone(), None, &cache)
1189 .await
1190 .unwrap();
1191
1192 assert_eq!(index.index_map.len(), 4); assert!(index.null_map.is_empty()); let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
1197 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1198
1199 let expected_red_rows = vec![0u64, 3, 6, 10, 11];
1201 if let SearchResult::Exact(row_ids) = result {
1202 let mut actual: Vec<u64> = row_ids
1203 .true_rows()
1204 .row_addrs()
1205 .unwrap()
1206 .map(|id| id.into())
1207 .collect();
1208 actual.sort();
1209 assert_eq!(actual, expected_red_rows);
1210 } else {
1211 panic!("Expected exact search result");
1212 }
1213
1214 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1216 if let SearchResult::Exact(row_ids) = result {
1217 let mut actual: Vec<u64> = row_ids
1218 .true_rows()
1219 .row_addrs()
1220 .unwrap()
1221 .map(|id| id.into())
1222 .collect();
1223 actual.sort();
1224 assert_eq!(actual, expected_red_rows);
1225 }
1226
1227 let query = SargableQuery::Range(
1229 std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
1230 std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
1231 );
1232 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1233
1234 let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
1235 if let SearchResult::Exact(row_ids) = result {
1236 let mut actual: Vec<u64> = row_ids
1237 .true_rows()
1238 .row_addrs()
1239 .unwrap()
1240 .map(|id| id.into())
1241 .collect();
1242 actual.sort();
1243 assert_eq!(actual, expected_range_rows);
1244 }
1245
1246 let query = SargableQuery::Range(
1248 std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
1249 std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
1250 );
1251 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1252 if let SearchResult::Exact(row_ids) = result {
1253 assert!(row_ids.true_rows().is_empty());
1254 } else {
1255 panic!("Expected exact search result");
1256 }
1257
1258 let query = SargableQuery::IsIn(vec![
1260 ScalarValue::Utf8(Some("red".to_string())),
1261 ScalarValue::Utf8(Some("yellow".to_string())),
1262 ]);
1263 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1264
1265 let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
1266 if let SearchResult::Exact(row_ids) = result {
1267 let mut actual: Vec<u64> = row_ids
1268 .true_rows()
1269 .row_addrs()
1270 .unwrap()
1271 .map(|id| id.into())
1272 .collect();
1273 actual.sort();
1274 assert_eq!(actual, expected_in_rows);
1275 }
1276 }
1277
1278 #[tokio::test]
1279 #[ignore]
1280 async fn test_big_bitmap_index() {
1281 use super::{BITMAP_LOOKUP_NAME, BitmapIndex};
1284 use crate::scalar::IndexStore;
1285 use crate::scalar::lance_format::LanceIndexStore;
1286 use arrow_schema::DataType;
1287 use datafusion_common::ScalarValue;
1288 use lance_core::cache::LanceCache;
1289 use lance_core::utils::mask::RowAddrTreeMap;
1290 use lance_io::object_store::ObjectStore;
1291 use std::collections::HashMap;
1292 use std::sync::Arc;
1293
1294 let m: u32 = 2_500_000;
1300 let per_bitmap_size = 1000; let mut state = HashMap::new();
1303 for i in 0..m {
1304 let bitmap = RowAddrTreeMap::from_iter(0..per_bitmap_size);
1306
1307 let key = ScalarValue::UInt32(Some(i));
1308 state.insert(key, bitmap);
1309 }
1310
1311 let tmpdir = TempObjDir::default();
1313 let test_store = LanceIndexStore::new(
1314 Arc::new(ObjectStore::local()),
1315 tmpdir.clone(),
1316 Arc::new(LanceCache::no_cache()),
1317 );
1318
1319 let result =
1322 BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;
1323
1324 assert!(
1325 result.is_ok(),
1326 "Failed to write bitmap index: {:?}",
1327 result.err()
1328 );
1329
1330 let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
1332 assert!(
1333 index_file.is_ok(),
1334 "Failed to open index file: {:?}",
1335 index_file.err()
1336 );
1337 let index_file = index_file.unwrap();
1338
1339 tracing::info!(
1341 "Index file contains {} rows in total",
1342 index_file.num_rows()
1343 );
1344
1345 tracing::info!("Loading index from disk...");
1347 let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
1348 .await
1349 .expect("Failed to load bitmap index");
1350
1351 assert_eq!(
1353 loaded_index.index_map.len(),
1354 m as usize,
1355 "Loaded index has incorrect number of keys (expected {}, got {})",
1356 m,
1357 loaded_index.index_map.len()
1358 );
1359
1360 let test_keys = [0, m / 2, m - 1]; for &key_val in &test_keys {
1363 let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
1364 let bitmap = loaded_index
1366 .load_bitmap(&key, None)
1367 .await
1368 .unwrap_or_else(|_| panic!("Key {} should exist", key_val));
1369
1370 let row_addrs: Vec<u64> = bitmap.row_addrs().unwrap().map(u64::from).collect();
1372
1373 assert_eq!(
1375 row_addrs.len(),
1376 per_bitmap_size as usize,
1377 "Bitmap for key {} has wrong size",
1378 key_val
1379 );
1380
1381 for i in 0..5.min(per_bitmap_size) {
1383 assert!(
1384 row_addrs.contains(&i),
1385 "Bitmap for key {} should contain row_id {}",
1386 key_val,
1387 i
1388 );
1389 }
1390
1391 for i in (per_bitmap_size - 5)..per_bitmap_size {
1392 assert!(
1393 row_addrs.contains(&i),
1394 "Bitmap for key {} should contain row_id {}",
1395 key_val,
1396 i
1397 );
1398 }
1399
1400 let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
1402 assert_eq!(
1403 row_addrs, expected_range,
1404 "Bitmap for key {} doesn't contain expected values",
1405 key_val
1406 );
1407
1408 tracing::info!(
1409 "✓ Verified bitmap for key {}: {} rows as expected",
1410 key_val,
1411 row_addrs.len()
1412 );
1413 }
1414
1415 tracing::info!("Test successful! Index properly contains {} keys", m);
1416 }
1417
1418 #[tokio::test]
1419 async fn test_bitmap_prewarm() {
1420 let tmpdir = TempObjDir::default();
1422 let store = Arc::new(LanceIndexStore::new(
1423 Arc::new(ObjectStore::local()),
1424 tmpdir.clone(),
1425 Arc::new(LanceCache::no_cache()),
1426 ));
1427
1428 let colors = vec![
1430 "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1431 "red", "red", "blue", "green", "yellow",
1432 ];
1433
1434 let row_ids = (0u64..15u64).collect::<Vec<_>>();
1435
1436 let schema = Arc::new(Schema::new(vec![
1437 Field::new("value", DataType::Utf8, false),
1438 Field::new("_rowid", DataType::UInt64, false),
1439 ]));
1440
1441 let batch = RecordBatch::try_new(
1442 schema.clone(),
1443 vec![
1444 Arc::new(StringArray::from(colors.clone())),
1445 Arc::new(UInt64Array::from(row_ids.clone())),
1446 ],
1447 )
1448 .unwrap();
1449
1450 let batch = sort_batch_by_value(&batch);
1451 let stream = stream::once(async move { Ok(batch) });
1452 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1453
1454 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1456 .await
1457 .unwrap();
1458
1459 let cache = LanceCache::with_capacity(1024 * 1024); let index = BitmapIndex::load(store.clone(), None, &cache)
1464 .await
1465 .unwrap();
1466
1467 let cache_key_red = BitmapKey {
1469 value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
1470 };
1471 let cache_key_blue = BitmapKey {
1472 value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
1473 };
1474
1475 assert!(
1476 cache
1477 .get_with_key::<BitmapKey>(&cache_key_red)
1478 .await
1479 .is_none()
1480 );
1481 assert!(
1482 cache
1483 .get_with_key::<BitmapKey>(&cache_key_blue)
1484 .await
1485 .is_none()
1486 );
1487
1488 index.prewarm().await.unwrap();
1490
1491 assert!(
1493 cache
1494 .get_with_key::<BitmapKey>(&cache_key_red)
1495 .await
1496 .is_some()
1497 );
1498 assert!(
1499 cache
1500 .get_with_key::<BitmapKey>(&cache_key_blue)
1501 .await
1502 .is_some()
1503 );
1504
1505 let cached_red = cache
1507 .get_with_key::<BitmapKey>(&cache_key_red)
1508 .await
1509 .unwrap();
1510 let red_rows: Vec<u64> = cached_red.row_addrs().unwrap().map(u64::from).collect();
1511 assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);
1512
1513 index.prewarm().await.unwrap();
1515
1516 let cached_red_2 = cache
1518 .get_with_key::<BitmapKey>(&cache_key_red)
1519 .await
1520 .unwrap();
1521 let red_rows_2: Vec<u64> = cached_red_2.row_addrs().unwrap().map(u64::from).collect();
1522 assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
1523 }
1524
1525 #[tokio::test]
1526 async fn test_remap_bitmap_with_null() {
1527 use arrow_array::UInt32Array;
1528
1529 let tmpdir = TempObjDir::default();
1531 let test_store = Arc::new(LanceIndexStore::new(
1532 Arc::new(ObjectStore::local()),
1533 tmpdir.clone(),
1534 Arc::new(LanceCache::no_cache()),
1535 ));
1536
1537 let values = vec![
1542 None, None, Some(1u32), Some(1u32), Some(2u32), Some(2u32), ];
1549
1550 let row_ids: Vec<u64> = vec![
1552 RowAddress::new_from_parts(1, 0).into(),
1553 RowAddress::new_from_parts(1, 1).into(),
1554 RowAddress::new_from_parts(1, 2).into(),
1555 RowAddress::new_from_parts(2, 0).into(),
1556 RowAddress::new_from_parts(2, 1).into(),
1557 RowAddress::new_from_parts(2, 2).into(),
1558 ];
1559
1560 let schema = Arc::new(Schema::new(vec![
1561 Field::new("value", DataType::UInt32, true),
1562 Field::new("_rowid", DataType::UInt64, false),
1563 ]));
1564
1565 let batch = RecordBatch::try_new(
1566 schema.clone(),
1567 vec![
1568 Arc::new(UInt32Array::from(values)),
1569 Arc::new(UInt64Array::from(row_ids)),
1570 ],
1571 )
1572 .unwrap();
1573
1574 let stream = stream::once(async move { Ok(batch) });
1575 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1576
1577 BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
1579 .await
1580 .unwrap();
1581
1582 let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1584 .await
1585 .expect("Failed to load bitmap index");
1586
1587 assert_eq!(index.index_map.len(), 2); assert!(!index.null_map.is_empty()); let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
1593 row_addr_map.insert(
1594 RowAddress::new_from_parts(1, 0).into(),
1595 Some(RowAddress::new_from_parts(3, 0).into()),
1596 );
1597 row_addr_map.insert(
1598 RowAddress::new_from_parts(1, 1).into(),
1599 Some(RowAddress::new_from_parts(3, 1).into()),
1600 );
1601 row_addr_map.insert(
1602 RowAddress::new_from_parts(1, 2).into(),
1603 Some(RowAddress::new_from_parts(3, 2).into()),
1604 );
1605 row_addr_map.insert(
1606 RowAddress::new_from_parts(2, 0).into(),
1607 Some(RowAddress::new_from_parts(3, 3).into()),
1608 );
1609 row_addr_map.insert(
1610 RowAddress::new_from_parts(2, 1).into(),
1611 Some(RowAddress::new_from_parts(3, 4).into()),
1612 );
1613 row_addr_map.insert(
1614 RowAddress::new_from_parts(2, 2).into(),
1615 Some(RowAddress::new_from_parts(3, 5).into()),
1616 );
1617
1618 index
1620 .remap(&row_addr_map, test_store.as_ref())
1621 .await
1622 .unwrap();
1623
1624 let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
1626 .await
1627 .expect("Failed to load remapped bitmap index");
1628
1629 let expected_null_addrs: Vec<u64> = vec![
1631 RowAddress::new_from_parts(3, 0).into(),
1632 RowAddress::new_from_parts(3, 1).into(),
1633 ];
1634 let actual_null_addrs: Vec<u64> = reloaded_idx
1635 .null_map
1636 .row_addrs()
1637 .unwrap()
1638 .map(u64::from)
1639 .collect();
1640 assert_eq!(
1641 actual_null_addrs, expected_null_addrs,
1642 "Null bitmap not remapped correctly"
1643 );
1644
1645 let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
1647 let result = reloaded_idx
1648 .search(&query, &NoOpMetricsCollector)
1649 .await
1650 .unwrap();
1651 if let crate::scalar::SearchResult::Exact(row_ids) = result {
1652 let mut actual: Vec<u64> = row_ids
1653 .true_rows()
1654 .row_addrs()
1655 .unwrap()
1656 .map(u64::from)
1657 .collect();
1658 actual.sort();
1659 let expected: Vec<u64> = vec![
1660 RowAddress::new_from_parts(3, 2).into(),
1661 RowAddress::new_from_parts(3, 3).into(),
1662 ];
1663 assert_eq!(actual, expected, "Value 1 bitmap not remapped correctly");
1664 }
1665
1666 let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
1668 let result = reloaded_idx
1669 .search(&query, &NoOpMetricsCollector)
1670 .await
1671 .unwrap();
1672 if let crate::scalar::SearchResult::Exact(row_ids) = result {
1673 let mut actual: Vec<u64> = row_ids
1674 .true_rows()
1675 .row_addrs()
1676 .unwrap()
1677 .map(u64::from)
1678 .collect();
1679 actual.sort();
1680 let expected: Vec<u64> = vec![
1681 RowAddress::new_from_parts(3, 4).into(),
1682 RowAddress::new_from_parts(3, 5).into(),
1683 ];
1684 assert_eq!(actual, expected, "Value 2 bitmap not remapped correctly");
1685 }
1686
1687 let query = SargableQuery::IsNull();
1689 let result = reloaded_idx
1690 .search(&query, &NoOpMetricsCollector)
1691 .await
1692 .unwrap();
1693 if let crate::scalar::SearchResult::Exact(row_ids) = result {
1694 let mut actual: Vec<u64> = row_ids
1695 .true_rows()
1696 .row_addrs()
1697 .unwrap()
1698 .map(u64::from)
1699 .collect();
1700 actual.sort();
1701 assert_eq!(
1702 actual, expected_null_addrs,
1703 "Null search results not correct"
1704 );
1705 }
1706 }
1707
1708 #[tokio::test]
1709 async fn test_bitmap_null_handling_in_queries() {
1710 let tmpdir = TempObjDir::default();
1712 let store = Arc::new(LanceIndexStore::new(
1713 Arc::new(ObjectStore::local()),
1714 tmpdir.clone(),
1715 Arc::new(LanceCache::no_cache()),
1716 ));
1717
1718 let batch = record_batch!(
1720 ("value", Int64, [Some(0), Some(5), None]),
1721 ("_rowid", UInt64, [0, 1, 2])
1722 )
1723 .unwrap();
1724 let schema = batch.schema();
1725 let stream = stream::once(async move { Ok(batch) });
1726 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1727
1728 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1730 .await
1731 .unwrap();
1732
1733 let cache = LanceCache::with_capacity(1024 * 1024);
1734 let index = BitmapIndex::load(store.clone(), None, &cache)
1735 .await
1736 .unwrap();
1737
1738 let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
1740 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1741
1742 match result {
1743 SearchResult::Exact(row_ids) => {
1744 let actual_rows: Vec<u64> = row_ids
1745 .true_rows()
1746 .row_addrs()
1747 .unwrap()
1748 .map(u64::from)
1749 .collect();
1750 assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 5");
1751
1752 let null_row_ids = row_ids.null_rows();
1753 assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
1755 let null_rows: Vec<u64> =
1756 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
1757 assert_eq!(null_rows, vec![2], "Should report row 2 as null");
1758 }
1759 _ => panic!("Expected Exact search result"),
1760 }
1761
1762 let query = SargableQuery::IsNull();
1764 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1765
1766 match result {
1767 SearchResult::Exact(row_addrs) => {
1768 let actual_rows: Vec<u64> = row_addrs
1769 .true_rows()
1770 .row_addrs()
1771 .unwrap()
1772 .map(u64::from)
1773 .collect();
1774 assert_eq!(
1775 actual_rows,
1776 vec![2],
1777 "IsNull should find row 2 where value is null"
1778 );
1779
1780 let null_row_ids = row_addrs.null_rows();
1781 assert!(
1783 null_row_ids.is_empty(),
1784 "null_row_ids should be None for IsNull query"
1785 );
1786 }
1787 _ => panic!("Expected Exact search result"),
1788 }
1789
1790 let query = SargableQuery::Range(
1792 std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
1793 std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
1794 );
1795 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1796
1797 match result {
1798 SearchResult::Exact(row_addrs) => {
1799 let actual_rows: Vec<u64> = row_addrs
1800 .true_rows()
1801 .row_addrs()
1802 .unwrap()
1803 .map(u64::from)
1804 .collect();
1805 assert_eq!(actual_rows, vec![0], "Should find row 0 where value == 0");
1806
1807 let null_row_ids = row_addrs.null_rows();
1809 assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
1810 let null_rows: Vec<u64> =
1811 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
1812 assert_eq!(null_rows, vec![2], "Should report row 2 as null");
1813 }
1814 _ => panic!("Expected Exact search result"),
1815 }
1816 }
1817}