lance_index/scalar/
bitmap.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{BTreeMap, HashMap},
7    fmt::Debug,
8    ops::Bound,
9    sync::Arc,
10};
11
12use crate::pbold;
13use arrow::array::BinaryBuilder;
14use arrow_array::{new_null_array, Array, BinaryArray, RecordBatch, UInt64Array};
15use arrow_schema::{DataType, Field, Schema};
16use async_trait::async_trait;
17use datafusion::physical_plan::SendableRecordBatchStream;
18use datafusion_common::ScalarValue;
19use deepsize::DeepSizeOf;
20use futures::{stream, StreamExt, TryStreamExt};
21use lance_core::{
22    cache::{CacheKey, LanceCache, WeakLanceCache},
23    error::LanceOptionExt,
24    utils::{mask::RowIdTreeMap, tokio::get_num_compute_intensive_cpus},
25    Error, Result, ROW_ID,
26};
27use roaring::RoaringBitmap;
28use serde::Serialize;
29use snafu::location;
30use tracing::instrument;
31
32use super::{
33    btree::OrderableScalarValue, BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult,
34};
35use super::{AnyQuery, IndexStore, ScalarIndex};
36use crate::{
37    frag_reuse::FragReuseIndex,
38    scalar::{
39        expression::SargableQueryParser,
40        registry::{
41            DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering,
42            TrainingRequest, VALUE_COLUMN_NAME,
43        },
44        CreatedIndex, UpdateCriteria,
45    },
46};
47use crate::{metrics::MetricsCollector, Index, IndexType};
48use crate::{scalar::expression::ScalarQueryParser, scalar::IndexReader};
49
/// Name of the index file that stores one row per distinct value, with a
/// `keys` column and a `bitmaps` column.
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";

/// Upper bound on the total serialized bitmap bytes accumulated into a single
/// record batch when the index file is written.
const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024; // leave headroom

/// Number of lookup-file rows requested per read when scanning the index file.
const MAX_ROWS_PER_CHUNK: usize = 2 * 1024;

/// Version number reported for indexes produced by this module.
const BITMAP_INDEX_VERSION: u32 = 0;
57
// We only need to open a file reader if we need to load a bitmap. If all
// bitmaps are cached we don't open it. If we do open it we should only open it once.
#[derive(Clone)]
struct LazyIndexReader {
    /// Reader for the lookup file; `None` until the first successful `get`.
    /// The async mutex serializes concurrent first-time opens.
    index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
    /// Store used to open the lookup file on demand.
    store: Arc<dyn IndexStore>,
}
65
66impl std::fmt::Debug for LazyIndexReader {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("LazyIndexReader")
69            .field("store", &self.store)
70            .finish()
71    }
72}
73
74impl LazyIndexReader {
75    fn new(store: Arc<dyn IndexStore>) -> Self {
76        Self {
77            index_reader: Arc::new(tokio::sync::Mutex::new(None)),
78            store,
79        }
80    }
81
82    async fn get(&self) -> Result<Arc<dyn IndexReader>> {
83        let mut reader = self.index_reader.lock().await;
84        if reader.is_none() {
85            let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
86            *reader = Some(index_reader);
87        }
88        Ok(reader.as_ref().unwrap().clone())
89    }
90}
91
/// A scalar index that stores a bitmap for each possible value
///
/// This index works best for low-cardinality columns, where the number of unique values is small.
/// The bitmap stores a list of row ids where the value is present.
#[derive(Clone, Debug)]
pub struct BitmapIndex {
    /// Maps each unique value to its bitmap location in the index file
    /// The usize value is the row offset in the bitmap_page_lookup.lance file
    /// for quickly locating the row and reading it out
    index_map: BTreeMap<OrderableScalarValue, usize>,

    /// Bitmap of rows whose value is null. Unlike the other bitmaps it is
    /// read when the index is loaded rather than on demand.
    null_map: Arc<RowIdTreeMap>,

    /// Data type of the indexed values (the type of the `keys` column).
    value_type: DataType,

    /// Store holding the index file.
    store: Arc<dyn IndexStore>,

    /// Cache of bitmaps that were already read, keyed by `BitmapKey`.
    index_cache: WeakLanceCache,

    /// When present, applied to every bitmap read from the file to remap its row ids.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,

    /// Opens the lookup file only when a bitmap actually has to be read.
    lazy_reader: LazyIndexReader,
}
115
116#[derive(Debug, Clone)]
117pub struct BitmapKey {
118    value: OrderableScalarValue,
119}
120
121impl CacheKey for BitmapKey {
122    type ValueType = RowIdTreeMap;
123
124    fn key(&self) -> std::borrow::Cow<'_, str> {
125        format!("{}", self.value.0).into()
126    }
127}
128
129impl BitmapIndex {
130    fn new(
131        index_map: BTreeMap<OrderableScalarValue, usize>,
132        null_map: Arc<RowIdTreeMap>,
133        value_type: DataType,
134        store: Arc<dyn IndexStore>,
135        index_cache: WeakLanceCache,
136        frag_reuse_index: Option<Arc<FragReuseIndex>>,
137    ) -> Self {
138        let lazy_reader = LazyIndexReader::new(store.clone());
139        Self {
140            index_map,
141            null_map,
142            value_type,
143            store,
144            index_cache,
145            frag_reuse_index,
146            lazy_reader,
147        }
148    }
149
150    pub(crate) async fn load(
151        store: Arc<dyn IndexStore>,
152        frag_reuse_index: Option<Arc<FragReuseIndex>>,
153        index_cache: &LanceCache,
154    ) -> Result<Arc<Self>> {
155        let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
156        let total_rows = page_lookup_file.num_rows();
157
158        if total_rows == 0 {
159            let schema = page_lookup_file.schema();
160            let data_type = schema.fields[0].data_type();
161            return Ok(Arc::new(Self::new(
162                BTreeMap::new(),
163                Arc::new(RowIdTreeMap::default()),
164                data_type,
165                store,
166                WeakLanceCache::from(index_cache),
167                frag_reuse_index,
168            )));
169        }
170
171        let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
172        let mut null_map = Arc::new(RowIdTreeMap::default());
173        let mut value_type: Option<DataType> = None;
174        let mut null_location: Option<usize> = None;
175        let mut row_offset = 0;
176
177        for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
178            let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
179            let chunk = page_lookup_file
180                .read_range(start_row..end_row, Some(&["keys"]))
181                .await?;
182
183            if chunk.num_rows() == 0 {
184                continue;
185            }
186
187            if value_type.is_none() {
188                value_type = Some(chunk.schema().field(0).data_type().clone());
189            }
190
191            let dict_keys = chunk.column(0);
192
193            for idx in 0..chunk.num_rows() {
194                let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
195
196                if key.0.is_null() {
197                    null_location = Some(row_offset);
198                } else {
199                    index_map.insert(key, row_offset);
200                }
201
202                row_offset += 1;
203            }
204        }
205
206        if let Some(null_loc) = null_location {
207            let batch = page_lookup_file
208                .read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
209                .await?;
210
211            let binary_bitmaps = batch
212                .column(0)
213                .as_any()
214                .downcast_ref::<BinaryArray>()
215                .ok_or_else(|| Error::Internal {
216                    message: "Invalid bitmap column type".to_string(),
217                    location: location!(),
218                })?;
219            let bitmap_bytes = binary_bitmaps.value(0);
220            let mut bitmap = RowIdTreeMap::deserialize_from(bitmap_bytes).unwrap();
221
222            // Apply fragment remapping if needed
223            if let Some(fri) = &frag_reuse_index {
224                bitmap = fri.remap_row_ids_tree_map(&bitmap);
225            }
226
227            null_map = Arc::new(bitmap);
228        }
229
230        let final_value_type = value_type.expect_ok()?;
231
232        Ok(Arc::new(Self::new(
233            index_map,
234            null_map,
235            final_value_type,
236            store,
237            WeakLanceCache::from(index_cache),
238            frag_reuse_index,
239        )))
240    }
241
242    async fn load_bitmap(
243        &self,
244        key: &OrderableScalarValue,
245        metrics: Option<&dyn MetricsCollector>,
246    ) -> Result<Arc<RowIdTreeMap>> {
247        if key.0.is_null() {
248            return Ok(self.null_map.clone());
249        }
250
251        let cache_key = BitmapKey { value: key.clone() };
252
253        if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
254            return Ok(cached);
255        }
256
257        // Record that we're loading a partition from disk
258        if let Some(metrics) = metrics {
259            metrics.record_part_load();
260        }
261
262        let row_offset = match self.index_map.get(key) {
263            Some(loc) => *loc,
264            None => return Ok(Arc::new(RowIdTreeMap::default())),
265        };
266
267        let page_lookup_file = self.lazy_reader.get().await?;
268        let batch = page_lookup_file
269            .read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
270            .await?;
271
272        let binary_bitmaps = batch
273            .column(0)
274            .as_any()
275            .downcast_ref::<BinaryArray>()
276            .ok_or_else(|| Error::Internal {
277                message: "Invalid bitmap column type".to_string(),
278                location: location!(),
279            })?;
280        let bitmap_bytes = binary_bitmaps.value(0); // First (and only) row
281        let mut bitmap = RowIdTreeMap::deserialize_from(bitmap_bytes).unwrap();
282
283        if let Some(fri) = &self.frag_reuse_index {
284            bitmap = fri.remap_row_ids_tree_map(&bitmap);
285        }
286
287        self.index_cache
288            .insert_with_key(&cache_key, Arc::new(bitmap.clone()))
289            .await;
290
291        Ok(Arc::new(bitmap))
292    }
293}
294
295impl DeepSizeOf for BitmapIndex {
296    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
297        let mut total_size = 0;
298
299        total_size += self.index_map.deep_size_of_children(context);
300        total_size += self.store.deep_size_of_children(context);
301
302        total_size
303    }
304}
305
/// Statistics reported by `BitmapIndex::statistics`, serialized to JSON.
#[derive(Serialize)]
struct BitmapStatistics {
    /// Number of bitmaps in the index: one per non-null value, plus one when
    /// the null bitmap is non-empty.
    num_bitmaps: usize,
}
310
311#[async_trait]
312impl Index for BitmapIndex {
313    fn as_any(&self) -> &dyn Any {
314        self
315    }
316
317    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
318        self
319    }
320
321    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
322        Err(Error::NotSupported {
323            source: "BitmapIndex is not a vector index".into(),
324            location: location!(),
325        })
326    }
327
328    async fn prewarm(&self) -> Result<()> {
329        let page_lookup_file = self.lazy_reader.get().await?;
330        let total_rows = page_lookup_file.num_rows();
331
332        if total_rows == 0 {
333            return Ok(());
334        }
335
336        for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
337            let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
338            let chunk = page_lookup_file
339                .read_range(start_row..end_row, None)
340                .await?;
341
342            if chunk.num_rows() == 0 {
343                continue;
344            }
345
346            let dict_keys = chunk.column(0);
347            let binary_bitmaps = chunk.column(1);
348            let bitmap_binary_array = binary_bitmaps
349                .as_any()
350                .downcast_ref::<BinaryArray>()
351                .unwrap();
352
353            for idx in 0..chunk.num_rows() {
354                let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
355
356                if key.0.is_null() {
357                    continue;
358                }
359
360                let bitmap_bytes = bitmap_binary_array.value(idx);
361                let mut bitmap = RowIdTreeMap::deserialize_from(bitmap_bytes).unwrap();
362
363                if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
364                    bitmap = frag_reuse_index_ref.remap_row_ids_tree_map(&bitmap);
365                }
366
367                let cache_key = BitmapKey { value: key };
368                self.index_cache
369                    .insert_with_key(&cache_key, Arc::new(bitmap))
370                    .await;
371            }
372        }
373
374        Ok(())
375    }
376
377    fn index_type(&self) -> IndexType {
378        IndexType::Bitmap
379    }
380
381    fn statistics(&self) -> Result<serde_json::Value> {
382        let stats = BitmapStatistics {
383            num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
384        };
385        serde_json::to_value(stats).map_err(|e| Error::Internal {
386            message: format!("failed to serialize bitmap index statistics: {}", e),
387            location: location!(),
388        })
389    }
390
391    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
392        unimplemented!()
393    }
394}
395
396#[async_trait]
397impl ScalarIndex for BitmapIndex {
398    #[instrument(name = "bitmap_search", level = "debug", skip_all)]
399    async fn search(
400        &self,
401        query: &dyn AnyQuery,
402        metrics: &dyn MetricsCollector,
403    ) -> Result<SearchResult> {
404        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
405
406        let row_ids = match query {
407            SargableQuery::Equals(val) => {
408                metrics.record_comparisons(1);
409                if val.is_null() {
410                    (*self.null_map).clone()
411                } else {
412                    let key = OrderableScalarValue(val.clone());
413                    let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
414                    (*bitmap).clone()
415                }
416            }
417            SargableQuery::Range(start, end) => {
418                let range_start = match start {
419                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
420                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
421                    Bound::Unbounded => Bound::Unbounded,
422                };
423
424                let range_end = match end {
425                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
426                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
427                    Bound::Unbounded => Bound::Unbounded,
428                };
429
430                let keys: Vec<_> = self
431                    .index_map
432                    .range((range_start, range_end))
433                    .map(|(k, _v)| k.clone())
434                    .collect();
435
436                metrics.record_comparisons(keys.len());
437
438                if keys.is_empty() {
439                    RowIdTreeMap::default()
440                } else {
441                    let bitmaps: Vec<_> = stream::iter(
442                        keys.into_iter()
443                            .map(|key| async move { self.load_bitmap(&key, None).await }),
444                    )
445                    .buffer_unordered(get_num_compute_intensive_cpus())
446                    .try_collect()
447                    .await?;
448
449                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
450                    RowIdTreeMap::union_all(&bitmap_refs)
451                }
452            }
453            SargableQuery::IsIn(values) => {
454                metrics.record_comparisons(values.len());
455
456                // Collect keys that exist in the index, tracking if we need nulls
457                let mut has_null = false;
458                let keys: Vec<_> = values
459                    .iter()
460                    .filter_map(|val| {
461                        if val.is_null() {
462                            has_null = true;
463                            None
464                        } else {
465                            let key = OrderableScalarValue(val.clone());
466                            if self.index_map.contains_key(&key) {
467                                Some(key)
468                            } else {
469                                None
470                            }
471                        }
472                    })
473                    .collect();
474
475                if keys.is_empty() && (!has_null || self.null_map.is_empty()) {
476                    RowIdTreeMap::default()
477                } else {
478                    // Load bitmaps in parallel
479                    let mut bitmaps: Vec<_> = stream::iter(
480                        keys.into_iter()
481                            .map(|key| async move { self.load_bitmap(&key, None).await }),
482                    )
483                    .buffer_unordered(get_num_compute_intensive_cpus())
484                    .try_collect()
485                    .await?;
486
487                    // Add null bitmap if needed
488                    if has_null && !self.null_map.is_empty() {
489                        bitmaps.push(self.null_map.clone());
490                    }
491
492                    if bitmaps.is_empty() {
493                        RowIdTreeMap::default()
494                    } else {
495                        // Convert Arc<RowIdTreeMap> to &RowIdTreeMap for union_all
496                        let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
497                        RowIdTreeMap::union_all(&bitmap_refs)
498                    }
499                }
500            }
501            SargableQuery::IsNull() => {
502                metrics.record_comparisons(1);
503                (*self.null_map).clone()
504            }
505            SargableQuery::FullTextSearch(_) => {
506                return Err(Error::NotSupported {
507                    source: "full text search is not supported for bitmap indexes".into(),
508                    location: location!(),
509                });
510            }
511        };
512
513        Ok(SearchResult::Exact(row_ids))
514    }
515
    /// Bitmap indexes support row-id remapping (see `remap`).
    fn can_remap(&self) -> bool {
        true
    }
519
520    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
521    async fn remap(
522        &self,
523        mapping: &HashMap<u64, Option<u64>>,
524        dest_store: &dyn IndexStore,
525    ) -> Result<CreatedIndex> {
526        let mut state = HashMap::new();
527
528        for key in self.index_map.keys() {
529            let bitmap = self.load_bitmap(key, None).await?;
530            let remapped_bitmap =
531                RowIdTreeMap::from_iter(bitmap.row_ids().unwrap().filter_map(|addr| {
532                    let addr_as_u64 = u64::from(addr);
533                    mapping
534                        .get(&addr_as_u64)
535                        .copied()
536                        .unwrap_or(Some(addr_as_u64))
537                }));
538            state.insert(key.0.clone(), remapped_bitmap);
539        }
540
541        if !self.null_map.is_empty() {
542            let remapped_null =
543                RowIdTreeMap::from_iter(self.null_map.row_ids().unwrap().filter_map(|addr| {
544                    let addr_as_u64 = u64::from(addr);
545                    mapping
546                        .get(&addr_as_u64)
547                        .copied()
548                        .unwrap_or(Some(addr_as_u64))
549                }));
550            state.insert(ScalarValue::try_from(&self.value_type)?, remapped_null);
551        }
552
553        BitmapIndexPlugin::write_bitmap_index(state, dest_store, &self.value_type).await?;
554
555        Ok(CreatedIndex {
556            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
557                .unwrap(),
558            index_version: BITMAP_INDEX_VERSION,
559        })
560    }
561
562    /// Add the new data into the index, creating an updated version of the index in `dest_store`
563    async fn update(
564        &self,
565        new_data: SendableRecordBatchStream,
566        dest_store: &dyn IndexStore,
567    ) -> Result<CreatedIndex> {
568        let mut state = HashMap::new();
569
570        // Initialize builder with existing data
571        for key in self.index_map.keys() {
572            let bitmap = self.load_bitmap(key, None).await?;
573            state.insert(key.0.clone(), (*bitmap).clone());
574        }
575
576        if !self.null_map.is_empty() {
577            let ex_null = new_null_array(&self.value_type, 1);
578            let ex_null = ScalarValue::try_from_array(ex_null.as_ref(), 0)?;
579            state.insert(ex_null, (*self.null_map).clone());
580        }
581
582        BitmapIndexPlugin::do_train_bitmap_index(new_data, state, dest_store).await?;
583
584        Ok(CreatedIndex {
585            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
586                .unwrap(),
587            index_version: BITMAP_INDEX_VERSION,
588        })
589    }
590
    /// Updates need only the new data, in no particular order, with row ids attached.
    fn update_criteria(&self) -> UpdateCriteria {
        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::None).with_row_id())
    }
594
    /// Returns the builtin bitmap index parameters.
    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
    }
598}
599
/// Plugin that trains, writes and loads bitmap indexes.
#[derive(Debug, Default)]
pub struct BitmapIndexPlugin;
602
603impl BitmapIndexPlugin {
604    fn get_batch_from_arrays(
605        keys: Arc<dyn Array>,
606        binary_bitmaps: Arc<dyn Array>,
607    ) -> Result<RecordBatch> {
608        let schema = Arc::new(Schema::new(vec![
609            Field::new("keys", keys.data_type().clone(), true),
610            Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
611        ]));
612
613        let columns = vec![keys, binary_bitmaps];
614
615        Ok(RecordBatch::try_new(schema, columns)?)
616    }
617
618    async fn write_bitmap_index(
619        state: HashMap<ScalarValue, RowIdTreeMap>,
620        index_store: &dyn IndexStore,
621        value_type: &DataType,
622    ) -> Result<()> {
623        let schema = Arc::new(Schema::new(vec![
624            Field::new("keys", value_type.clone(), true),
625            Field::new("bitmaps", DataType::Binary, true),
626        ]));
627
628        let mut bitmap_index_file = index_store
629            .new_index_file(BITMAP_LOOKUP_NAME, schema)
630            .await?;
631
632        let mut cur_keys = Vec::new();
633        let mut cur_bitmaps = Vec::new();
634        let mut cur_bytes = 0;
635
636        for (key, bitmap) in state.into_iter() {
637            let mut bytes = Vec::new();
638            bitmap.serialize_into(&mut bytes).unwrap();
639            let bitmap_size = bytes.len();
640
641            if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
642                let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
643                let mut binary_builder = BinaryBuilder::new();
644                for b in &cur_bitmaps {
645                    binary_builder.append_value(b);
646                }
647                let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
648
649                let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
650                bitmap_index_file.write_record_batch(record_batch).await?;
651
652                cur_keys.clear();
653                cur_bitmaps.clear();
654                cur_bytes = 0;
655            }
656
657            cur_keys.push(key);
658            cur_bitmaps.push(bytes);
659            cur_bytes += bitmap_size;
660        }
661
662        // Flush any remaining
663        if !cur_keys.is_empty() {
664            let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
665            let mut binary_builder = BinaryBuilder::new();
666            for b in &cur_bitmaps {
667                binary_builder.append_value(b);
668            }
669            let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
670
671            let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
672            bitmap_index_file.write_record_batch(record_batch).await?;
673        }
674
675        // Finish file once at the end - this creates the file even if we wrote no batches
676        bitmap_index_file.finish().await?;
677
678        Ok(())
679    }
680
681    async fn do_train_bitmap_index(
682        mut data_source: SendableRecordBatchStream,
683        mut state: HashMap<ScalarValue, RowIdTreeMap>,
684        index_store: &dyn IndexStore,
685    ) -> Result<()> {
686        let value_type = data_source.schema().field(0).data_type().clone();
687        while let Some(batch) = data_source.try_next().await? {
688            let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
689            let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
690            debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
691
692            let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
693
694            for i in 0..values.len() {
695                let row_id = row_id_column.value(i);
696                let key = ScalarValue::try_from_array(values.as_ref(), i)?;
697                state.entry(key.clone()).or_default().insert(row_id);
698            }
699        }
700
701        Self::write_bitmap_index(state, index_store, &value_type).await
702    }
703
704    pub async fn train_bitmap_index(
705        data: SendableRecordBatchStream,
706        index_store: &dyn IndexStore,
707    ) -> Result<()> {
708        // mapping from item to list of the row ids where it is present
709        let dictionary: HashMap<ScalarValue, RowIdTreeMap> = HashMap::new();
710
711        Self::do_train_bitmap_index(data, dictionary, index_store).await
712    }
713}
714
715#[async_trait]
716impl ScalarIndexPlugin for BitmapIndexPlugin {
717    fn name(&self) -> &str {
718        "Bitmap"
719    }
720
721    fn new_training_request(
722        &self,
723        _params: &str,
724        field: &Field,
725    ) -> Result<Box<dyn TrainingRequest>> {
726        if field.data_type().is_nested() {
727            return Err(Error::InvalidInput {
728                source: "A bitmap index can only be created on a non-nested field.".into(),
729                location: location!(),
730            });
731        }
732        Ok(Box::new(DefaultTrainingRequest::new(
733            TrainingCriteria::new(TrainingOrdering::None).with_row_id(),
734        )))
735    }
736
737    fn provides_exact_answer(&self) -> bool {
738        true
739    }
740
741    fn version(&self) -> u32 {
742        BITMAP_INDEX_VERSION
743    }
744
745    fn new_query_parser(
746        &self,
747        index_name: String,
748        _index_details: &prost_types::Any,
749    ) -> Option<Box<dyn ScalarQueryParser>> {
750        Some(Box::new(SargableQueryParser::new(index_name, false)))
751    }
752
753    async fn train_index(
754        &self,
755        data: SendableRecordBatchStream,
756        index_store: &dyn IndexStore,
757        _request: Box<dyn TrainingRequest>,
758        fragment_ids: Option<Vec<u32>>,
759    ) -> Result<CreatedIndex> {
760        if fragment_ids.is_some() {
761            return Err(Error::InvalidInput {
762                source: "Bitmap index does not support fragment training".into(),
763                location: location!(),
764            });
765        }
766
767        Self::train_bitmap_index(data, index_store).await?;
768        Ok(CreatedIndex {
769            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
770                .unwrap(),
771            index_version: BITMAP_INDEX_VERSION,
772        })
773    }
774
775    /// Load an index from storage
776    async fn load_index(
777        &self,
778        index_store: Arc<dyn IndexStore>,
779        _index_details: &prost_types::Any,
780        frag_reuse_index: Option<Arc<FragReuseIndex>>,
781        cache: &LanceCache,
782    ) -> Result<Arc<dyn ScalarIndex>> {
783        Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
784    }
785}
786
787#[cfg(test)]
788pub mod tests {
789    use super::*;
790    use crate::metrics::NoOpMetricsCollector;
791    use crate::scalar::lance_format::LanceIndexStore;
792    use arrow_array::{RecordBatch, StringArray, UInt64Array};
793    use arrow_schema::{Field, Schema};
794    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
795    use futures::stream;
796    use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
797    use lance_io::object_store::ObjectStore;
798
799    #[tokio::test]
800    async fn test_bitmap_lazy_loading_and_cache() {
801        // Create a temporary directory for the index
802        let tmpdir = TempObjDir::default();
803        let store = Arc::new(LanceIndexStore::new(
804            Arc::new(ObjectStore::local()),
805            tmpdir.clone(),
806            Arc::new(LanceCache::no_cache()),
807        ));
808
809        // Create test data with low cardinality column
810        let colors = vec![
811            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
812            "red", "red", "blue", "green", "yellow",
813        ];
814
815        let row_ids = (0u64..15u64).collect::<Vec<_>>();
816
817        let schema = Arc::new(Schema::new(vec![
818            Field::new("value", DataType::Utf8, false),
819            Field::new("_rowid", DataType::UInt64, false),
820        ]));
821
822        let batch = RecordBatch::try_new(
823            schema.clone(),
824            vec![
825                Arc::new(StringArray::from(colors.clone())),
826                Arc::new(UInt64Array::from(row_ids.clone())),
827            ],
828        )
829        .unwrap();
830
831        let stream = stream::once(async move { Ok(batch) });
832        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
833
834        // Train and write the bitmap index
835        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
836            .await
837            .unwrap();
838
839        // Create a cache with limited capacity
840        let cache = LanceCache::with_capacity(1024 * 1024); // 1MB cache
841
842        // Load the index (should only load metadata, not bitmaps)
843        let index = BitmapIndex::load(store.clone(), None, &cache)
844            .await
845            .unwrap();
846
847        assert_eq!(index.index_map.len(), 4); // 4 non-null unique values (red, blue, green, yellow)
848        assert!(index.null_map.is_empty()); // No nulls in test data
849
850        // Test 1: Search for "red"
851        let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
852        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
853
854        // Verify results
855        let expected_red_rows = vec![0u64, 3, 6, 10, 11];
856        if let SearchResult::Exact(row_ids) = result {
857            let mut actual: Vec<u64> = row_ids.row_ids().unwrap().map(|id| id.into()).collect();
858            actual.sort();
859            assert_eq!(actual, expected_red_rows);
860        } else {
861            panic!("Expected exact search result");
862        }
863
864        // Test 2: Search for "red" again - should hit cache
865        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
866        if let SearchResult::Exact(row_ids) = result {
867            let mut actual: Vec<u64> = row_ids.row_ids().unwrap().map(|id| id.into()).collect();
868            actual.sort();
869            assert_eq!(actual, expected_red_rows);
870        }
871
872        // Test 3: Range query
873        let query = SargableQuery::Range(
874            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
875            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
876        );
877        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
878
879        let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
880        if let SearchResult::Exact(row_ids) = result {
881            let mut actual: Vec<u64> = row_ids.row_ids().unwrap().map(|id| id.into()).collect();
882            actual.sort();
883            assert_eq!(actual, expected_range_rows);
884        }
885
886        // Test 4: IsIn query
887        let query = SargableQuery::IsIn(vec![
888            ScalarValue::Utf8(Some("red".to_string())),
889            ScalarValue::Utf8(Some("yellow".to_string())),
890        ]);
891        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
892
893        let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
894        if let SearchResult::Exact(row_ids) = result {
895            let mut actual: Vec<u64> = row_ids.row_ids().unwrap().map(|id| id.into()).collect();
896            actual.sort();
897            assert_eq!(actual, expected_in_rows);
898        }
899    }
900
901    #[tokio::test]
902    #[ignore]
903    async fn test_big_bitmap_index() {
904        // WARNING: This test allocates a huge state to force overflow over int32 on BinaryArray
905        // You must run it only on a machine with enough resources (or skip it normally).
906        use super::{BitmapIndex, BITMAP_LOOKUP_NAME};
907        use crate::scalar::lance_format::LanceIndexStore;
908        use crate::scalar::IndexStore;
909        use arrow_schema::DataType;
910        use datafusion_common::ScalarValue;
911        use lance_core::cache::LanceCache;
912        use lance_core::utils::mask::RowIdTreeMap;
913        use lance_io::object_store::ObjectStore;
914        use std::collections::HashMap;
915        use std::sync::Arc;
916
917        // Adjust these numbers so that:
918        //     m * (serialized size per bitmap) > 2^31 bytes.
919        //
920        // For example, if we assume each bitmap serializes to ~1000 bytes,
921        // you need m > 2.1e6.
922        let m: u32 = 2_500_000;
923        let per_bitmap_size = 1000; // assumed bytes per bitmap
924
925        let mut state = HashMap::new();
926        for i in 0..m {
927            // Create a bitmap that contains, say, 1000 row IDs.
928            let bitmap = RowIdTreeMap::from_iter(0..per_bitmap_size);
929
930            let key = ScalarValue::UInt32(Some(i));
931            state.insert(key, bitmap);
932        }
933
934        // Create a temporary store.
935        let tmpdir = TempObjDir::default();
936        let test_store = LanceIndexStore::new(
937            Arc::new(ObjectStore::local()),
938            tmpdir.clone(),
939            Arc::new(LanceCache::no_cache()),
940        );
941
942        // This call should never trigger a "byte array offset overflow" error since now the code supports
943        // read by chunks
944        let result =
945            BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;
946
947        assert!(
948            result.is_ok(),
949            "Failed to write bitmap index: {:?}",
950            result.err()
951        );
952
953        // Verify the index file exists
954        let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
955        assert!(
956            index_file.is_ok(),
957            "Failed to open index file: {:?}",
958            index_file.err()
959        );
960        let index_file = index_file.unwrap();
961
962        // Print stats about the index file
963        tracing::info!(
964            "Index file contains {} rows in total",
965            index_file.num_rows()
966        );
967
968        // Load the index using BitmapIndex::load
969        tracing::info!("Loading index from disk...");
970        let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
971            .await
972            .expect("Failed to load bitmap index");
973
974        // Verify the loaded index has the correct number of entries
975        assert_eq!(
976            loaded_index.index_map.len(),
977            m as usize,
978            "Loaded index has incorrect number of keys (expected {}, got {})",
979            m,
980            loaded_index.index_map.len()
981        );
982
983        // Manually verify specific keys without using search()
984        let test_keys = [0, m / 2, m - 1]; // Beginning, middle, and end
985        for &key_val in &test_keys {
986            let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
987            // Load the bitmap for this key
988            let bitmap = loaded_index
989                .load_bitmap(&key, None)
990                .await
991                .unwrap_or_else(|_| panic!("Key {} should exist", key_val));
992
993            // Convert RowIdTreeMap to a vector for easier assertion
994            let row_ids: Vec<u64> = bitmap.row_ids().unwrap().map(u64::from).collect();
995
996            // Verify length
997            assert_eq!(
998                row_ids.len(),
999                per_bitmap_size as usize,
1000                "Bitmap for key {} has wrong size",
1001                key_val
1002            );
1003
1004            // Verify first few and last few elements
1005            for i in 0..5.min(per_bitmap_size) {
1006                assert!(
1007                    row_ids.contains(&i),
1008                    "Bitmap for key {} should contain row_id {}",
1009                    key_val,
1010                    i
1011                );
1012            }
1013
1014            for i in (per_bitmap_size - 5)..per_bitmap_size {
1015                assert!(
1016                    row_ids.contains(&i),
1017                    "Bitmap for key {} should contain row_id {}",
1018                    key_val,
1019                    i
1020                );
1021            }
1022
1023            // Verify exact range
1024            let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
1025            assert_eq!(
1026                row_ids, expected_range,
1027                "Bitmap for key {} doesn't contain expected values",
1028                key_val
1029            );
1030
1031            tracing::info!(
1032                "✓ Verified bitmap for key {}: {} rows as expected",
1033                key_val,
1034                row_ids.len()
1035            );
1036        }
1037
1038        tracing::info!("Test successful! Index properly contains {} keys", m);
1039    }
1040
1041    #[tokio::test]
1042    async fn test_bitmap_prewarm() {
1043        // Create a temporary directory for the index
1044        let tmpdir = TempObjDir::default();
1045        let store = Arc::new(LanceIndexStore::new(
1046            Arc::new(ObjectStore::local()),
1047            tmpdir.clone(),
1048            Arc::new(LanceCache::no_cache()),
1049        ));
1050
1051        // Create test data with low cardinality
1052        let colors = vec![
1053            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1054            "red", "red", "blue", "green", "yellow",
1055        ];
1056
1057        let row_ids = (0u64..15u64).collect::<Vec<_>>();
1058
1059        let schema = Arc::new(Schema::new(vec![
1060            Field::new("value", DataType::Utf8, false),
1061            Field::new("_rowid", DataType::UInt64, false),
1062        ]));
1063
1064        let batch = RecordBatch::try_new(
1065            schema.clone(),
1066            vec![
1067                Arc::new(StringArray::from(colors.clone())),
1068                Arc::new(UInt64Array::from(row_ids.clone())),
1069            ],
1070        )
1071        .unwrap();
1072
1073        let stream = stream::once(async move { Ok(batch) });
1074        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1075
1076        // Train and write the bitmap index
1077        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1078            .await
1079            .unwrap();
1080
1081        // Create a cache with metrics tracking
1082        let cache = LanceCache::with_capacity(1024 * 1024); // 1MB cache
1083
1084        // Load the index (should only load metadata, not bitmaps)
1085        let index = BitmapIndex::load(store.clone(), None, &cache)
1086            .await
1087            .unwrap();
1088
1089        // Verify no bitmaps are cached yet
1090        let cache_key_red = BitmapKey {
1091            value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
1092        };
1093        let cache_key_blue = BitmapKey {
1094            value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
1095        };
1096
1097        assert!(cache
1098            .get_with_key::<BitmapKey>(&cache_key_red)
1099            .await
1100            .is_none());
1101        assert!(cache
1102            .get_with_key::<BitmapKey>(&cache_key_blue)
1103            .await
1104            .is_none());
1105
1106        // Call prewarm
1107        index.prewarm().await.unwrap();
1108
1109        // Verify all bitmaps are now cached
1110        assert!(cache
1111            .get_with_key::<BitmapKey>(&cache_key_red)
1112            .await
1113            .is_some());
1114        assert!(cache
1115            .get_with_key::<BitmapKey>(&cache_key_blue)
1116            .await
1117            .is_some());
1118
1119        // Verify cached bitmaps have correct content
1120        let cached_red = cache
1121            .get_with_key::<BitmapKey>(&cache_key_red)
1122            .await
1123            .unwrap();
1124        let red_rows: Vec<u64> = cached_red.row_ids().unwrap().map(u64::from).collect();
1125        assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);
1126
1127        // Call prewarm again - should be idempotent
1128        index.prewarm().await.unwrap();
1129
1130        // Verify cache still contains the same items
1131        let cached_red_2 = cache
1132            .get_with_key::<BitmapKey>(&cache_key_red)
1133            .await
1134            .unwrap();
1135        let red_rows_2: Vec<u64> = cached_red_2.row_ids().unwrap().map(u64::from).collect();
1136        assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
1137    }
1138
1139    #[tokio::test]
1140    async fn test_remap_bitmap_with_null() {
1141        use arrow_array::UInt32Array;
1142
1143        // Create a temporary store.
1144        let tmpdir = TempObjDir::default();
1145        let test_store = Arc::new(LanceIndexStore::new(
1146            Arc::new(ObjectStore::local()),
1147            tmpdir.clone(),
1148            Arc::new(LanceCache::no_cache()),
1149        ));
1150
1151        // Create test data that simulates:
1152        // frag 1 - { 0: null, 1: null, 2: 1 }
1153        // frag 2 - { 0: 1, 1: 2, 2: 2 }
1154        // We'll create this data with specific row addresses
1155        let values = vec![
1156            None,       // row 0: null (will be at address (1,0))
1157            None,       // row 1: null (will be at address (1,1))
1158            Some(1u32), // row 2: 1    (will be at address (1,2))
1159            Some(1u32), // row 3: 1    (will be at address (2,0))
1160            Some(2u32), // row 4: 2    (will be at address (2,1))
1161            Some(2u32), // row 5: 2    (will be at address (2,2))
1162        ];
1163
1164        // Create row IDs with specific fragment addresses
1165        let row_ids: Vec<u64> = vec![
1166            RowAddress::new_from_parts(1, 0).into(),
1167            RowAddress::new_from_parts(1, 1).into(),
1168            RowAddress::new_from_parts(1, 2).into(),
1169            RowAddress::new_from_parts(2, 0).into(),
1170            RowAddress::new_from_parts(2, 1).into(),
1171            RowAddress::new_from_parts(2, 2).into(),
1172        ];
1173
1174        let schema = Arc::new(Schema::new(vec![
1175            Field::new("value", DataType::UInt32, true),
1176            Field::new("_rowid", DataType::UInt64, false),
1177        ]));
1178
1179        let batch = RecordBatch::try_new(
1180            schema.clone(),
1181            vec![
1182                Arc::new(UInt32Array::from(values)),
1183                Arc::new(UInt64Array::from(row_ids)),
1184            ],
1185        )
1186        .unwrap();
1187
1188        let stream = stream::once(async move { Ok(batch) });
1189        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1190
1191        // Create the bitmap index
1192        BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
1193            .await
1194            .unwrap();
1195
1196        // Load the index
1197        let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1198            .await
1199            .expect("Failed to load bitmap index");
1200
1201        // Verify initial state
1202        assert_eq!(index.index_map.len(), 2); // 2 non-null values (1 and 2)
1203        assert!(!index.null_map.is_empty()); // Should have null values
1204
1205        // Create a remap that simulates compaction of frags 1 and 2 into frag 3
1206        let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
1207        row_addr_map.insert(
1208            RowAddress::new_from_parts(1, 0).into(),
1209            Some(RowAddress::new_from_parts(3, 0).into()),
1210        );
1211        row_addr_map.insert(
1212            RowAddress::new_from_parts(1, 1).into(),
1213            Some(RowAddress::new_from_parts(3, 1).into()),
1214        );
1215        row_addr_map.insert(
1216            RowAddress::new_from_parts(1, 2).into(),
1217            Some(RowAddress::new_from_parts(3, 2).into()),
1218        );
1219        row_addr_map.insert(
1220            RowAddress::new_from_parts(2, 0).into(),
1221            Some(RowAddress::new_from_parts(3, 3).into()),
1222        );
1223        row_addr_map.insert(
1224            RowAddress::new_from_parts(2, 1).into(),
1225            Some(RowAddress::new_from_parts(3, 4).into()),
1226        );
1227        row_addr_map.insert(
1228            RowAddress::new_from_parts(2, 2).into(),
1229            Some(RowAddress::new_from_parts(3, 5).into()),
1230        );
1231
1232        // Perform remap
1233        index
1234            .remap(&row_addr_map, test_store.as_ref())
1235            .await
1236            .unwrap();
1237
1238        // Reload and check
1239        let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
1240            .await
1241            .expect("Failed to load remapped bitmap index");
1242
1243        // Verify the null bitmap was remapped correctly
1244        let expected_null_addrs: Vec<u64> = vec![
1245            RowAddress::new_from_parts(3, 0).into(),
1246            RowAddress::new_from_parts(3, 1).into(),
1247        ];
1248        let actual_null_addrs: Vec<u64> = reloaded_idx
1249            .null_map
1250            .row_ids()
1251            .unwrap()
1252            .map(u64::from)
1253            .collect();
1254        assert_eq!(
1255            actual_null_addrs, expected_null_addrs,
1256            "Null bitmap not remapped correctly"
1257        );
1258
1259        // Search for value 1 and verify remapped addresses
1260        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
1261        let result = reloaded_idx
1262            .search(&query, &NoOpMetricsCollector)
1263            .await
1264            .unwrap();
1265        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1266            let mut actual: Vec<u64> = row_ids.row_ids().unwrap().map(u64::from).collect();
1267            actual.sort();
1268            let expected: Vec<u64> = vec![
1269                RowAddress::new_from_parts(3, 2).into(),
1270                RowAddress::new_from_parts(3, 3).into(),
1271            ];
1272            assert_eq!(actual, expected, "Value 1 bitmap not remapped correctly");
1273        }
1274
1275        // Search for value 2 and verify remapped addresses
1276        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
1277        let result = reloaded_idx
1278            .search(&query, &NoOpMetricsCollector)
1279            .await
1280            .unwrap();
1281        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1282            let mut actual: Vec<u64> = row_ids.row_ids().unwrap().map(u64::from).collect();
1283            actual.sort();
1284            let expected: Vec<u64> = vec![
1285                RowAddress::new_from_parts(3, 4).into(),
1286                RowAddress::new_from_parts(3, 5).into(),
1287            ];
1288            assert_eq!(actual, expected, "Value 2 bitmap not remapped correctly");
1289        }
1290
1291        // Search for null values
1292        let query = SargableQuery::IsNull();
1293        let result = reloaded_idx
1294            .search(&query, &NoOpMetricsCollector)
1295            .await
1296            .unwrap();
1297        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1298            let mut actual: Vec<u64> = row_ids.row_ids().unwrap().map(u64::from).collect();
1299            actual.sort();
1300            assert_eq!(
1301                actual, expected_null_addrs,
1302                "Null search results not correct"
1303            );
1304        }
1305    }
1306}