1use std::{
5 any::Any,
6 cmp::Reverse,
7 collections::{BTreeMap, BinaryHeap, HashMap},
8 fmt::Debug,
9 ops::Bound,
10 sync::Arc,
11};
12
13use arrow::array::BinaryBuilder;
14use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array, new_null_array};
15use arrow_schema::{DataType, Field, Schema};
16use async_trait::async_trait;
17use bytes::Bytes;
18use datafusion::physical_plan::SendableRecordBatchStream;
19use datafusion_common::ScalarValue;
20use deepsize::DeepSizeOf;
21use futures::{StreamExt, TryStreamExt, stream};
22use lance_arrow::ipc::{
23 read_ipc_stream_single_at, read_len_prefixed_bytes_at, write_ipc_stream,
24 write_len_prefixed_bytes,
25};
26use lance_core::utils::mask::RowSetOps;
27use lance_core::{
28 Error, ROW_ID, Result,
29 cache::{CacheCodec, CacheCodecImpl, CacheKey, LanceCache, WeakLanceCache},
30 error::LanceOptionExt,
31 utils::{
32 mask::{NullableRowAddrSet, RowAddrTreeMap},
33 tokio::get_num_compute_intensive_cpus,
34 },
35};
36use lance_io::object_store::ObjectStore;
37use object_store::path::Path;
38use roaring::RoaringBitmap;
39use serde::{Deserialize, Serialize};
40use tracing::{instrument, warn};
41
42use super::{AnyQuery, IndexStore, ScalarIndex};
43use super::{
44 BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult, btree::OrderableScalarValue,
45};
46use crate::pbold;
47use crate::{Index, IndexType, metrics::MetricsCollector};
48use crate::{
49 frag_reuse::FragReuseIndex,
50 progress::IndexBuildProgress,
51 scalar::{
52 CreatedIndex, UpdateCriteria,
53 expression::SargableQueryParser,
54 registry::{
55 ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest,
56 VALUE_COLUMN_NAME,
57 },
58 },
59};
60use crate::{scalar::IndexReader, scalar::expression::ScalarQueryParser};
61
/// File name of the bitmap lookup table inside an index store.
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
/// Metadata key under which the JSON-encoded index statistics are written.
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";
/// Prefix of a shard lookup file name (see `bitmap_shard_file_name`).
const BITMAP_PART_LOOKUP_PREFIX: &str = "part_";
/// Suffix of a shard lookup file name (see `bitmap_shard_file_name`).
const BITMAP_PART_LOOKUP_SUFFIX: &str = "_bitmap_page_lookup.lance";
/// Tag OR-ed into the low bits of a partition id built from an explicit shard id.
const EXPLICIT_SHARD_ID_TAG: u64 = 0;
/// Tag OR-ed into the low bits of a partition id built from a single fragment id.
const IMPLICIT_FRAGMENT_ID_TAG: u64 = 1;

/// Cap on the serialized bitmap bytes buffered for one record batch; writers
/// flush before exceeding it.
// NOTE(review): presumably kept below `i32::MAX` because Arrow `Binary`
// arrays use 32-bit offsets — confirm.
const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024;
/// Number of lookup rows read per chunk while prewarming the bitmap cache.
const MAX_ROWS_PER_CHUNK: usize = 2 * 1024;
/// Number of rows each shard cursor reads per chunk while shards are merged.
const MERGE_ROWS_PER_CHUNK: usize = 512;

/// Index format version reported in the `CreatedIndex` returned by `remap`/`update`.
const BITMAP_INDEX_VERSION: u32 = 0;
78
79#[derive(Clone)]
82struct LazyIndexReader {
83 index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
84 store: Arc<dyn IndexStore>,
85}
86
87impl std::fmt::Debug for LazyIndexReader {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_struct("LazyIndexReader")
90 .field("store", &self.store)
91 .finish()
92 }
93}
94
95impl LazyIndexReader {
96 fn new(store: Arc<dyn IndexStore>) -> Self {
97 Self {
98 index_reader: Arc::new(tokio::sync::Mutex::new(None)),
99 store,
100 }
101 }
102
103 async fn get(&self) -> Result<Arc<dyn IndexReader>> {
104 let mut reader = self.index_reader.lock().await;
105 if reader.is_none() {
106 let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
107 *reader = Some(index_reader);
108 }
109 Ok(reader.as_ref().unwrap().clone())
110 }
111}
112
/// Scalar index that keeps one row-address bitmap per distinct value.
///
/// Only the key-to-row-offset map and the null bitmap are held in memory;
/// the other bitmaps are read from the lookup file on demand and cached.
#[derive(Clone, Debug)]
pub struct BitmapIndex {
    /// Maps every non-null key to the row offset of its bitmap in the lookup file.
    index_map: BTreeMap<OrderableScalarValue, usize>,

    /// Bitmap of the rows whose value is null (empty when there are none).
    null_map: Arc<RowAddrTreeMap>,

    /// Arrow data type of the indexed values.
    value_type: DataType,

    /// Store holding the index files.
    store: Arc<dyn IndexStore>,

    /// Cache for the per-key bitmaps read from the lookup file.
    index_cache: WeakLanceCache,

    /// When present, bitmaps are remapped through it after being deserialized.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,

    /// Lazily opened reader over the lookup file.
    lazy_reader: LazyIndexReader,
}
136
137#[derive(Debug, Clone)]
138pub struct BitmapKey {
139 value: OrderableScalarValue,
140}
141
142impl CacheKey for BitmapKey {
143 type ValueType = RowAddrTreeMap;
144
145 fn key(&self) -> std::borrow::Cow<'_, str> {
146 format!("{}", self.value.0).into()
147 }
148
149 fn type_name() -> &'static str {
150 "Bitmap"
151 }
152
153 fn codec() -> Option<CacheCodec> {
154 Some(CacheCodec::from_impl::<RowAddrTreeMap>())
155 }
156}
157
/// Serializable snapshot of the in-memory part of a [`BitmapIndex`].
///
/// It can be turned back into an index with `into_bitmap_index`.
#[derive(Debug, Clone)]
pub struct BitmapIndexState {
    /// Two-column batch (`keys`, `offsets`) mirroring the index's key map.
    lookup_batch: RecordBatch,
    /// Bitmap of the rows whose value is null.
    null_map: Arc<RowAddrTreeMap>,
    /// Arrow data type of the indexed values.
    value_type: DataType,
}
180
181impl DeepSizeOf for BitmapIndexState {
182 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
183 self.lookup_batch.get_array_memory_size() + self.null_map.deep_size_of_children(context)
184 }
185}
186
187impl BitmapIndexState {
188 pub(crate) fn from_index(index: &BitmapIndex) -> Result<Self> {
189 Ok(Self {
190 lookup_batch: build_lookup_batch(&index.index_map, &index.value_type)?,
191 null_map: index.null_map.clone(),
192 value_type: index.value_type.clone(),
193 })
194 }
195
196 pub(crate) fn into_bitmap_index(
197 self,
198 store: Arc<dyn IndexStore>,
199 index_cache: &LanceCache,
200 frag_reuse_index: Option<Arc<FragReuseIndex>>,
201 ) -> Result<Arc<BitmapIndex>> {
202 let index_map = parse_lookup_batch(&self.lookup_batch)?;
203 Ok(Arc::new(BitmapIndex::new(
204 index_map,
205 self.null_map,
206 self.value_type,
207 store,
208 WeakLanceCache::from(index_cache),
209 frag_reuse_index,
210 )))
211 }
212}
213
214fn build_lookup_batch(
215 index_map: &BTreeMap<OrderableScalarValue, usize>,
216 value_type: &DataType,
217) -> Result<RecordBatch> {
218 let keys = if index_map.is_empty() {
219 arrow_array::new_empty_array(value_type)
220 } else {
221 ScalarValue::iter_to_array(index_map.keys().map(|k| k.0.clone()))?
222 };
223 let offsets = Arc::new(UInt64Array::from_iter_values(
224 index_map.values().map(|v| *v as u64),
225 ));
226 let schema = Arc::new(Schema::new(vec![
227 Field::new("keys", value_type.clone(), true),
228 Field::new("offsets", DataType::UInt64, false),
229 ]));
230 Ok(RecordBatch::try_new(schema, vec![keys, offsets])?)
231}
232
233fn parse_lookup_batch(batch: &RecordBatch) -> Result<BTreeMap<OrderableScalarValue, usize>> {
234 let keys = batch.column(0);
235 let offsets = batch
236 .column(1)
237 .as_any()
238 .downcast_ref::<UInt64Array>()
239 .ok_or_else(|| {
240 Error::internal("BitmapIndexState: expected UInt64 offsets column".to_string())
241 })?;
242 let mut index_map = BTreeMap::new();
243 for idx in 0..batch.num_rows() {
244 let value = OrderableScalarValue(ScalarValue::try_from_array(keys, idx)?);
245 index_map.insert(value, offsets.value(idx) as usize);
246 }
247 Ok(index_map)
248}
249
250impl CacheCodecImpl for BitmapIndexState {
251 fn serialize(&self, writer: &mut dyn std::io::Write) -> Result<()> {
258 let mut null_bytes = Vec::with_capacity(self.null_map.serialized_size());
259 self.null_map.serialize_into(&mut null_bytes)?;
260 write_len_prefixed_bytes(writer, &null_bytes)?;
261 write_ipc_stream(&self.lookup_batch, writer)?;
262 Ok(())
263 }
264
265 fn deserialize(data: &bytes::Bytes) -> Result<Self> {
266 let mut offset = 0;
267 let null_bytes = read_len_prefixed_bytes_at(data, &mut offset)?;
268 let null_map = Arc::new(RowAddrTreeMap::deserialize_from(null_bytes.as_ref())?);
269 let lookup_batch = read_ipc_stream_single_at(data, &mut offset)?;
270 let value_type = lookup_batch.schema().field(0).data_type().clone();
271 Ok(Self {
272 lookup_batch,
273 null_map,
274 value_type,
275 })
276 }
277}
278
279struct BitmapIndexStateKey;
282
283impl CacheKey for BitmapIndexStateKey {
284 type ValueType = BitmapIndexState;
285
286 fn key(&self) -> std::borrow::Cow<'_, str> {
287 "state".into()
288 }
289
290 fn type_name() -> &'static str {
291 "BitmapIndexState"
292 }
293
294 fn codec() -> Option<CacheCodec> {
295 Some(CacheCodec::from_impl::<BitmapIndexState>())
296 }
297}
298
299impl BitmapIndex {
300 fn new(
301 index_map: BTreeMap<OrderableScalarValue, usize>,
302 null_map: Arc<RowAddrTreeMap>,
303 value_type: DataType,
304 store: Arc<dyn IndexStore>,
305 index_cache: WeakLanceCache,
306 frag_reuse_index: Option<Arc<FragReuseIndex>>,
307 ) -> Self {
308 let lazy_reader = LazyIndexReader::new(store.clone());
309 Self {
310 index_map,
311 null_map,
312 value_type,
313 store,
314 index_cache,
315 frag_reuse_index,
316 lazy_reader,
317 }
318 }
319
320 pub(crate) async fn load(
321 store: Arc<dyn IndexStore>,
322 frag_reuse_index: Option<Arc<FragReuseIndex>>,
323 index_cache: &LanceCache,
324 ) -> Result<Arc<Self>> {
325 let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
326 let total_rows = page_lookup_file.num_rows();
327
328 if total_rows == 0 {
329 let schema = page_lookup_file.schema();
330 let data_type = schema.fields[0].data_type();
331 return Ok(Arc::new(Self::new(
332 BTreeMap::new(),
333 Arc::new(RowAddrTreeMap::default()),
334 data_type,
335 store,
336 WeakLanceCache::from(index_cache),
337 frag_reuse_index,
338 )));
339 }
340
341 let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
342 let mut null_map = Arc::new(RowAddrTreeMap::default());
343 let mut null_location: Option<usize> = None;
344 let value_type = page_lookup_file.schema().fields[0].data_type();
345
346 let mut keys_stream = page_lookup_file
349 .read_range_stream(0..total_rows, Some(&["keys"]))
350 .await?;
351 let mut row_offset: usize = 0;
352 while let Some(keys_batch) = keys_stream.try_next().await? {
353 let dict_keys = keys_batch.column(0);
354 for idx in 0..keys_batch.num_rows() {
355 let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
356 if key.0.is_null() {
357 null_location = Some(row_offset);
358 } else {
359 index_map.insert(key, row_offset);
360 }
361 row_offset += 1;
362 }
363 }
364
365 if let Some(null_loc) = null_location {
366 let batch = page_lookup_file
367 .read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
368 .await?;
369
370 let binary_bitmaps = batch
371 .column(0)
372 .as_any()
373 .downcast_ref::<BinaryArray>()
374 .ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
375 let bitmap_bytes = binary_bitmaps.value(0);
376 let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
377
378 if let Some(fri) = &frag_reuse_index {
380 bitmap = fri.remap_row_addrs_tree_map(&bitmap);
381 }
382
383 null_map = Arc::new(bitmap);
384 }
385
386 Ok(Arc::new(Self::new(
387 index_map,
388 null_map,
389 value_type,
390 store,
391 WeakLanceCache::from(index_cache),
392 frag_reuse_index,
393 )))
394 }
395
396 async fn load_bitmap(
397 &self,
398 key: &OrderableScalarValue,
399 metrics: Option<&dyn MetricsCollector>,
400 ) -> Result<Arc<RowAddrTreeMap>> {
401 if key.0.is_null() {
402 return Ok(self.null_map.clone());
403 }
404
405 let cache_key = BitmapKey { value: key.clone() };
406
407 if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
408 return Ok(cached);
409 }
410
411 if let Some(metrics) = metrics {
413 metrics.record_part_load();
414 }
415
416 let row_offset = match self.index_map.get(key) {
417 Some(loc) => *loc,
418 None => return Ok(Arc::new(RowAddrTreeMap::default())),
419 };
420
421 let page_lookup_file = self.lazy_reader.get().await?;
422 let batch = page_lookup_file
423 .read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
424 .await?;
425
426 let binary_bitmaps = batch
427 .column(0)
428 .as_any()
429 .downcast_ref::<BinaryArray>()
430 .ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
431 let bitmap_bytes = binary_bitmaps.value(0); let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
433
434 if let Some(fri) = &self.frag_reuse_index {
435 bitmap = fri.remap_row_addrs_tree_map(&bitmap);
436 }
437
438 self.index_cache
439 .insert_with_key(&cache_key, Arc::new(bitmap.clone()))
440 .await;
441
442 Ok(Arc::new(bitmap))
443 }
444
445 pub(crate) fn value_type(&self) -> &DataType {
446 &self.value_type
447 }
448
449 pub(crate) async fn load_bitmap_index_state(
451 &self,
452 ) -> Result<HashMap<ScalarValue, RowAddrTreeMap>> {
453 let mut state = HashMap::new();
454
455 for key in self.index_map.keys() {
456 let bitmap = self.load_bitmap(key, None).await?;
457 state.insert(key.0.clone(), (*bitmap).clone());
458 }
459
460 if !self.null_map.is_empty() {
461 let existing_null = new_null_array(&self.value_type, 1);
462 let existing_null = ScalarValue::try_from_array(existing_null.as_ref(), 0)?;
463 state.insert(existing_null, (*self.null_map).clone());
464 }
465
466 Ok(state)
467 }
468}
469
470impl DeepSizeOf for BitmapIndex {
471 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
472 let mut total_size = 0;
473
474 total_size += self.index_map.deep_size_of_children(context);
475 total_size += self.store.deep_size_of_children(context);
476
477 total_size
478 }
479}
480
/// Statistics serialized to JSON for a bitmap index.
#[derive(Serialize)]
struct BitmapStatistics {
    /// Number of bitmaps held by the index.
    num_bitmaps: usize,
}
485
/// User-supplied parameters for building a bitmap index.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BitmapParameters {
    /// Optional shard id.
    // NOTE(review): presumably forwarded to `bitmap_shard_partition_id` to name
    // the shard file of a distributed build — confirm against the caller.
    pub shard_id: Option<u32>,
}
492
493struct BitmapTrainingRequest {
494 parameters: BitmapParameters,
495 criteria: TrainingCriteria,
496}
497
498impl BitmapTrainingRequest {
499 fn new(parameters: BitmapParameters) -> Self {
500 Self {
501 parameters,
502 criteria: TrainingCriteria::new(TrainingOrdering::Values).with_row_id(),
503 }
504 }
505}
506
507impl TrainingRequest for BitmapTrainingRequest {
508 fn as_any(&self) -> &dyn std::any::Any {
509 self
510 }
511
512 fn criteria(&self) -> &TrainingCriteria {
513 &self.criteria
514 }
515}
516
517#[async_trait]
518impl Index for BitmapIndex {
519 fn as_any(&self) -> &dyn Any {
520 self
521 }
522
523 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
524 self
525 }
526
527 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
528 Err(Error::not_supported_source(
529 "BitmapIndex is not a vector index".into(),
530 ))
531 }
532
533 async fn prewarm(&self) -> Result<()> {
534 let page_lookup_file = self.lazy_reader.get().await?;
535 let total_rows = page_lookup_file.num_rows();
536
537 if total_rows == 0 {
538 return Ok(());
539 }
540
541 for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
542 let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
543 let chunk = page_lookup_file
544 .read_range(start_row..end_row, None)
545 .await?;
546
547 if chunk.num_rows() == 0 {
548 continue;
549 }
550
551 let dict_keys = chunk.column(0);
552 let binary_bitmaps = chunk.column(1);
553 let bitmap_binary_array = binary_bitmaps
554 .as_any()
555 .downcast_ref::<BinaryArray>()
556 .unwrap();
557
558 for idx in 0..chunk.num_rows() {
559 let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
560
561 if key.0.is_null() {
562 continue;
563 }
564
565 let bitmap_bytes = bitmap_binary_array.value(idx);
566 let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
567
568 if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
569 bitmap = frag_reuse_index_ref.remap_row_addrs_tree_map(&bitmap);
570 }
571
572 let cache_key = BitmapKey { value: key };
573 self.index_cache
574 .insert_with_key(&cache_key, Arc::new(bitmap))
575 .await;
576 }
577 }
578
579 Ok(())
580 }
581
582 fn index_type(&self) -> IndexType {
583 IndexType::Bitmap
584 }
585
586 fn statistics(&self) -> Result<serde_json::Value> {
587 let stats = BitmapStatistics {
588 num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
589 };
590 serde_json::to_value(stats).map_err(|e| {
591 Error::internal(format!(
592 "failed to serialize bitmap index statistics: {}",
593 e
594 ))
595 })
596 }
597
598 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
599 unimplemented!()
600 }
601}
602
603#[async_trait]
604impl ScalarIndex for BitmapIndex {
605 #[instrument(name = "bitmap_search", level = "debug", skip_all)]
606 async fn search(
607 &self,
608 query: &dyn AnyQuery,
609 metrics: &dyn MetricsCollector,
610 ) -> Result<SearchResult> {
611 let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
612
613 let (row_ids, null_row_ids) = match query {
614 SargableQuery::Equals(val) => {
615 metrics.record_comparisons(1);
616 if val.is_null() {
617 ((*self.null_map).clone(), None)
619 } else {
620 let key = OrderableScalarValue(val.clone());
621 let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
622 let null_rows = if !self.null_map.is_empty() {
623 Some((*self.null_map).clone())
624 } else {
625 None
626 };
627 ((*bitmap).clone(), null_rows)
628 }
629 }
630 SargableQuery::Range(start, end) => {
631 let range_start = match start {
632 Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
633 Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
634 Bound::Unbounded => Bound::Unbounded,
635 };
636
637 let range_end = match end {
638 Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
639 Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
640 Bound::Unbounded => Bound::Unbounded,
641 };
642
643 let empty_range = match (&range_start, &range_end) {
645 (Bound::Included(lower), Bound::Included(upper)) => lower > upper,
646 (Bound::Included(lower), Bound::Excluded(upper))
647 | (Bound::Excluded(lower), Bound::Included(upper))
648 | (Bound::Excluded(lower), Bound::Excluded(upper)) => lower >= upper,
649 _ => false,
650 };
651
652 let keys: Vec<_> = if empty_range {
653 Vec::new()
654 } else {
655 self.index_map
656 .range((range_start, range_end))
657 .map(|(k, _v)| k.clone())
658 .collect()
659 };
660
661 metrics.record_comparisons(keys.len());
662
663 let result = if keys.is_empty() {
664 RowAddrTreeMap::default()
665 } else {
666 let bitmaps: Vec<_> = stream::iter(
667 keys.into_iter()
668 .map(|key| async move { self.load_bitmap(&key, None).await }),
669 )
670 .buffer_unordered(get_num_compute_intensive_cpus())
671 .try_collect()
672 .await?;
673
674 let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
675 RowAddrTreeMap::union_all(&bitmap_refs)
676 };
677
678 let null_rows = if !self.null_map.is_empty() {
679 Some((*self.null_map).clone())
680 } else {
681 None
682 };
683 (result, null_rows)
684 }
685 SargableQuery::IsIn(values) => {
686 metrics.record_comparisons(values.len());
687
688 let mut has_null = false;
690 let keys: Vec<_> = values
691 .iter()
692 .filter_map(|val| {
693 if val.is_null() {
694 has_null = true;
695 None
696 } else {
697 let key = OrderableScalarValue(val.clone());
698 if self.index_map.contains_key(&key) {
699 Some(key)
700 } else {
701 None
702 }
703 }
704 })
705 .collect();
706
707 let mut bitmaps: Vec<_> = stream::iter(
709 keys.into_iter()
710 .map(|key| async move { self.load_bitmap(&key, None).await }),
711 )
712 .buffer_unordered(get_num_compute_intensive_cpus())
713 .try_collect()
714 .await?;
715
716 if has_null && !self.null_map.is_empty() {
718 bitmaps.push(self.null_map.clone());
719 }
720
721 let result = if bitmaps.is_empty() {
722 RowAddrTreeMap::default()
723 } else {
724 let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
726 RowAddrTreeMap::union_all(&bitmap_refs)
727 };
728
729 let null_rows = if !has_null && !self.null_map.is_empty() {
732 Some((*self.null_map).clone())
733 } else {
734 None
735 };
736 (result, null_rows)
737 }
738 SargableQuery::IsNull() => {
739 metrics.record_comparisons(1);
740 ((*self.null_map).clone(), None)
742 }
743 SargableQuery::FullTextSearch(_) => {
744 return Err(Error::not_supported_source(
745 "full text search is not supported for bitmap indexes".into(),
746 ));
747 }
748 SargableQuery::LikePrefix(_) => {
749 return Err(Error::not_supported_source(
750 "LIKE prefix queries are not supported for bitmap indexes".into(),
751 ));
752 }
753 };
754
755 let selection = NullableRowAddrSet::new(row_ids, null_row_ids.unwrap_or_default());
756 Ok(SearchResult::Exact(selection))
757 }
758
759 fn can_remap(&self) -> bool {
760 true
761 }
762
763 async fn remap(
765 &self,
766 mapping: &HashMap<u64, Option<u64>>,
767 dest_store: &dyn IndexStore,
768 ) -> Result<CreatedIndex> {
769 let state = self.load_bitmap_index_state().await?;
770 let remapped_state = BitmapIndexPlugin::remap_bitmap_state(state, mapping);
771 BitmapIndexPlugin::write_bitmap_index(remapped_state, dest_store, &self.value_type).await?;
772
773 Ok(CreatedIndex {
774 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
775 .unwrap(),
776 index_version: BITMAP_INDEX_VERSION,
777 files: Some(dest_store.list_files_with_sizes().await?),
778 })
779 }
780
781 async fn update(
783 &self,
784 new_data: SendableRecordBatchStream,
785 dest_store: &dyn IndexStore,
786 _old_data_filter: Option<super::OldIndexDataFilter>,
787 ) -> Result<CreatedIndex> {
788 BitmapIndexPlugin::streaming_build_and_write(
789 new_data,
790 Some(self),
791 dest_store,
792 BITMAP_LOOKUP_NAME,
793 )
794 .await?;
795
796 Ok(CreatedIndex {
797 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
798 .unwrap(),
799 index_version: BITMAP_INDEX_VERSION,
800 files: Some(dest_store.list_files_with_sizes().await?),
801 })
802 }
803
804 fn update_criteria(&self) -> UpdateCriteria {
805 UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::Values).with_row_id())
806 }
807
808 fn derive_index_params(&self) -> Result<ScalarIndexParams> {
809 Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
810 }
811}
812
/// Buffers `(key, serialized bitmap)` pairs and writes them to an index file
/// in record batches.
struct BitmapBatchWriter {
    /// Destination index file.
    file: Box<dyn super::IndexWriter>,
    /// Keys buffered for the next batch.
    keys: Vec<ScalarValue>,
    /// Serialized bitmaps buffered for the next batch, parallel to `keys`.
    serialized: Vec<Vec<u8>>,
    /// Total size in bytes of the buffered serialized bitmaps.
    bytes: usize,
    /// Number of bitmaps emitted so far, across all batches.
    num_bitmaps: usize,
}
822
823impl BitmapBatchWriter {
824 fn new(file: Box<dyn super::IndexWriter>) -> Self {
825 Self {
826 file,
827 keys: Vec::new(),
828 serialized: Vec::new(),
829 bytes: 0,
830 num_bitmaps: 0,
831 }
832 }
833
834 async fn emit(&mut self, key: ScalarValue, bitmap: &RowAddrTreeMap) -> Result<()> {
837 let mut buf = Vec::new();
838 bitmap.serialize_into(&mut buf).unwrap();
839 let size = buf.len();
840
841 if self.bytes + size > MAX_BITMAP_ARRAY_LENGTH {
842 self.flush().await?;
843 }
844
845 self.keys.push(key);
846 self.serialized.push(buf);
847 self.bytes += size;
848 self.num_bitmaps += 1;
849 Ok(())
850 }
851
852 async fn flush(&mut self) -> Result<()> {
854 if self.keys.is_empty() {
855 return Ok(());
856 }
857 let keys_array =
858 ScalarValue::iter_to_array(self.keys.drain(..).collect::<Vec<_>>().into_iter())
859 .unwrap();
860 let total_size: usize = self.serialized.iter().map(|b| b.len()).sum();
861 let mut binary_builder = BinaryBuilder::with_capacity(self.serialized.len(), total_size);
862 for b in self.serialized.drain(..) {
863 binary_builder.append_value(&b);
864 }
865 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
866 let batch = BitmapIndexPlugin::get_batch_from_arrays(keys_array, bitmaps_array)?;
867 self.file.write_record_batch(batch).await?;
868 self.bytes = 0;
869 Ok(())
870 }
871
872 async fn finish(mut self) -> Result<()> {
874 self.flush().await?;
875 let stats_json = serde_json::to_string(&BitmapStatistics {
876 num_bitmaps: self.num_bitmaps,
877 })
878 .map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
879 let mut metadata = HashMap::new();
880 metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
881 self.file.finish_with_metadata(metadata).await?;
882 Ok(())
883 }
884}
885
886fn bitmap_shard_file_name(partition_id: u64) -> String {
887 format!("{BITMAP_PART_LOOKUP_PREFIX}{partition_id}{BITMAP_PART_LOOKUP_SUFFIX}")
888}
889
/// Packs `id` into the upper 32 bits of a partition id and ORs `tag` into
/// the result.
fn tagged_bitmap_partition_id(id: u32, tag: u64) -> u64 {
    let high_bits = u64::from(id) << 32;
    high_bits | tag
}
893
894fn bitmap_shard_partition_id(fragment_ids: &[u32], shard_id: Option<u32>) -> Result<u64> {
895 if fragment_ids.is_empty() {
896 return Err(Error::invalid_input(
897 "Bitmap shard build requires at least one fragment id".to_string(),
898 ));
899 }
900
901 if let Some(shard_id) = shard_id {
902 return Ok(tagged_bitmap_partition_id(shard_id, EXPLICIT_SHARD_ID_TAG));
903 }
904
905 let [fragment_id] = fragment_ids else {
906 return Err(Error::invalid_input(format!(
907 "Bitmap distributed build over multiple fragments requires an explicit shard_id. \
908 Received {} fragment ids: {:?}. Please assign mutually exclusive shard_id values \
909 to disjoint fragment groups.",
910 fragment_ids.len(),
911 fragment_ids
912 )));
913 };
914
915 Ok(tagged_bitmap_partition_id(
916 *fragment_id,
917 IMPLICIT_FRAGMENT_ID_TAG,
918 ))
919}
920
921fn extract_bitmap_shard_id(filename: &str) -> Result<u64> {
922 let partition_id = filename
923 .strip_prefix(BITMAP_PART_LOOKUP_PREFIX)
924 .and_then(|name| name.strip_suffix(BITMAP_PART_LOOKUP_SUFFIX))
925 .ok_or_else(|| {
926 Error::internal(format!("Invalid bitmap shard file name format: {filename}"))
927 })?;
928 partition_id.parse::<u64>().map_err(|_| {
929 Error::internal(format!(
930 "Failed to parse bitmap partition id from file name: {filename}"
931 ))
932 })
933}
934
935fn deserialize_bitmap(bitmap_bytes: &[u8], file_name: &str) -> Result<RowAddrTreeMap> {
936 RowAddrTreeMap::deserialize_from(bitmap_bytes).map_err(|error| {
937 Error::corrupt_file(
938 Path::from(file_name),
939 format!("Failed to deserialize bitmap bytes: {error}"),
940 )
941 })
942}
943
944async fn new_bitmap_batch_writer(
945 index_store: &dyn IndexStore,
946 file_name: &str,
947 value_type: &DataType,
948) -> Result<BitmapBatchWriter> {
949 let schema = Arc::new(Schema::new(vec![
950 Field::new("keys", value_type.clone(), true),
951 Field::new("bitmaps", DataType::Binary, true),
952 ]));
953 let index_file = index_store.new_index_file(file_name, schema).await?;
954 Ok(BitmapBatchWriter::new(index_file))
955}
956
957#[derive(Clone, Debug, Eq, PartialEq)]
958struct BitmapHeapItem {
959 key: OrderableScalarValue,
960 shard_idx: usize,
961}
962
963impl Ord for BitmapHeapItem {
964 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
965 self.key
966 .cmp(&other.key)
967 .then_with(|| self.shard_idx.cmp(&other.shard_idx))
968 }
969}
970
971impl PartialOrd for BitmapHeapItem {
972 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
973 Some(self.cmp(other))
974 }
975}
976
/// Sequential cursor over the rows of one shard lookup file, reading the
/// file chunk by chunk.
struct BitmapShardCursor {
    /// Name of the shard file, used in error messages.
    file_name: String,
    /// Reader over the shard file.
    reader: Arc<dyn IndexReader>,
    /// Total number of rows in the shard file.
    total_rows: usize,
    /// Offset of the first row that has not been read into a batch yet.
    next_row_offset: usize,
    /// Chunk currently being consumed; `None` before the first read and after
    /// the file is exhausted.
    batch: Option<RecordBatch>,
    /// Index of the current row inside `batch`.
    batch_row_idx: usize,
}
985
986impl BitmapShardCursor {
987 async fn try_new(file_name: String, reader: Arc<dyn IndexReader>) -> Result<Option<Self>> {
988 let total_rows = reader.num_rows();
989 if total_rows == 0 {
990 return Ok(None);
991 }
992
993 let mut cursor = Self {
994 file_name,
995 reader,
996 total_rows,
997 next_row_offset: 0,
998 batch: None,
999 batch_row_idx: 0,
1000 };
1001 if cursor.advance().await? {
1002 Ok(Some(cursor))
1003 } else {
1004 Ok(None)
1005 }
1006 }
1007
1008 fn peek_key(&self) -> Result<OrderableScalarValue> {
1009 let batch = self.batch.as_ref().ok_or_else(|| {
1010 Error::internal(format!(
1011 "Bitmap shard {} has no active batch",
1012 self.file_name
1013 ))
1014 })?;
1015 let key = ScalarValue::try_from_array(batch.column(0), self.batch_row_idx)?;
1016 Ok(OrderableScalarValue(key))
1017 }
1018
1019 fn take_current(&mut self) -> Result<(ScalarValue, RowAddrTreeMap)> {
1020 let batch = self.batch.as_ref().ok_or_else(|| {
1021 Error::internal(format!(
1022 "Bitmap shard {} has no active batch",
1023 self.file_name
1024 ))
1025 })?;
1026 let keys = batch.column(0);
1027 let binary_bitmaps = batch
1028 .column(1)
1029 .as_any()
1030 .downcast_ref::<BinaryArray>()
1031 .ok_or_else(|| {
1032 Error::corrupt_file(
1033 Path::from(self.file_name.as_str()),
1034 "Bitmap shard batch has non-binary bitmap column".to_string(),
1035 )
1036 })?;
1037 let key = ScalarValue::try_from_array(keys, self.batch_row_idx)?;
1038 let bitmap = deserialize_bitmap(binary_bitmaps.value(self.batch_row_idx), &self.file_name)?;
1039 self.batch_row_idx += 1;
1040 Ok((key, bitmap))
1041 }
1042
1043 async fn advance(&mut self) -> Result<bool> {
1044 loop {
1045 if let Some(batch) = &self.batch
1046 && self.batch_row_idx < batch.num_rows()
1047 {
1048 return Ok(true);
1049 }
1050
1051 if self.next_row_offset >= self.total_rows {
1052 self.batch = None;
1053 return Ok(false);
1054 }
1055
1056 let end_row = (self.next_row_offset + MERGE_ROWS_PER_CHUNK).min(self.total_rows);
1057 let batch = self
1058 .reader
1059 .read_range(self.next_row_offset..end_row, None)
1060 .await?;
1061 self.next_row_offset = end_row;
1062 self.batch = Some(batch);
1063 self.batch_row_idx = 0;
1064 }
1065 }
1066}
1067
1068async fn advance_cursor_and_push(
1069 cursors: &mut [BitmapShardCursor],
1070 heap: &mut BinaryHeap<Reverse<BitmapHeapItem>>,
1071 shard_idx: usize,
1072) -> Result<()> {
1073 if cursors[shard_idx].advance().await? {
1074 heap.push(Reverse(BitmapHeapItem {
1075 key: cursors[shard_idx].peek_key()?,
1076 shard_idx,
1077 }));
1078 }
1079 Ok(())
1080}
1081
1082async fn drain_same_key_bitmaps(
1083 cursors: &mut [BitmapShardCursor],
1084 heap: &mut BinaryHeap<Reverse<BitmapHeapItem>>,
1085 item: BitmapHeapItem,
1086) -> Result<(ScalarValue, RowAddrTreeMap)> {
1087 let (key, mut merged_bitmap) = cursors[item.shard_idx].take_current()?;
1088 let merged_key = OrderableScalarValue(key);
1089 advance_cursor_and_push(cursors, heap, item.shard_idx).await?;
1090
1091 loop {
1092 let Some(Reverse(next_item)) = heap.peek() else {
1093 break;
1094 };
1095 if next_item.key != merged_key {
1096 break;
1097 }
1098
1099 let shard_idx = next_item.shard_idx;
1100 let _ = heap.pop();
1101 let (_, bitmap) = cursors[shard_idx].take_current()?;
1102 merged_bitmap |= &bitmap;
1103 advance_cursor_and_push(cursors, heap, shard_idx).await?;
1104 }
1105
1106 Ok((merged_key.0, merged_bitmap))
1107}
1108
1109async fn list_bitmap_shard_files(
1110 object_store: &ObjectStore,
1111 index_dir: &Path,
1112 progress: &dyn IndexBuildProgress,
1113) -> Result<Vec<String>> {
1114 let mut shard_files = Vec::new();
1115 let mut list_stream = object_store.list(Some(index_dir.clone()));
1116 while let Some(item) = list_stream.next().await {
1117 match item {
1118 Ok(meta) => {
1119 let file_name = meta.location.filename().unwrap_or_default();
1120 if file_name.starts_with(BITMAP_PART_LOOKUP_PREFIX)
1121 && file_name.ends_with(BITMAP_PART_LOOKUP_SUFFIX)
1122 {
1123 shard_files.push(file_name.to_string());
1124 progress
1125 .stage_progress("scan_bitmap_shards", shard_files.len() as u64)
1126 .await?;
1127 }
1128 }
1129 Err(err) => {
1130 return Err(Error::io(format!(
1131 "Failed to list bitmap shard files in {}: {err}",
1132 index_dir
1133 )));
1134 }
1135 }
1136 }
1137 let mut shard_files = shard_files
1138 .into_iter()
1139 .map(|file_name| extract_bitmap_shard_id(&file_name).map(|shard_id| (shard_id, file_name)))
1140 .collect::<Result<Vec<_>>>()?;
1141 shard_files.sort_unstable_by_key(|(shard_id, _)| *shard_id);
1142 let shard_files = shard_files
1143 .into_iter()
1144 .map(|(_, file_name)| file_name)
1145 .collect::<Vec<_>>();
1146 if shard_files.is_empty() {
1147 return Err(Error::invalid_input(format!(
1148 "No bitmap shard files found in index directory: {}; \
1149 call build_index for each fragment before calling merge_index_metadata",
1150 index_dir
1151 )));
1152 }
1153 Ok(shard_files)
1154}
1155
1156async fn cleanup_bitmap_shard_files(store: &dyn IndexStore, shard_files: &[String]) {
1157 for file_name in shard_files {
1158 if let Err(error) = store.delete_index_file(file_name).await {
1159 warn!(
1160 "Failed to delete bitmap shard file '{}': {}. \
1161 This does not affect the merged bitmap index, but the shard file \
1162 may need manual cleanup.",
1163 file_name, error
1164 );
1165 }
1166 }
1167}
1168
/// Stateless entry point for building and writing bitmap indexes.
#[derive(Debug, Default)]
pub struct BitmapIndexPlugin;
1171
1172impl BitmapIndexPlugin {
1173 fn get_batch_from_arrays(
1174 keys: Arc<dyn Array>,
1175 binary_bitmaps: Arc<dyn Array>,
1176 ) -> Result<RecordBatch> {
1177 let schema = Arc::new(Schema::new(vec![
1178 Field::new("keys", keys.data_type().clone(), true),
1179 Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
1180 ]));
1181
1182 let columns = vec![keys, binary_bitmaps];
1183
1184 Ok(RecordBatch::try_new(schema, columns)?)
1185 }
1186
1187 async fn write_bitmap_index(
1188 state: HashMap<ScalarValue, RowAddrTreeMap>,
1189 index_store: &dyn IndexStore,
1190 value_type: &DataType,
1191 ) -> Result<()> {
1192 Self::write_bitmap_index_with_extras(
1193 state,
1194 index_store,
1195 value_type,
1196 HashMap::new(),
1197 Vec::new(),
1198 )
1199 .await
1200 }
1201
1202 pub(crate) async fn write_bitmap_index_with_extras(
1204 state: HashMap<ScalarValue, RowAddrTreeMap>,
1205 index_store: &dyn IndexStore,
1206 value_type: &DataType,
1207 mut metadata: HashMap<String, String>,
1208 global_buffers: Vec<(String, Bytes)>,
1209 ) -> Result<()> {
1210 let num_bitmaps = state.len();
1211 let schema = Arc::new(Schema::new(vec![
1212 Field::new("keys", value_type.clone(), true),
1213 Field::new("bitmaps", DataType::Binary, true),
1214 ]));
1215
1216 let mut bitmap_index_file = index_store
1217 .new_index_file(BITMAP_LOOKUP_NAME, schema)
1218 .await?;
1219
1220 for (metadata_key, data) in global_buffers {
1221 let buffer_idx = bitmap_index_file.add_global_buffer(data).await?;
1222 metadata.insert(metadata_key, buffer_idx.to_string());
1223 }
1224
1225 let mut cur_keys = Vec::new();
1226 let mut cur_bitmaps = Vec::new();
1227 let mut cur_bytes = 0;
1228
1229 for (key, bitmap) in state.into_iter() {
1230 let mut bytes = Vec::new();
1231 bitmap.serialize_into(&mut bytes).unwrap();
1232 let bitmap_size = bytes.len();
1233
1234 if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
1235 let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
1236 let mut binary_builder = BinaryBuilder::new();
1237 for b in &cur_bitmaps {
1238 binary_builder.append_value(b);
1239 }
1240 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
1241
1242 let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
1243 bitmap_index_file.write_record_batch(record_batch).await?;
1244
1245 cur_keys.clear();
1246 cur_bitmaps.clear();
1247 cur_bytes = 0;
1248 }
1249
1250 cur_keys.push(key);
1251 cur_bitmaps.push(bytes);
1252 cur_bytes += bitmap_size;
1253 }
1254
1255 if !cur_keys.is_empty() {
1257 let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
1258 let mut binary_builder = BinaryBuilder::new();
1259 for b in &cur_bitmaps {
1260 binary_builder.append_value(b);
1261 }
1262 let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
1263
1264 let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
1265 bitmap_index_file.write_record_batch(record_batch).await?;
1266 }
1267
1268 let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps })
1270 .map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
1271 metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
1272
1273 bitmap_index_file.finish_with_metadata(metadata).await?;
1274
1275 Ok(())
1276 }
1277
1278 pub(crate) async fn build_bitmap_index_state(
1280 mut data_source: SendableRecordBatchStream,
1281 mut state: HashMap<ScalarValue, RowAddrTreeMap>,
1282 ) -> Result<(HashMap<ScalarValue, RowAddrTreeMap>, DataType)> {
1283 let value_type = data_source.schema().field(0).data_type().clone();
1284 while let Some(batch) = data_source.try_next().await? {
1285 let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
1286 let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
1287 debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
1288
1289 let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
1290
1291 for i in 0..values.len() {
1292 let row_id = row_id_column.value(i);
1293 let key = ScalarValue::try_from_array(values.as_ref(), i)?;
1294 state.entry(key.clone()).or_default().insert(row_id);
1295 }
1296 }
1297
1298 Ok((state, value_type))
1299 }
1300
1301 pub async fn train_bitmap_index(
1302 data: SendableRecordBatchStream,
1303 index_store: &dyn IndexStore,
1304 ) -> Result<()> {
1305 Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME).await
1306 }
1307
1308 async fn train_bitmap_shard(
1309 data: SendableRecordBatchStream,
1310 index_store: &dyn IndexStore,
1311 fragment_ids: &[u32],
1312 shard_id: Option<u32>,
1313 progress: Arc<dyn crate::progress::IndexBuildProgress>,
1314 ) -> Result<()> {
1315 let partition_id = bitmap_shard_partition_id(fragment_ids, shard_id)?;
1316 let file_name = bitmap_shard_file_name(partition_id);
1317 progress
1318 .stage_start("build_bitmap_shard", None, "rows")
1319 .await?;
1320 Self::streaming_build_and_write(data, None, index_store, &file_name).await?;
1321 progress.stage_complete("build_bitmap_shard").await?;
1322 Ok(())
1323 }
1324
1325 async fn streaming_build_and_write(
1333 mut data_source: SendableRecordBatchStream,
1334 old_index: Option<&BitmapIndex>,
1335 index_store: &dyn IndexStore,
1336 output_file_name: &str,
1337 ) -> Result<()> {
1338 let value_type = data_source.schema().field(0).data_type().clone();
1339
1340 let mut writer =
1341 new_bitmap_batch_writer(index_store, output_file_name, &value_type).await?;
1342
1343 let old_keys: Vec<OrderableScalarValue> = old_index
1346 .map(|idx| idx.index_map.keys().cloned().collect())
1347 .unwrap_or_default();
1348 let mut old_pos: usize = 0;
1349
1350 let mut current_key: Option<ScalarValue> = None;
1352 let mut current_bitmap = RowAddrTreeMap::default();
1353 let mut emitted_null = false;
1356
1357 while let Some(batch) = data_source.try_next().await? {
1358 let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
1359 let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
1360 debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
1361 let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
1362
1363 for i in 0..values.len() {
1364 let row_id = row_id_column.value(i);
1365 let key = ScalarValue::try_from_array(values.as_ref(), i)?;
1366
1367 match ¤t_key {
1368 Some(cur) if *cur == key => {
1369 current_bitmap.insert(row_id);
1370 }
1371 _ => {
1372 if let Some(prev_key) = current_key.take() {
1374 let mut prev_bitmap = std::mem::take(&mut current_bitmap);
1375 Self::finish_run(
1376 prev_key,
1377 &mut prev_bitmap,
1378 old_index,
1379 &old_keys,
1380 &mut old_pos,
1381 &mut emitted_null,
1382 &mut writer,
1383 )
1384 .await?;
1385 }
1386 current_key = Some(key);
1387 current_bitmap = RowAddrTreeMap::default();
1388 current_bitmap.insert(row_id);
1389 }
1390 }
1391 }
1392 }
1393
1394 if let Some(last_key) = current_key.take() {
1396 let mut last_bitmap = std::mem::take(&mut current_bitmap);
1397 Self::finish_run(
1398 last_key,
1399 &mut last_bitmap,
1400 old_index,
1401 &old_keys,
1402 &mut old_pos,
1403 &mut emitted_null,
1404 &mut writer,
1405 )
1406 .await?;
1407 }
1408
1409 if let Some(idx) = old_index {
1411 while old_pos < old_keys.len() {
1412 let old_bitmap = idx.load_bitmap(&old_keys[old_pos], None).await?;
1413 writer
1414 .emit(old_keys[old_pos].0.clone(), &old_bitmap)
1415 .await?;
1416 old_pos += 1;
1417 }
1418 }
1419
1420 if !emitted_null
1422 && let Some(idx) = old_index
1423 && !idx.null_map.is_empty()
1424 {
1425 let null_key = new_null_array(&value_type, 1);
1426 let null_key = ScalarValue::try_from_array(null_key.as_ref(), 0)?;
1427 writer.emit(null_key, &idx.null_map).await?;
1428 }
1429
1430 writer.finish().await?;
1431
1432 Ok(())
1433 }
1434
1435 async fn finish_run(
1439 key: ScalarValue,
1440 bitmap: &mut RowAddrTreeMap,
1441 old_index: Option<&BitmapIndex>,
1442 old_keys: &[OrderableScalarValue],
1443 old_pos: &mut usize,
1444 emitted_null: &mut bool,
1445 writer: &mut BitmapBatchWriter,
1446 ) -> Result<()> {
1447 if key.is_null() {
1448 if let Some(idx) = old_index
1450 && !idx.null_map.is_empty()
1451 {
1452 *bitmap |= &*idx.null_map;
1453 }
1454 *emitted_null = true;
1455 writer.emit(key, bitmap).await?;
1456 } else if let Some(idx) = old_index {
1457 let orderable = OrderableScalarValue(key.clone());
1458
1459 while *old_pos < old_keys.len() && old_keys[*old_pos] < orderable {
1461 let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
1462 writer
1463 .emit(old_keys[*old_pos].0.clone(), &old_bitmap)
1464 .await?;
1465 *old_pos += 1;
1466 }
1467
1468 if *old_pos < old_keys.len() && old_keys[*old_pos] == orderable {
1470 let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
1471 *bitmap |= &*old_bitmap;
1472 *old_pos += 1;
1473 }
1474
1475 writer.emit(key, bitmap).await?;
1476 } else {
1477 writer.emit(key, bitmap).await?;
1478 }
1479 Ok(())
1480 }
1481
1482 pub(crate) fn remap_bitmap_state(
1484 state: HashMap<ScalarValue, RowAddrTreeMap>,
1485 mapping: &HashMap<u64, Option<u64>>,
1486 ) -> HashMap<ScalarValue, RowAddrTreeMap> {
1487 state
1488 .into_iter()
1489 .map(|(key, bitmap)| {
1490 let remapped_bitmap =
1491 RowAddrTreeMap::from_iter(bitmap.row_addrs().unwrap().filter_map(|addr| {
1492 let addr_as_u64 = u64::from(addr);
1493 mapping
1494 .get(&addr_as_u64)
1495 .copied()
1496 .unwrap_or(Some(addr_as_u64))
1497 }));
1498 (key, remapped_bitmap)
1499 })
1500 .collect()
1501 }
1502
1503 async fn merge_shards(
1520 store: &dyn IndexStore,
1521 shard_files: &[String],
1522 progress: Arc<dyn IndexBuildProgress>,
1523 ) -> Result<()> {
1524 progress
1525 .stage_start("merge_bitmap_shards", None, "bitmaps")
1526 .await?;
1527
1528 let mut cursors = Vec::with_capacity(shard_files.len());
1529 let mut heap = BinaryHeap::with_capacity(shard_files.len());
1530 let mut value_type: Option<DataType> = None;
1531
1532 for file_name in shard_files {
1533 let reader = store.open_index_file(file_name).await?;
1534 let shard_value_type = reader.schema().fields[0].data_type().clone();
1535 if let Some(existing_type) = &value_type {
1536 if existing_type != &shard_value_type {
1537 return Err(Error::invalid_input(format!(
1538 "Bitmap shard {} has value type {:?}, expected {:?}",
1539 file_name, shard_value_type, existing_type
1540 )));
1541 }
1542 } else {
1543 value_type = Some(shard_value_type);
1544 }
1545 if let Some(cursor) = BitmapShardCursor::try_new(file_name.clone(), reader).await? {
1546 let key = cursor.peek_key()?;
1547 let shard_idx = cursors.len();
1548 cursors.push(cursor);
1549 heap.push(Reverse(BitmapHeapItem { key, shard_idx }));
1550 }
1551 }
1552
1553 let value_type = value_type.ok_or_else(|| {
1554 Error::invalid_input("Bitmap shard merge requires at least one shard file".to_string())
1555 })?;
1556 let mut writer = new_bitmap_batch_writer(store, BITMAP_LOOKUP_NAME, &value_type).await?;
1557 let mut merged_keys = 0u64;
1558
1559 while let Some(Reverse(item)) = heap.pop() {
1560 let (key, merged_bitmap) =
1561 drain_same_key_bitmaps(&mut cursors, &mut heap, item).await?;
1562 writer.emit(key, &merged_bitmap).await?;
1563 merged_keys += 1;
1564 progress
1565 .stage_progress("merge_bitmap_shards", merged_keys)
1566 .await?;
1567 }
1568
1569 progress.stage_complete("merge_bitmap_shards").await?;
1570 progress
1571 .stage_start("write_bitmap_index", Some(1), "files")
1572 .await?;
1573 writer.finish().await?;
1574 progress.stage_progress("write_bitmap_index", 1).await?;
1575 progress.stage_complete("write_bitmap_index").await?;
1576 Ok(())
1577 }
1578}
1579
1580pub async fn merge_index_files(
1581 object_store: &ObjectStore,
1582 index_dir: &Path,
1583 store: Arc<dyn IndexStore>,
1584 progress: Arc<dyn IndexBuildProgress>,
1585) -> Result<()> {
1586 progress
1587 .stage_start("scan_bitmap_shards", None, "files")
1588 .await?;
1589 let shard_files = list_bitmap_shard_files(object_store, index_dir, progress.as_ref()).await?;
1590 progress.stage_complete("scan_bitmap_shards").await?;
1591
1592 BitmapIndexPlugin::merge_shards(store.as_ref(), &shard_files, progress).await?;
1593 cleanup_bitmap_shard_files(store.as_ref(), &shard_files).await;
1594 Ok(())
1595}
1596
1597#[async_trait]
1598impl ScalarIndexPlugin for BitmapIndexPlugin {
1599 fn name(&self) -> &str {
1600 "Bitmap"
1601 }
1602
1603 fn new_training_request(
1604 &self,
1605 params: &str,
1606 field: &Field,
1607 ) -> Result<Box<dyn TrainingRequest>> {
1608 if field.data_type().is_nested() {
1609 return Err(Error::invalid_input_source(
1610 "A bitmap index can only be created on a non-nested field.".into(),
1611 ));
1612 }
1613 let params = if params.is_empty() {
1614 BitmapParameters::default()
1615 } else {
1616 serde_json::from_str::<BitmapParameters>(params)?
1617 };
1618 Ok(Box::new(BitmapTrainingRequest::new(params)))
1619 }
1620
1621 fn provides_exact_answer(&self) -> bool {
1622 true
1623 }
1624
1625 fn version(&self) -> u32 {
1626 BITMAP_INDEX_VERSION
1627 }
1628
1629 fn new_query_parser(
1630 &self,
1631 index_name: String,
1632 _index_details: &prost_types::Any,
1633 ) -> Option<Box<dyn ScalarQueryParser>> {
1634 Some(Box::new(SargableQueryParser::new(
1635 index_name,
1636 self.name().to_string(),
1637 false,
1638 )))
1639 }
1640
1641 async fn train_index(
1642 &self,
1643 data: SendableRecordBatchStream,
1644 index_store: &dyn IndexStore,
1645 request: Box<dyn TrainingRequest>,
1646 fragment_ids: Option<Vec<u32>>,
1647 progress: Arc<dyn crate::progress::IndexBuildProgress>,
1648 ) -> Result<CreatedIndex> {
1649 let request = request
1650 .as_any()
1651 .downcast_ref::<BitmapTrainingRequest>()
1652 .ok_or_else(|| {
1653 Error::internal(
1654 "BitmapIndexPlugin::train_index received a non-bitmap training request"
1655 .to_string(),
1656 )
1657 })?;
1658 if let Some(fragment_ids) = fragment_ids.as_ref() {
1659 Self::train_bitmap_shard(
1660 data,
1661 index_store,
1662 fragment_ids,
1663 request.parameters.shard_id,
1664 progress,
1665 )
1666 .await?;
1667 } else if request.parameters.shard_id.is_some() {
1668 return Err(Error::invalid_input(
1669 "Bitmap shard_id requires fragment_ids and is only supported for distributed shard builds"
1670 .to_string(),
1671 ));
1672 } else {
1673 Self::train_bitmap_index(data, index_store).await?;
1674 }
1675 Ok(CreatedIndex {
1676 index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
1677 .unwrap(),
1678 index_version: BITMAP_INDEX_VERSION,
1679 files: Some(index_store.list_files_with_sizes().await?),
1680 })
1681 }
1682
1683 async fn load_index(
1685 &self,
1686 index_store: Arc<dyn IndexStore>,
1687 _index_details: &prost_types::Any,
1688 frag_reuse_index: Option<Arc<FragReuseIndex>>,
1689 cache: &LanceCache,
1690 ) -> Result<Arc<dyn ScalarIndex>> {
1691 Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
1692 }
1693
1694 async fn get_from_cache(
1695 &self,
1696 index_store: Arc<dyn IndexStore>,
1697 frag_reuse_index: Option<Arc<FragReuseIndex>>,
1698 cache: &LanceCache,
1699 ) -> Result<Option<Arc<dyn ScalarIndex>>> {
1700 let Some(state) = cache.get_with_key(&BitmapIndexStateKey).await else {
1701 return Ok(None);
1702 };
1703 let state = (*state).clone();
1704 let index = state.into_bitmap_index(index_store, cache, frag_reuse_index)?;
1705 Ok(Some(index as Arc<dyn ScalarIndex>))
1706 }
1707
1708 async fn put_in_cache(&self, cache: &LanceCache, index: Arc<dyn ScalarIndex>) -> Result<()> {
1709 let bitmap = index
1710 .as_any()
1711 .downcast_ref::<BitmapIndex>()
1712 .ok_or_else(|| {
1713 Error::internal("BitmapIndexPlugin::put_in_cache called with a non-bitmap index")
1714 })?;
1715 let state = BitmapIndexState::from_index(bitmap)?;
1716 cache
1717 .insert_with_key(&BitmapIndexStateKey, Arc::new(state))
1718 .await;
1719 Ok(())
1720 }
1721
1722 async fn load_statistics(
1723 &self,
1724 index_store: Arc<dyn IndexStore>,
1725 _index_details: &prost_types::Any,
1726 ) -> Result<Option<serde_json::Value>> {
1727 let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
1728 if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
1729 let stats = serde_json::from_str(value).map_err(|e| {
1730 Error::internal(format!("failed to parse bitmap statistics metadata: {e}"))
1731 })?;
1732 Ok(Some(stats))
1733 } else {
1734 Ok(None)
1735 }
1736 }
1737}
1738
1739#[cfg(test)]
1740mod tests {
1741 use super::*;
1742 use crate::metrics::NoOpMetricsCollector;
1743 use crate::scalar::lance_format::LanceIndexStore;
1744 use arrow_array::{RecordBatch, StringArray, UInt64Array, record_batch};
1745 use arrow_schema::{DataType, Field, Schema};
1746
1747 fn sort_batch_by_value(batch: &RecordBatch) -> RecordBatch {
1750 use arrow::compute::SortOptions;
1751 let values = batch.column(0);
1752 let row_ids = batch.column(1);
1753 let options = SortOptions {
1754 descending: false,
1755 nulls_first: true,
1756 };
1757 let indices = arrow::compute::sort_to_indices(values, Some(options), None).unwrap();
1758 let sorted_values = arrow::compute::take(values.as_ref(), &indices, None).unwrap();
1759 let sorted_row_ids = arrow::compute::take(row_ids.as_ref(), &indices, None).unwrap();
1760 RecordBatch::try_new(batch.schema(), vec![sorted_values, sorted_row_ids]).unwrap()
1761 }
1762 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1763 use futures::stream;
1764 use lance_core::utils::mask::RowSetOps;
1765 use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
1766 use lance_io::object_store::ObjectStore;
1767 use std::collections::HashMap;
1768
1769 fn assert_state_roundtrips(state: &BitmapIndexState) {
1770 let mut buf = Vec::new();
1771 state.serialize(&mut buf).unwrap();
1772 let restored = BitmapIndexState::deserialize(&bytes::Bytes::from(buf)).unwrap();
1773 assert_eq!(restored.lookup_batch, state.lookup_batch);
1774 assert_eq!(&*restored.null_map, &*state.null_map);
1775 assert_eq!(restored.value_type, state.value_type);
1776 }
1777
1778 #[test]
1779 fn test_bitmap_index_state_codec_roundtrip() {
1780 let mut index_map = BTreeMap::new();
1782 index_map.insert(OrderableScalarValue(ScalarValue::Int32(Some(1))), 0);
1783 index_map.insert(OrderableScalarValue(ScalarValue::Int32(Some(7))), 1);
1784 index_map.insert(OrderableScalarValue(ScalarValue::Int32(Some(42))), 2);
1785 let mut null_map = RowAddrTreeMap::new();
1786 null_map.insert(RowAddress::new_from_parts(0, 3).into());
1787 null_map.insert(RowAddress::new_from_parts(0, 5).into());
1788 let state = BitmapIndexState {
1789 lookup_batch: build_lookup_batch(&index_map, &DataType::Int32).unwrap(),
1790 null_map: Arc::new(null_map),
1791 value_type: DataType::Int32,
1792 };
1793 assert_state_roundtrips(&state);
1794
1795 let empty_state = BitmapIndexState {
1797 lookup_batch: build_lookup_batch(&BTreeMap::new(), &DataType::Utf8).unwrap(),
1798 null_map: Arc::new(RowAddrTreeMap::new()),
1799 value_type: DataType::Utf8,
1800 };
1801 assert_state_roundtrips(&empty_state);
1802 }
1803
1804 #[tokio::test]
1805 async fn test_bitmap_lazy_loading_and_cache() {
1806 let tmpdir = TempObjDir::default();
1808 let store = Arc::new(LanceIndexStore::new(
1809 Arc::new(ObjectStore::local()),
1810 tmpdir.clone(),
1811 Arc::new(LanceCache::no_cache()),
1812 ));
1813
1814 let colors = vec![
1816 "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1817 "red", "red", "blue", "green", "yellow",
1818 ];
1819
1820 let row_ids = (0u64..15u64).collect::<Vec<_>>();
1821
1822 let schema = Arc::new(Schema::new(vec![
1823 Field::new("value", DataType::Utf8, false),
1824 Field::new("_rowid", DataType::UInt64, false),
1825 ]));
1826
1827 let batch = RecordBatch::try_new(
1828 schema.clone(),
1829 vec![
1830 Arc::new(StringArray::from(colors.clone())),
1831 Arc::new(UInt64Array::from(row_ids.clone())),
1832 ],
1833 )
1834 .unwrap();
1835
1836 let batch = sort_batch_by_value(&batch);
1837 let stream = stream::once(async move { Ok(batch) });
1838 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1839
1840 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1842 .await
1843 .unwrap();
1844
1845 let cache = LanceCache::with_capacity(1024 * 1024); let index = BitmapIndex::load(store.clone(), None, &cache)
1850 .await
1851 .unwrap();
1852
1853 assert_eq!(index.index_map.len(), 4); assert!(index.null_map.is_empty()); let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
1858 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1859
1860 let expected_red_rows = vec![0u64, 3, 6, 10, 11];
1862 if let SearchResult::Exact(row_ids) = result {
1863 let mut actual: Vec<u64> = row_ids
1864 .true_rows()
1865 .row_addrs()
1866 .unwrap()
1867 .map(|id| id.into())
1868 .collect();
1869 actual.sort();
1870 assert_eq!(actual, expected_red_rows);
1871 } else {
1872 panic!("Expected exact search result");
1873 }
1874
1875 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1877 if let SearchResult::Exact(row_ids) = result {
1878 let mut actual: Vec<u64> = row_ids
1879 .true_rows()
1880 .row_addrs()
1881 .unwrap()
1882 .map(|id| id.into())
1883 .collect();
1884 actual.sort();
1885 assert_eq!(actual, expected_red_rows);
1886 }
1887
1888 let query = SargableQuery::Range(
1890 std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
1891 std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
1892 );
1893 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1894
1895 let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
1896 if let SearchResult::Exact(row_ids) = result {
1897 let mut actual: Vec<u64> = row_ids
1898 .true_rows()
1899 .row_addrs()
1900 .unwrap()
1901 .map(|id| id.into())
1902 .collect();
1903 actual.sort();
1904 assert_eq!(actual, expected_range_rows);
1905 }
1906
1907 let query = SargableQuery::Range(
1909 std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
1910 std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
1911 );
1912 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1913 if let SearchResult::Exact(row_ids) = result {
1914 assert!(row_ids.true_rows().is_empty());
1915 } else {
1916 panic!("Expected exact search result");
1917 }
1918
1919 let query = SargableQuery::IsIn(vec![
1921 ScalarValue::Utf8(Some("red".to_string())),
1922 ScalarValue::Utf8(Some("yellow".to_string())),
1923 ]);
1924 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1925
1926 let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
1927 if let SearchResult::Exact(row_ids) = result {
1928 let mut actual: Vec<u64> = row_ids
1929 .true_rows()
1930 .row_addrs()
1931 .unwrap()
1932 .map(|id| id.into())
1933 .collect();
1934 actual.sort();
1935 assert_eq!(actual, expected_in_rows);
1936 }
1937 }
1938
1939 #[tokio::test]
1940 #[ignore]
1941 async fn test_big_bitmap_index() {
1942 use super::{BITMAP_LOOKUP_NAME, BitmapIndex};
1945 use crate::scalar::IndexStore;
1946 use crate::scalar::lance_format::LanceIndexStore;
1947 use arrow_schema::DataType;
1948 use datafusion_common::ScalarValue;
1949 use lance_core::cache::LanceCache;
1950 use lance_core::utils::mask::RowAddrTreeMap;
1951 use lance_io::object_store::ObjectStore;
1952 use std::collections::HashMap;
1953 use std::sync::Arc;
1954
1955 let m: u32 = 2_500_000;
1961 let per_bitmap_size = 1000; let mut state = HashMap::new();
1964 for i in 0..m {
1965 let bitmap = RowAddrTreeMap::from_iter(0..per_bitmap_size);
1967
1968 let key = ScalarValue::UInt32(Some(i));
1969 state.insert(key, bitmap);
1970 }
1971
1972 let tmpdir = TempObjDir::default();
1974 let test_store = LanceIndexStore::new(
1975 Arc::new(ObjectStore::local()),
1976 tmpdir.clone(),
1977 Arc::new(LanceCache::no_cache()),
1978 );
1979
1980 let result =
1983 BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;
1984
1985 assert!(
1986 result.is_ok(),
1987 "Failed to write bitmap index: {:?}",
1988 result.err()
1989 );
1990
1991 let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
1993 assert!(
1994 index_file.is_ok(),
1995 "Failed to open index file: {:?}",
1996 index_file.err()
1997 );
1998 let index_file = index_file.unwrap();
1999
2000 tracing::info!(
2002 "Index file contains {} rows in total",
2003 index_file.num_rows()
2004 );
2005
2006 tracing::info!("Loading index from disk...");
2008 let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
2009 .await
2010 .expect("Failed to load bitmap index");
2011
2012 assert_eq!(
2014 loaded_index.index_map.len(),
2015 m as usize,
2016 "Loaded index has incorrect number of keys (expected {}, got {})",
2017 m,
2018 loaded_index.index_map.len()
2019 );
2020
2021 let test_keys = [0, m / 2, m - 1]; for &key_val in &test_keys {
2024 let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
2025 let bitmap = loaded_index
2027 .load_bitmap(&key, None)
2028 .await
2029 .unwrap_or_else(|_| panic!("Key {} should exist", key_val));
2030
2031 let row_addrs: Vec<u64> = bitmap.row_addrs().unwrap().map(u64::from).collect();
2033
2034 assert_eq!(
2036 row_addrs.len(),
2037 per_bitmap_size as usize,
2038 "Bitmap for key {} has wrong size",
2039 key_val
2040 );
2041
2042 for i in 0..5.min(per_bitmap_size) {
2044 assert!(
2045 row_addrs.contains(&i),
2046 "Bitmap for key {} should contain row_id {}",
2047 key_val,
2048 i
2049 );
2050 }
2051
2052 for i in (per_bitmap_size - 5)..per_bitmap_size {
2053 assert!(
2054 row_addrs.contains(&i),
2055 "Bitmap for key {} should contain row_id {}",
2056 key_val,
2057 i
2058 );
2059 }
2060
2061 let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
2063 assert_eq!(
2064 row_addrs, expected_range,
2065 "Bitmap for key {} doesn't contain expected values",
2066 key_val
2067 );
2068
2069 tracing::info!(
2070 "✓ Verified bitmap for key {}: {} rows as expected",
2071 key_val,
2072 row_addrs.len()
2073 );
2074 }
2075
2076 tracing::info!("Test successful! Index properly contains {} keys", m);
2077 }
2078
2079 #[tokio::test]
2080 async fn test_bitmap_prewarm() {
2081 let tmpdir = TempObjDir::default();
2083 let store = Arc::new(LanceIndexStore::new(
2084 Arc::new(ObjectStore::local()),
2085 tmpdir.clone(),
2086 Arc::new(LanceCache::no_cache()),
2087 ));
2088
2089 let colors = vec![
2091 "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
2092 "red", "red", "blue", "green", "yellow",
2093 ];
2094
2095 let row_ids = (0u64..15u64).collect::<Vec<_>>();
2096
2097 let schema = Arc::new(Schema::new(vec![
2098 Field::new("value", DataType::Utf8, false),
2099 Field::new("_rowid", DataType::UInt64, false),
2100 ]));
2101
2102 let batch = RecordBatch::try_new(
2103 schema.clone(),
2104 vec![
2105 Arc::new(StringArray::from(colors.clone())),
2106 Arc::new(UInt64Array::from(row_ids.clone())),
2107 ],
2108 )
2109 .unwrap();
2110
2111 let batch = sort_batch_by_value(&batch);
2112 let stream = stream::once(async move { Ok(batch) });
2113 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
2114
2115 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
2117 .await
2118 .unwrap();
2119
2120 let cache = LanceCache::with_capacity(1024 * 1024); let index = BitmapIndex::load(store.clone(), None, &cache)
2125 .await
2126 .unwrap();
2127
2128 let cache_key_red = BitmapKey {
2130 value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
2131 };
2132 let cache_key_blue = BitmapKey {
2133 value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
2134 };
2135
2136 assert!(
2137 cache
2138 .get_with_key::<BitmapKey>(&cache_key_red)
2139 .await
2140 .is_none()
2141 );
2142 assert!(
2143 cache
2144 .get_with_key::<BitmapKey>(&cache_key_blue)
2145 .await
2146 .is_none()
2147 );
2148
2149 index.prewarm().await.unwrap();
2151
2152 assert!(
2154 cache
2155 .get_with_key::<BitmapKey>(&cache_key_red)
2156 .await
2157 .is_some()
2158 );
2159 assert!(
2160 cache
2161 .get_with_key::<BitmapKey>(&cache_key_blue)
2162 .await
2163 .is_some()
2164 );
2165
2166 let cached_red = cache
2168 .get_with_key::<BitmapKey>(&cache_key_red)
2169 .await
2170 .unwrap();
2171 let red_rows: Vec<u64> = cached_red.row_addrs().unwrap().map(u64::from).collect();
2172 assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);
2173
2174 index.prewarm().await.unwrap();
2176
2177 let cached_red_2 = cache
2179 .get_with_key::<BitmapKey>(&cache_key_red)
2180 .await
2181 .unwrap();
2182 let red_rows_2: Vec<u64> = cached_red_2.row_addrs().unwrap().map(u64::from).collect();
2183 assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
2184 }
2185
2186 #[tokio::test]
2187 async fn test_remap_bitmap_with_null() {
2188 use arrow_array::UInt32Array;
2189
2190 let tmpdir = TempObjDir::default();
2192 let test_store = Arc::new(LanceIndexStore::new(
2193 Arc::new(ObjectStore::local()),
2194 tmpdir.clone(),
2195 Arc::new(LanceCache::no_cache()),
2196 ));
2197
2198 let values = vec![
2203 None, None, Some(1u32), Some(1u32), Some(2u32), Some(2u32), ];
2210
2211 let row_ids: Vec<u64> = vec![
2213 RowAddress::new_from_parts(1, 0).into(),
2214 RowAddress::new_from_parts(1, 1).into(),
2215 RowAddress::new_from_parts(1, 2).into(),
2216 RowAddress::new_from_parts(2, 0).into(),
2217 RowAddress::new_from_parts(2, 1).into(),
2218 RowAddress::new_from_parts(2, 2).into(),
2219 ];
2220
2221 let schema = Arc::new(Schema::new(vec![
2222 Field::new("value", DataType::UInt32, true),
2223 Field::new("_rowid", DataType::UInt64, false),
2224 ]));
2225
2226 let batch = RecordBatch::try_new(
2227 schema.clone(),
2228 vec![
2229 Arc::new(UInt32Array::from(values)),
2230 Arc::new(UInt64Array::from(row_ids)),
2231 ],
2232 )
2233 .unwrap();
2234
2235 let stream = stream::once(async move { Ok(batch) });
2236 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
2237
2238 BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
2240 .await
2241 .unwrap();
2242
2243 let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2245 .await
2246 .expect("Failed to load bitmap index");
2247
2248 assert_eq!(index.index_map.len(), 2); assert!(!index.null_map.is_empty()); let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
2254 row_addr_map.insert(
2255 RowAddress::new_from_parts(1, 0).into(),
2256 Some(RowAddress::new_from_parts(3, 0).into()),
2257 );
2258 row_addr_map.insert(
2259 RowAddress::new_from_parts(1, 1).into(),
2260 Some(RowAddress::new_from_parts(3, 1).into()),
2261 );
2262 row_addr_map.insert(
2263 RowAddress::new_from_parts(1, 2).into(),
2264 Some(RowAddress::new_from_parts(3, 2).into()),
2265 );
2266 row_addr_map.insert(
2267 RowAddress::new_from_parts(2, 0).into(),
2268 Some(RowAddress::new_from_parts(3, 3).into()),
2269 );
2270 row_addr_map.insert(
2271 RowAddress::new_from_parts(2, 1).into(),
2272 Some(RowAddress::new_from_parts(3, 4).into()),
2273 );
2274 row_addr_map.insert(
2275 RowAddress::new_from_parts(2, 2).into(),
2276 Some(RowAddress::new_from_parts(3, 5).into()),
2277 );
2278
2279 index
2281 .remap(&row_addr_map, test_store.as_ref())
2282 .await
2283 .unwrap();
2284
2285 let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
2287 .await
2288 .expect("Failed to load remapped bitmap index");
2289
2290 let expected_null_addrs: Vec<u64> = vec![
2292 RowAddress::new_from_parts(3, 0).into(),
2293 RowAddress::new_from_parts(3, 1).into(),
2294 ];
2295 let actual_null_addrs: Vec<u64> = reloaded_idx
2296 .null_map
2297 .row_addrs()
2298 .unwrap()
2299 .map(u64::from)
2300 .collect();
2301 assert_eq!(
2302 actual_null_addrs, expected_null_addrs,
2303 "Null bitmap not remapped correctly"
2304 );
2305
2306 let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
2308 let result = reloaded_idx
2309 .search(&query, &NoOpMetricsCollector)
2310 .await
2311 .unwrap();
2312 if let crate::scalar::SearchResult::Exact(row_ids) = result {
2313 let mut actual: Vec<u64> = row_ids
2314 .true_rows()
2315 .row_addrs()
2316 .unwrap()
2317 .map(u64::from)
2318 .collect();
2319 actual.sort();
2320 let expected: Vec<u64> = vec![
2321 RowAddress::new_from_parts(3, 2).into(),
2322 RowAddress::new_from_parts(3, 3).into(),
2323 ];
2324 assert_eq!(actual, expected, "Value 1 bitmap not remapped correctly");
2325 }
2326
2327 let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
2329 let result = reloaded_idx
2330 .search(&query, &NoOpMetricsCollector)
2331 .await
2332 .unwrap();
2333 if let crate::scalar::SearchResult::Exact(row_ids) = result {
2334 let mut actual: Vec<u64> = row_ids
2335 .true_rows()
2336 .row_addrs()
2337 .unwrap()
2338 .map(u64::from)
2339 .collect();
2340 actual.sort();
2341 let expected: Vec<u64> = vec![
2342 RowAddress::new_from_parts(3, 4).into(),
2343 RowAddress::new_from_parts(3, 5).into(),
2344 ];
2345 assert_eq!(actual, expected, "Value 2 bitmap not remapped correctly");
2346 }
2347
2348 let query = SargableQuery::IsNull();
2350 let result = reloaded_idx
2351 .search(&query, &NoOpMetricsCollector)
2352 .await
2353 .unwrap();
2354 if let crate::scalar::SearchResult::Exact(row_ids) = result {
2355 let mut actual: Vec<u64> = row_ids
2356 .true_rows()
2357 .row_addrs()
2358 .unwrap()
2359 .map(u64::from)
2360 .collect();
2361 actual.sort();
2362 assert_eq!(
2363 actual, expected_null_addrs,
2364 "Null search results not correct"
2365 );
2366 }
2367 }
2368
2369 #[tokio::test]
2370 async fn test_bitmap_null_handling_in_queries() {
2371 let tmpdir = TempObjDir::default();
2373 let store = Arc::new(LanceIndexStore::new(
2374 Arc::new(ObjectStore::local()),
2375 tmpdir.clone(),
2376 Arc::new(LanceCache::no_cache()),
2377 ));
2378
2379 let batch = record_batch!(
2381 ("value", Int64, [Some(0), Some(5), None]),
2382 ("_rowid", UInt64, [0, 1, 2])
2383 )
2384 .unwrap();
2385 let schema = batch.schema();
2386 let stream = stream::once(async move { Ok(batch) });
2387 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
2388
2389 BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
2391 .await
2392 .unwrap();
2393
2394 let cache = LanceCache::with_capacity(1024 * 1024);
2395 let index = BitmapIndex::load(store.clone(), None, &cache)
2396 .await
2397 .unwrap();
2398
2399 let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
2401 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2402
2403 match result {
2404 SearchResult::Exact(row_ids) => {
2405 let actual_rows: Vec<u64> = row_ids
2406 .true_rows()
2407 .row_addrs()
2408 .unwrap()
2409 .map(u64::from)
2410 .collect();
2411 assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 5");
2412
2413 let null_row_ids = row_ids.null_rows();
2414 assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
2416 let null_rows: Vec<u64> =
2417 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
2418 assert_eq!(null_rows, vec![2], "Should report row 2 as null");
2419 }
2420 _ => panic!("Expected Exact search result"),
2421 }
2422
2423 let query = SargableQuery::IsNull();
2425 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2426
2427 match result {
2428 SearchResult::Exact(row_addrs) => {
2429 let actual_rows: Vec<u64> = row_addrs
2430 .true_rows()
2431 .row_addrs()
2432 .unwrap()
2433 .map(u64::from)
2434 .collect();
2435 assert_eq!(
2436 actual_rows,
2437 vec![2],
2438 "IsNull should find row 2 where value is null"
2439 );
2440
2441 let null_row_ids = row_addrs.null_rows();
2442 assert!(
2444 null_row_ids.is_empty(),
2445 "null_row_ids should be None for IsNull query"
2446 );
2447 }
2448 _ => panic!("Expected Exact search result"),
2449 }
2450
2451 let query = SargableQuery::Range(
2453 std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
2454 std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
2455 );
2456 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2457
2458 match result {
2459 SearchResult::Exact(row_addrs) => {
2460 let actual_rows: Vec<u64> = row_addrs
2461 .true_rows()
2462 .row_addrs()
2463 .unwrap()
2464 .map(u64::from)
2465 .collect();
2466 assert_eq!(actual_rows, vec![0], "Should find row 0 where value == 0");
2467
2468 let null_row_ids = row_addrs.null_rows();
2470 assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
2471 let null_rows: Vec<u64> =
2472 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
2473 assert_eq!(null_rows, vec![2], "Should report row 2 as null");
2474 }
2475 _ => panic!("Expected Exact search result"),
2476 }
2477 }
2478}