// lance_index/scalar/bitmap.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{BTreeMap, HashMap},
7    fmt::Debug,
8    ops::Bound,
9    sync::Arc,
10};
11
12use arrow::array::BinaryBuilder;
13use arrow_array::{new_null_array, Array, BinaryArray, RecordBatch, UInt64Array};
14use arrow_schema::{DataType, Field, Schema};
15use async_trait::async_trait;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::ScalarValue;
18use deepsize::DeepSizeOf;
19use futures::{stream, StreamExt, TryStreamExt};
20use lance_core::utils::mask::RowSetOps;
21use lance_core::{
22    cache::{CacheKey, LanceCache, WeakLanceCache},
23    error::LanceOptionExt,
24    utils::{
25        mask::{NullableRowAddrSet, RowAddrTreeMap},
26        tokio::get_num_compute_intensive_cpus,
27    },
28    Error, Result, ROW_ID,
29};
30use roaring::RoaringBitmap;
31use serde::Serialize;
32use snafu::location;
33use tracing::instrument;
34
35use super::{
36    btree::OrderableScalarValue, BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult,
37};
38use super::{AnyQuery, IndexStore, ScalarIndex};
39use crate::pbold;
40use crate::{
41    frag_reuse::FragReuseIndex,
42    scalar::{
43        expression::SargableQueryParser,
44        registry::{
45            DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering,
46            TrainingRequest, VALUE_COLUMN_NAME,
47        },
48        CreatedIndex, UpdateCriteria,
49    },
50};
51use crate::{metrics::MetricsCollector, Index, IndexType};
52use crate::{scalar::expression::ScalarQueryParser, scalar::IndexReader};
53
/// Name of the lookup file that stores one row per distinct value (key + serialized bitmap).
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
/// Schema-metadata key under which serialized index statistics are stored.
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";

// Cap on total serialized bitmap bytes per written batch; Arrow binary arrays
// use i32 offsets, so stay below i32::MAX.
const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024; // leave headroom

// Number of lookup-file rows read per chunk when scanning the file.
const MAX_ROWS_PER_CHUNK: usize = 2 * 1024;

// On-disk format version of the bitmap index.
const BITMAP_INDEX_VERSION: u32 = 0;
62
// We only need to open a file reader if we need to load a bitmap. If all
// bitmaps are cached we don't open it. If we do open it we should only open it once.
#[derive(Clone)]
struct LazyIndexReader {
    // Lazily-opened reader over the lookup file; `None` until the first
    // `get()` call, then reused for all subsequent reads.
    index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
    // Store from which the lookup file is opened on demand.
    store: Arc<dyn IndexStore>,
}
70
71impl std::fmt::Debug for LazyIndexReader {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("LazyIndexReader")
74            .field("store", &self.store)
75            .finish()
76    }
77}
78
79impl LazyIndexReader {
80    fn new(store: Arc<dyn IndexStore>) -> Self {
81        Self {
82            index_reader: Arc::new(tokio::sync::Mutex::new(None)),
83            store,
84        }
85    }
86
87    async fn get(&self) -> Result<Arc<dyn IndexReader>> {
88        let mut reader = self.index_reader.lock().await;
89        if reader.is_none() {
90            let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
91            *reader = Some(index_reader);
92        }
93        Ok(reader.as_ref().unwrap().clone())
94    }
95}
96
/// A scalar index that stores a bitmap for each possible value
///
/// This index works best for low-cardinality columns, where the number of unique values is small.
/// The bitmap stores a list of row ids where the value is present.
#[derive(Clone, Debug)]
pub struct BitmapIndex {
    /// Maps each unique value to its bitmap location in the index file
    /// The usize value is the row offset in the bitmap_page_lookup.lance file
    /// for quickly locating the row and reading it out
    index_map: BTreeMap<OrderableScalarValue, usize>,

    // Bitmap of the rows whose value is NULL; read eagerly during `load`.
    null_map: Arc<RowAddrTreeMap>,

    // Data type of the indexed column's values.
    value_type: DataType,

    // Store holding the index files.
    store: Arc<dyn IndexStore>,

    // Weak handle to the cache used for lazily-loaded bitmaps.
    index_cache: WeakLanceCache,

    // Optional row-address remapping applied after fragment reuse/compaction.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,

    // Lazily-opened reader over the bitmap lookup file.
    lazy_reader: LazyIndexReader,
}
120
/// Cache key identifying the bitmap of one indexed (non-null) value.
#[derive(Debug, Clone)]
pub struct BitmapKey {
    // The value whose bitmap is cached.
    value: OrderableScalarValue,
}
125
126impl CacheKey for BitmapKey {
127    type ValueType = RowAddrTreeMap;
128
129    fn key(&self) -> std::borrow::Cow<'_, str> {
130        format!("{}", self.value.0).into()
131    }
132}
133
134impl BitmapIndex {
135    fn new(
136        index_map: BTreeMap<OrderableScalarValue, usize>,
137        null_map: Arc<RowAddrTreeMap>,
138        value_type: DataType,
139        store: Arc<dyn IndexStore>,
140        index_cache: WeakLanceCache,
141        frag_reuse_index: Option<Arc<FragReuseIndex>>,
142    ) -> Self {
143        let lazy_reader = LazyIndexReader::new(store.clone());
144        Self {
145            index_map,
146            null_map,
147            value_type,
148            store,
149            index_cache,
150            frag_reuse_index,
151            lazy_reader,
152        }
153    }
154
155    pub(crate) async fn load(
156        store: Arc<dyn IndexStore>,
157        frag_reuse_index: Option<Arc<FragReuseIndex>>,
158        index_cache: &LanceCache,
159    ) -> Result<Arc<Self>> {
160        let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
161        let total_rows = page_lookup_file.num_rows();
162
163        if total_rows == 0 {
164            let schema = page_lookup_file.schema();
165            let data_type = schema.fields[0].data_type();
166            return Ok(Arc::new(Self::new(
167                BTreeMap::new(),
168                Arc::new(RowAddrTreeMap::default()),
169                data_type,
170                store,
171                WeakLanceCache::from(index_cache),
172                frag_reuse_index,
173            )));
174        }
175
176        let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
177        let mut null_map = Arc::new(RowAddrTreeMap::default());
178        let mut value_type: Option<DataType> = None;
179        let mut null_location: Option<usize> = None;
180        let mut row_offset = 0;
181
182        for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
183            let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
184            let chunk = page_lookup_file
185                .read_range(start_row..end_row, Some(&["keys"]))
186                .await?;
187
188            if chunk.num_rows() == 0 {
189                continue;
190            }
191
192            if value_type.is_none() {
193                value_type = Some(chunk.schema().field(0).data_type().clone());
194            }
195
196            let dict_keys = chunk.column(0);
197
198            for idx in 0..chunk.num_rows() {
199                let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
200
201                if key.0.is_null() {
202                    null_location = Some(row_offset);
203                } else {
204                    index_map.insert(key, row_offset);
205                }
206
207                row_offset += 1;
208            }
209        }
210
211        if let Some(null_loc) = null_location {
212            let batch = page_lookup_file
213                .read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
214                .await?;
215
216            let binary_bitmaps = batch
217                .column(0)
218                .as_any()
219                .downcast_ref::<BinaryArray>()
220                .ok_or_else(|| Error::Internal {
221                    message: "Invalid bitmap column type".to_string(),
222                    location: location!(),
223                })?;
224            let bitmap_bytes = binary_bitmaps.value(0);
225            let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
226
227            // Apply fragment remapping if needed
228            if let Some(fri) = &frag_reuse_index {
229                bitmap = fri.remap_row_addrs_tree_map(&bitmap);
230            }
231
232            null_map = Arc::new(bitmap);
233        }
234
235        let final_value_type = value_type.expect_ok()?;
236
237        Ok(Arc::new(Self::new(
238            index_map,
239            null_map,
240            final_value_type,
241            store,
242            WeakLanceCache::from(index_cache),
243            frag_reuse_index,
244        )))
245    }
246
247    async fn load_bitmap(
248        &self,
249        key: &OrderableScalarValue,
250        metrics: Option<&dyn MetricsCollector>,
251    ) -> Result<Arc<RowAddrTreeMap>> {
252        if key.0.is_null() {
253            return Ok(self.null_map.clone());
254        }
255
256        let cache_key = BitmapKey { value: key.clone() };
257
258        if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
259            return Ok(cached);
260        }
261
262        // Record that we're loading a partition from disk
263        if let Some(metrics) = metrics {
264            metrics.record_part_load();
265        }
266
267        let row_offset = match self.index_map.get(key) {
268            Some(loc) => *loc,
269            None => return Ok(Arc::new(RowAddrTreeMap::default())),
270        };
271
272        let page_lookup_file = self.lazy_reader.get().await?;
273        let batch = page_lookup_file
274            .read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
275            .await?;
276
277        let binary_bitmaps = batch
278            .column(0)
279            .as_any()
280            .downcast_ref::<BinaryArray>()
281            .ok_or_else(|| Error::Internal {
282                message: "Invalid bitmap column type".to_string(),
283                location: location!(),
284            })?;
285        let bitmap_bytes = binary_bitmaps.value(0); // First (and only) row
286        let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
287
288        if let Some(fri) = &self.frag_reuse_index {
289            bitmap = fri.remap_row_addrs_tree_map(&bitmap);
290        }
291
292        self.index_cache
293            .insert_with_key(&cache_key, Arc::new(bitmap.clone()))
294            .await;
295
296        Ok(Arc::new(bitmap))
297    }
298}
299
300impl DeepSizeOf for BitmapIndex {
301    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
302        let mut total_size = 0;
303
304        total_size += self.index_map.deep_size_of_children(context);
305        total_size += self.store.deep_size_of_children(context);
306
307        total_size
308    }
309}
310
/// Statistics serialized into the lookup file metadata and returned by the
/// index's `statistics()` method.
#[derive(Serialize)]
struct BitmapStatistics {
    // Total number of stored bitmaps (distinct values, plus one for nulls).
    num_bitmaps: usize,
}
315
316#[async_trait]
317impl Index for BitmapIndex {
318    fn as_any(&self) -> &dyn Any {
319        self
320    }
321
322    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
323        self
324    }
325
326    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
327        Err(Error::NotSupported {
328            source: "BitmapIndex is not a vector index".into(),
329            location: location!(),
330        })
331    }
332
333    async fn prewarm(&self) -> Result<()> {
334        let page_lookup_file = self.lazy_reader.get().await?;
335        let total_rows = page_lookup_file.num_rows();
336
337        if total_rows == 0 {
338            return Ok(());
339        }
340
341        for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
342            let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
343            let chunk = page_lookup_file
344                .read_range(start_row..end_row, None)
345                .await?;
346
347            if chunk.num_rows() == 0 {
348                continue;
349            }
350
351            let dict_keys = chunk.column(0);
352            let binary_bitmaps = chunk.column(1);
353            let bitmap_binary_array = binary_bitmaps
354                .as_any()
355                .downcast_ref::<BinaryArray>()
356                .unwrap();
357
358            for idx in 0..chunk.num_rows() {
359                let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
360
361                if key.0.is_null() {
362                    continue;
363                }
364
365                let bitmap_bytes = bitmap_binary_array.value(idx);
366                let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
367
368                if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
369                    bitmap = frag_reuse_index_ref.remap_row_addrs_tree_map(&bitmap);
370                }
371
372                let cache_key = BitmapKey { value: key };
373                self.index_cache
374                    .insert_with_key(&cache_key, Arc::new(bitmap))
375                    .await;
376            }
377        }
378
379        Ok(())
380    }
381
382    fn index_type(&self) -> IndexType {
383        IndexType::Bitmap
384    }
385
386    fn statistics(&self) -> Result<serde_json::Value> {
387        let stats = BitmapStatistics {
388            num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
389        };
390        serde_json::to_value(stats).map_err(|e| Error::Internal {
391            message: format!("failed to serialize bitmap index statistics: {}", e),
392            location: location!(),
393        })
394    }
395
396    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
397        unimplemented!()
398    }
399}
400
#[async_trait]
impl ScalarIndex for BitmapIndex {
    /// Answer a sargable query exactly.
    ///
    /// Returns the matching row set plus, when the query does not explicitly
    /// ask for nulls, the set of rows whose value is NULL — those rows are
    /// "unknown" rather than TRUE/FALSE under SQL tri-state semantics.
    #[instrument(name = "bitmap_search", level = "debug", skip_all)]
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult> {
        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();

        let (row_ids, null_row_ids) = match query {
            SargableQuery::Equals(val) => {
                metrics.record_comparisons(1);
                if val.is_null() {
                    // Querying FOR nulls - they are the TRUE result, not NULL result
                    ((*self.null_map).clone(), None)
                } else {
                    let key = OrderableScalarValue(val.clone());
                    let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
                    let null_rows = if !self.null_map.is_empty() {
                        Some((*self.null_map).clone())
                    } else {
                        None
                    };
                    ((*bitmap).clone(), null_rows)
                }
            }
            SargableQuery::Range(start, end) => {
                // Wrap the bounds in orderable keys so the BTreeMap can
                // enumerate every distinct indexed value inside the range.
                let range_start = match start {
                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
                    Bound::Unbounded => Bound::Unbounded,
                };

                let range_end = match end {
                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
                    Bound::Unbounded => Bound::Unbounded,
                };

                let keys: Vec<_> = self
                    .index_map
                    .range((range_start, range_end))
                    .map(|(k, _v)| k.clone())
                    .collect();

                metrics.record_comparisons(keys.len());

                let result = if keys.is_empty() {
                    RowAddrTreeMap::default()
                } else {
                    // Load the per-value bitmaps concurrently, then union them.
                    let bitmaps: Vec<_> = stream::iter(
                        keys.into_iter()
                            .map(|key| async move { self.load_bitmap(&key, None).await }),
                    )
                    .buffer_unordered(get_num_compute_intensive_cpus())
                    .try_collect()
                    .await?;

                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
                    RowAddrTreeMap::union_all(&bitmap_refs)
                };

                // Range predicates never match NULL, so null rows stay "unknown".
                let null_rows = if !self.null_map.is_empty() {
                    Some((*self.null_map).clone())
                } else {
                    None
                };
                (result, null_rows)
            }
            SargableQuery::IsIn(values) => {
                metrics.record_comparisons(values.len());

                // Collect keys that exist in the index, tracking if we need nulls
                let mut has_null = false;
                let keys: Vec<_> = values
                    .iter()
                    .filter_map(|val| {
                        if val.is_null() {
                            has_null = true;
                            None
                        } else {
                            let key = OrderableScalarValue(val.clone());
                            if self.index_map.contains_key(&key) {
                                Some(key)
                            } else {
                                None
                            }
                        }
                    })
                    .collect();

                // Load bitmaps in parallel
                let mut bitmaps: Vec<_> = stream::iter(
                    keys.into_iter()
                        .map(|key| async move { self.load_bitmap(&key, None).await }),
                )
                .buffer_unordered(get_num_compute_intensive_cpus())
                .try_collect()
                .await?;

                // Add null bitmap if needed
                if has_null && !self.null_map.is_empty() {
                    bitmaps.push(self.null_map.clone());
                }

                let result = if bitmaps.is_empty() {
                    RowAddrTreeMap::default()
                } else {
                    // Convert Arc<RowAddrTreeMap> to &RowAddrTreeMap for union_all
                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
                    RowAddrTreeMap::union_all(&bitmap_refs)
                };

                // If the query explicitly includes null, then nulls are TRUE (not NULL)
                // Otherwise, nulls remain NULL (unknown)
                let null_rows = if !has_null && !self.null_map.is_empty() {
                    Some((*self.null_map).clone())
                } else {
                    None
                };
                (result, null_rows)
            }
            SargableQuery::IsNull() => {
                metrics.record_comparisons(1);
                // Querying FOR nulls - they are the TRUE result, not NULL result
                ((*self.null_map).clone(), None)
            }
            SargableQuery::FullTextSearch(_) => {
                return Err(Error::NotSupported {
                    source: "full text search is not supported for bitmap indexes".into(),
                    location: location!(),
                });
            }
        };

        let selection = NullableRowAddrSet::new(row_ids, null_row_ids.unwrap_or_default());
        Ok(SearchResult::Exact(selection))
    }

    fn can_remap(&self) -> bool {
        true
    }

    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex> {
        let mut state = HashMap::new();

        // Rewrite every value bitmap through `mapping`: addresses mapped to
        // None are dropped, unmapped addresses are kept unchanged.
        for key in self.index_map.keys() {
            let bitmap = self.load_bitmap(key, None).await?;
            let remapped_bitmap =
                RowAddrTreeMap::from_iter(bitmap.row_addrs().unwrap().filter_map(|addr| {
                    let addr_as_u64 = u64::from(addr);
                    mapping
                        .get(&addr_as_u64)
                        .copied()
                        .unwrap_or(Some(addr_as_u64))
                }));
            state.insert(key.0.clone(), remapped_bitmap);
        }

        if !self.null_map.is_empty() {
            let remapped_null =
                RowAddrTreeMap::from_iter(self.null_map.row_addrs().unwrap().filter_map(|addr| {
                    let addr_as_u64 = u64::from(addr);
                    mapping
                        .get(&addr_as_u64)
                        .copied()
                        .unwrap_or(Some(addr_as_u64))
                }));
            // A null ScalarValue of the index's value type keys the null bitmap.
            state.insert(ScalarValue::try_from(&self.value_type)?, remapped_null);
        }

        BitmapIndexPlugin::write_bitmap_index(state, dest_store, &self.value_type).await?;

        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
                .unwrap(),
            index_version: BITMAP_INDEX_VERSION,
        })
    }

    /// Add the new data into the index, creating an updated version of the index in `dest_store`
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex> {
        let mut state = HashMap::new();

        // Initialize builder with existing data
        for key in self.index_map.keys() {
            let bitmap = self.load_bitmap(key, None).await?;
            state.insert(key.0.clone(), (*bitmap).clone());
        }

        if !self.null_map.is_empty() {
            // Build a typed null ScalarValue to key the existing null bitmap.
            let ex_null = new_null_array(&self.value_type, 1);
            let ex_null = ScalarValue::try_from_array(ex_null.as_ref(), 0)?;
            state.insert(ex_null, (*self.null_map).clone());
        }

        BitmapIndexPlugin::do_train_bitmap_index(new_data, state, dest_store).await?;

        Ok(CreatedIndex {
            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
                .unwrap(),
            index_version: BITMAP_INDEX_VERSION,
        })
    }

    /// Incremental updates only need the new rows (any order) with row ids.
    fn update_criteria(&self) -> UpdateCriteria {
        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::None).with_row_id())
    }

    /// Parameters that would recreate this index from scratch.
    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
    }
}
624
/// Plugin providing training, serialization, and loading of bitmap indexes.
#[derive(Debug, Default)]
pub struct BitmapIndexPlugin;
627
628impl BitmapIndexPlugin {
629    fn get_batch_from_arrays(
630        keys: Arc<dyn Array>,
631        binary_bitmaps: Arc<dyn Array>,
632    ) -> Result<RecordBatch> {
633        let schema = Arc::new(Schema::new(vec![
634            Field::new("keys", keys.data_type().clone(), true),
635            Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
636        ]));
637
638        let columns = vec![keys, binary_bitmaps];
639
640        Ok(RecordBatch::try_new(schema, columns)?)
641    }
642
643    async fn write_bitmap_index(
644        state: HashMap<ScalarValue, RowAddrTreeMap>,
645        index_store: &dyn IndexStore,
646        value_type: &DataType,
647    ) -> Result<()> {
648        let num_bitmaps = state.len();
649        let schema = Arc::new(Schema::new(vec![
650            Field::new("keys", value_type.clone(), true),
651            Field::new("bitmaps", DataType::Binary, true),
652        ]));
653
654        let mut bitmap_index_file = index_store
655            .new_index_file(BITMAP_LOOKUP_NAME, schema)
656            .await?;
657
658        let mut cur_keys = Vec::new();
659        let mut cur_bitmaps = Vec::new();
660        let mut cur_bytes = 0;
661
662        for (key, bitmap) in state.into_iter() {
663            let mut bytes = Vec::new();
664            bitmap.serialize_into(&mut bytes).unwrap();
665            let bitmap_size = bytes.len();
666
667            if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
668                let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
669                let mut binary_builder = BinaryBuilder::new();
670                for b in &cur_bitmaps {
671                    binary_builder.append_value(b);
672                }
673                let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
674
675                let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
676                bitmap_index_file.write_record_batch(record_batch).await?;
677
678                cur_keys.clear();
679                cur_bitmaps.clear();
680                cur_bytes = 0;
681            }
682
683            cur_keys.push(key);
684            cur_bitmaps.push(bytes);
685            cur_bytes += bitmap_size;
686        }
687
688        // Flush any remaining
689        if !cur_keys.is_empty() {
690            let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
691            let mut binary_builder = BinaryBuilder::new();
692            for b in &cur_bitmaps {
693                binary_builder.append_value(b);
694            }
695            let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
696
697            let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
698            bitmap_index_file.write_record_batch(record_batch).await?;
699        }
700
701        // Finish file with metadata that allows lightweight statistics reads
702        let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps }).map_err(|e| {
703            Error::Internal {
704                message: format!("failed to serialize bitmap statistics: {e}"),
705                location: location!(),
706            }
707        })?;
708        let mut metadata = HashMap::new();
709        metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
710
711        bitmap_index_file.finish_with_metadata(metadata).await?;
712
713        Ok(())
714    }
715
716    async fn do_train_bitmap_index(
717        mut data_source: SendableRecordBatchStream,
718        mut state: HashMap<ScalarValue, RowAddrTreeMap>,
719        index_store: &dyn IndexStore,
720    ) -> Result<()> {
721        let value_type = data_source.schema().field(0).data_type().clone();
722        while let Some(batch) = data_source.try_next().await? {
723            let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
724            let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
725            debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
726
727            let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
728
729            for i in 0..values.len() {
730                let row_id = row_id_column.value(i);
731                let key = ScalarValue::try_from_array(values.as_ref(), i)?;
732                state.entry(key.clone()).or_default().insert(row_id);
733            }
734        }
735
736        Self::write_bitmap_index(state, index_store, &value_type).await
737    }
738
739    pub async fn train_bitmap_index(
740        data: SendableRecordBatchStream,
741        index_store: &dyn IndexStore,
742    ) -> Result<()> {
743        // mapping from item to list of the row ids where it is present
744        let dictionary: HashMap<ScalarValue, RowAddrTreeMap> = HashMap::new();
745
746        Self::do_train_bitmap_index(data, dictionary, index_store).await
747    }
748}
749
750#[async_trait]
751impl ScalarIndexPlugin for BitmapIndexPlugin {
752    fn name(&self) -> &str {
753        "Bitmap"
754    }
755
756    fn new_training_request(
757        &self,
758        _params: &str,
759        field: &Field,
760    ) -> Result<Box<dyn TrainingRequest>> {
761        if field.data_type().is_nested() {
762            return Err(Error::InvalidInput {
763                source: "A bitmap index can only be created on a non-nested field.".into(),
764                location: location!(),
765            });
766        }
767        Ok(Box::new(DefaultTrainingRequest::new(
768            TrainingCriteria::new(TrainingOrdering::None).with_row_id(),
769        )))
770    }
771
772    fn provides_exact_answer(&self) -> bool {
773        true
774    }
775
776    fn version(&self) -> u32 {
777        BITMAP_INDEX_VERSION
778    }
779
780    fn new_query_parser(
781        &self,
782        index_name: String,
783        _index_details: &prost_types::Any,
784    ) -> Option<Box<dyn ScalarQueryParser>> {
785        Some(Box::new(SargableQueryParser::new(index_name, false)))
786    }
787
788    async fn train_index(
789        &self,
790        data: SendableRecordBatchStream,
791        index_store: &dyn IndexStore,
792        _request: Box<dyn TrainingRequest>,
793        fragment_ids: Option<Vec<u32>>,
794    ) -> Result<CreatedIndex> {
795        if fragment_ids.is_some() {
796            return Err(Error::InvalidInput {
797                source: "Bitmap index does not support fragment training".into(),
798                location: location!(),
799            });
800        }
801
802        Self::train_bitmap_index(data, index_store).await?;
803        Ok(CreatedIndex {
804            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
805                .unwrap(),
806            index_version: BITMAP_INDEX_VERSION,
807        })
808    }
809
810    /// Load an index from storage
811    async fn load_index(
812        &self,
813        index_store: Arc<dyn IndexStore>,
814        _index_details: &prost_types::Any,
815        frag_reuse_index: Option<Arc<FragReuseIndex>>,
816        cache: &LanceCache,
817    ) -> Result<Arc<dyn ScalarIndex>> {
818        Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
819    }
820
821    async fn load_statistics(
822        &self,
823        index_store: Arc<dyn IndexStore>,
824        _index_details: &prost_types::Any,
825    ) -> Result<Option<serde_json::Value>> {
826        let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
827        if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
828            let stats = serde_json::from_str(value).map_err(|e| Error::Internal {
829                message: format!("failed to parse bitmap statistics metadata: {e}"),
830                location: location!(),
831            })?;
832            Ok(Some(stats))
833        } else {
834            Ok(None)
835        }
836    }
837}
838
839#[cfg(test)]
840pub mod tests {
841    use super::*;
842    use crate::metrics::NoOpMetricsCollector;
843    use crate::scalar::lance_format::LanceIndexStore;
844    use arrow_array::{record_batch, RecordBatch, StringArray, UInt64Array};
845    use arrow_schema::{DataType, Field, Schema};
846    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
847    use futures::stream;
848    use lance_core::utils::mask::RowSetOps;
849    use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
850    use lance_io::object_store::ObjectStore;
851    use std::collections::HashMap;
852
853    #[tokio::test]
854    async fn test_bitmap_lazy_loading_and_cache() {
855        // Create a temporary directory for the index
856        let tmpdir = TempObjDir::default();
857        let store = Arc::new(LanceIndexStore::new(
858            Arc::new(ObjectStore::local()),
859            tmpdir.clone(),
860            Arc::new(LanceCache::no_cache()),
861        ));
862
863        // Create test data with low cardinality column
864        let colors = vec![
865            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
866            "red", "red", "blue", "green", "yellow",
867        ];
868
869        let row_ids = (0u64..15u64).collect::<Vec<_>>();
870
871        let schema = Arc::new(Schema::new(vec![
872            Field::new("value", DataType::Utf8, false),
873            Field::new("_rowid", DataType::UInt64, false),
874        ]));
875
876        let batch = RecordBatch::try_new(
877            schema.clone(),
878            vec![
879                Arc::new(StringArray::from(colors.clone())),
880                Arc::new(UInt64Array::from(row_ids.clone())),
881            ],
882        )
883        .unwrap();
884
885        let stream = stream::once(async move { Ok(batch) });
886        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
887
888        // Train and write the bitmap index
889        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
890            .await
891            .unwrap();
892
893        // Create a cache with limited capacity
894        let cache = LanceCache::with_capacity(1024 * 1024); // 1MB cache
895
896        // Load the index (should only load metadata, not bitmaps)
897        let index = BitmapIndex::load(store.clone(), None, &cache)
898            .await
899            .unwrap();
900
901        assert_eq!(index.index_map.len(), 4); // 4 non-null unique values (red, blue, green, yellow)
902        assert!(index.null_map.is_empty()); // No nulls in test data
903
904        // Test 1: Search for "red"
905        let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
906        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
907
908        // Verify results
909        let expected_red_rows = vec![0u64, 3, 6, 10, 11];
910        if let SearchResult::Exact(row_ids) = result {
911            let mut actual: Vec<u64> = row_ids
912                .true_rows()
913                .row_addrs()
914                .unwrap()
915                .map(|id| id.into())
916                .collect();
917            actual.sort();
918            assert_eq!(actual, expected_red_rows);
919        } else {
920            panic!("Expected exact search result");
921        }
922
923        // Test 2: Search for "red" again - should hit cache
924        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
925        if let SearchResult::Exact(row_ids) = result {
926            let mut actual: Vec<u64> = row_ids
927                .true_rows()
928                .row_addrs()
929                .unwrap()
930                .map(|id| id.into())
931                .collect();
932            actual.sort();
933            assert_eq!(actual, expected_red_rows);
934        }
935
936        // Test 3: Range query
937        let query = SargableQuery::Range(
938            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
939            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
940        );
941        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
942
943        let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
944        if let SearchResult::Exact(row_ids) = result {
945            let mut actual: Vec<u64> = row_ids
946                .true_rows()
947                .row_addrs()
948                .unwrap()
949                .map(|id| id.into())
950                .collect();
951            actual.sort();
952            assert_eq!(actual, expected_range_rows);
953        }
954
955        // Test 4: IsIn query
956        let query = SargableQuery::IsIn(vec![
957            ScalarValue::Utf8(Some("red".to_string())),
958            ScalarValue::Utf8(Some("yellow".to_string())),
959        ]);
960        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
961
962        let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
963        if let SearchResult::Exact(row_ids) = result {
964            let mut actual: Vec<u64> = row_ids
965                .true_rows()
966                .row_addrs()
967                .unwrap()
968                .map(|id| id.into())
969                .collect();
970            actual.sort();
971            assert_eq!(actual, expected_in_rows);
972        }
973    }
974
975    #[tokio::test]
976    #[ignore]
977    async fn test_big_bitmap_index() {
978        // WARNING: This test allocates a huge state to force overflow over int32 on BinaryArray
979        // You must run it only on a machine with enough resources (or skip it normally).
980        use super::{BitmapIndex, BITMAP_LOOKUP_NAME};
981        use crate::scalar::lance_format::LanceIndexStore;
982        use crate::scalar::IndexStore;
983        use arrow_schema::DataType;
984        use datafusion_common::ScalarValue;
985        use lance_core::cache::LanceCache;
986        use lance_core::utils::mask::RowAddrTreeMap;
987        use lance_io::object_store::ObjectStore;
988        use std::collections::HashMap;
989        use std::sync::Arc;
990
991        // Adjust these numbers so that:
992        //     m * (serialized size per bitmap) > 2^31 bytes.
993        //
994        // For example, if we assume each bitmap serializes to ~1000 bytes,
995        // you need m > 2.1e6.
996        let m: u32 = 2_500_000;
997        let per_bitmap_size = 1000; // assumed bytes per bitmap
998
999        let mut state = HashMap::new();
1000        for i in 0..m {
1001            // Create a bitmap that contains, say, 1000 row IDs.
1002            let bitmap = RowAddrTreeMap::from_iter(0..per_bitmap_size);
1003
1004            let key = ScalarValue::UInt32(Some(i));
1005            state.insert(key, bitmap);
1006        }
1007
1008        // Create a temporary store.
1009        let tmpdir = TempObjDir::default();
1010        let test_store = LanceIndexStore::new(
1011            Arc::new(ObjectStore::local()),
1012            tmpdir.clone(),
1013            Arc::new(LanceCache::no_cache()),
1014        );
1015
1016        // This call should never trigger a "byte array offset overflow" error since now the code supports
1017        // read by chunks
1018        let result =
1019            BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;
1020
1021        assert!(
1022            result.is_ok(),
1023            "Failed to write bitmap index: {:?}",
1024            result.err()
1025        );
1026
1027        // Verify the index file exists
1028        let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
1029        assert!(
1030            index_file.is_ok(),
1031            "Failed to open index file: {:?}",
1032            index_file.err()
1033        );
1034        let index_file = index_file.unwrap();
1035
1036        // Print stats about the index file
1037        tracing::info!(
1038            "Index file contains {} rows in total",
1039            index_file.num_rows()
1040        );
1041
1042        // Load the index using BitmapIndex::load
1043        tracing::info!("Loading index from disk...");
1044        let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
1045            .await
1046            .expect("Failed to load bitmap index");
1047
1048        // Verify the loaded index has the correct number of entries
1049        assert_eq!(
1050            loaded_index.index_map.len(),
1051            m as usize,
1052            "Loaded index has incorrect number of keys (expected {}, got {})",
1053            m,
1054            loaded_index.index_map.len()
1055        );
1056
1057        // Manually verify specific keys without using search()
1058        let test_keys = [0, m / 2, m - 1]; // Beginning, middle, and end
1059        for &key_val in &test_keys {
1060            let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
1061            // Load the bitmap for this key
1062            let bitmap = loaded_index
1063                .load_bitmap(&key, None)
1064                .await
1065                .unwrap_or_else(|_| panic!("Key {} should exist", key_val));
1066
1067            // Convert RowAddrTreeMap to a vector for easier assertion
1068            let row_addrs: Vec<u64> = bitmap.row_addrs().unwrap().map(u64::from).collect();
1069
1070            // Verify length
1071            assert_eq!(
1072                row_addrs.len(),
1073                per_bitmap_size as usize,
1074                "Bitmap for key {} has wrong size",
1075                key_val
1076            );
1077
1078            // Verify first few and last few elements
1079            for i in 0..5.min(per_bitmap_size) {
1080                assert!(
1081                    row_addrs.contains(&i),
1082                    "Bitmap for key {} should contain row_id {}",
1083                    key_val,
1084                    i
1085                );
1086            }
1087
1088            for i in (per_bitmap_size - 5)..per_bitmap_size {
1089                assert!(
1090                    row_addrs.contains(&i),
1091                    "Bitmap for key {} should contain row_id {}",
1092                    key_val,
1093                    i
1094                );
1095            }
1096
1097            // Verify exact range
1098            let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
1099            assert_eq!(
1100                row_addrs, expected_range,
1101                "Bitmap for key {} doesn't contain expected values",
1102                key_val
1103            );
1104
1105            tracing::info!(
1106                "✓ Verified bitmap for key {}: {} rows as expected",
1107                key_val,
1108                row_addrs.len()
1109            );
1110        }
1111
1112        tracing::info!("Test successful! Index properly contains {} keys", m);
1113    }
1114
1115    #[tokio::test]
1116    async fn test_bitmap_prewarm() {
1117        // Create a temporary directory for the index
1118        let tmpdir = TempObjDir::default();
1119        let store = Arc::new(LanceIndexStore::new(
1120            Arc::new(ObjectStore::local()),
1121            tmpdir.clone(),
1122            Arc::new(LanceCache::no_cache()),
1123        ));
1124
1125        // Create test data with low cardinality
1126        let colors = vec![
1127            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1128            "red", "red", "blue", "green", "yellow",
1129        ];
1130
1131        let row_ids = (0u64..15u64).collect::<Vec<_>>();
1132
1133        let schema = Arc::new(Schema::new(vec![
1134            Field::new("value", DataType::Utf8, false),
1135            Field::new("_rowid", DataType::UInt64, false),
1136        ]));
1137
1138        let batch = RecordBatch::try_new(
1139            schema.clone(),
1140            vec![
1141                Arc::new(StringArray::from(colors.clone())),
1142                Arc::new(UInt64Array::from(row_ids.clone())),
1143            ],
1144        )
1145        .unwrap();
1146
1147        let stream = stream::once(async move { Ok(batch) });
1148        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1149
1150        // Train and write the bitmap index
1151        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1152            .await
1153            .unwrap();
1154
1155        // Create a cache with metrics tracking
1156        let cache = LanceCache::with_capacity(1024 * 1024); // 1MB cache
1157
1158        // Load the index (should only load metadata, not bitmaps)
1159        let index = BitmapIndex::load(store.clone(), None, &cache)
1160            .await
1161            .unwrap();
1162
1163        // Verify no bitmaps are cached yet
1164        let cache_key_red = BitmapKey {
1165            value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
1166        };
1167        let cache_key_blue = BitmapKey {
1168            value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
1169        };
1170
1171        assert!(cache
1172            .get_with_key::<BitmapKey>(&cache_key_red)
1173            .await
1174            .is_none());
1175        assert!(cache
1176            .get_with_key::<BitmapKey>(&cache_key_blue)
1177            .await
1178            .is_none());
1179
1180        // Call prewarm
1181        index.prewarm().await.unwrap();
1182
1183        // Verify all bitmaps are now cached
1184        assert!(cache
1185            .get_with_key::<BitmapKey>(&cache_key_red)
1186            .await
1187            .is_some());
1188        assert!(cache
1189            .get_with_key::<BitmapKey>(&cache_key_blue)
1190            .await
1191            .is_some());
1192
1193        // Verify cached bitmaps have correct content
1194        let cached_red = cache
1195            .get_with_key::<BitmapKey>(&cache_key_red)
1196            .await
1197            .unwrap();
1198        let red_rows: Vec<u64> = cached_red.row_addrs().unwrap().map(u64::from).collect();
1199        assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);
1200
1201        // Call prewarm again - should be idempotent
1202        index.prewarm().await.unwrap();
1203
1204        // Verify cache still contains the same items
1205        let cached_red_2 = cache
1206            .get_with_key::<BitmapKey>(&cache_key_red)
1207            .await
1208            .unwrap();
1209        let red_rows_2: Vec<u64> = cached_red_2.row_addrs().unwrap().map(u64::from).collect();
1210        assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
1211    }
1212
1213    #[tokio::test]
1214    async fn test_remap_bitmap_with_null() {
1215        use arrow_array::UInt32Array;
1216
1217        // Create a temporary store.
1218        let tmpdir = TempObjDir::default();
1219        let test_store = Arc::new(LanceIndexStore::new(
1220            Arc::new(ObjectStore::local()),
1221            tmpdir.clone(),
1222            Arc::new(LanceCache::no_cache()),
1223        ));
1224
1225        // Create test data that simulates:
1226        // frag 1 - { 0: null, 1: null, 2: 1 }
1227        // frag 2 - { 0: 1, 1: 2, 2: 2 }
1228        // We'll create this data with specific row addresses
1229        let values = vec![
1230            None,       // row 0: null (will be at address (1,0))
1231            None,       // row 1: null (will be at address (1,1))
1232            Some(1u32), // row 2: 1    (will be at address (1,2))
1233            Some(1u32), // row 3: 1    (will be at address (2,0))
1234            Some(2u32), // row 4: 2    (will be at address (2,1))
1235            Some(2u32), // row 5: 2    (will be at address (2,2))
1236        ];
1237
1238        // Create row IDs with specific fragment addresses
1239        let row_ids: Vec<u64> = vec![
1240            RowAddress::new_from_parts(1, 0).into(),
1241            RowAddress::new_from_parts(1, 1).into(),
1242            RowAddress::new_from_parts(1, 2).into(),
1243            RowAddress::new_from_parts(2, 0).into(),
1244            RowAddress::new_from_parts(2, 1).into(),
1245            RowAddress::new_from_parts(2, 2).into(),
1246        ];
1247
1248        let schema = Arc::new(Schema::new(vec![
1249            Field::new("value", DataType::UInt32, true),
1250            Field::new("_rowid", DataType::UInt64, false),
1251        ]));
1252
1253        let batch = RecordBatch::try_new(
1254            schema.clone(),
1255            vec![
1256                Arc::new(UInt32Array::from(values)),
1257                Arc::new(UInt64Array::from(row_ids)),
1258            ],
1259        )
1260        .unwrap();
1261
1262        let stream = stream::once(async move { Ok(batch) });
1263        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1264
1265        // Create the bitmap index
1266        BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
1267            .await
1268            .unwrap();
1269
1270        // Load the index
1271        let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1272            .await
1273            .expect("Failed to load bitmap index");
1274
1275        // Verify initial state
1276        assert_eq!(index.index_map.len(), 2); // 2 non-null values (1 and 2)
1277        assert!(!index.null_map.is_empty()); // Should have null values
1278
1279        // Create a remap that simulates compaction of frags 1 and 2 into frag 3
1280        let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
1281        row_addr_map.insert(
1282            RowAddress::new_from_parts(1, 0).into(),
1283            Some(RowAddress::new_from_parts(3, 0).into()),
1284        );
1285        row_addr_map.insert(
1286            RowAddress::new_from_parts(1, 1).into(),
1287            Some(RowAddress::new_from_parts(3, 1).into()),
1288        );
1289        row_addr_map.insert(
1290            RowAddress::new_from_parts(1, 2).into(),
1291            Some(RowAddress::new_from_parts(3, 2).into()),
1292        );
1293        row_addr_map.insert(
1294            RowAddress::new_from_parts(2, 0).into(),
1295            Some(RowAddress::new_from_parts(3, 3).into()),
1296        );
1297        row_addr_map.insert(
1298            RowAddress::new_from_parts(2, 1).into(),
1299            Some(RowAddress::new_from_parts(3, 4).into()),
1300        );
1301        row_addr_map.insert(
1302            RowAddress::new_from_parts(2, 2).into(),
1303            Some(RowAddress::new_from_parts(3, 5).into()),
1304        );
1305
1306        // Perform remap
1307        index
1308            .remap(&row_addr_map, test_store.as_ref())
1309            .await
1310            .unwrap();
1311
1312        // Reload and check
1313        let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
1314            .await
1315            .expect("Failed to load remapped bitmap index");
1316
1317        // Verify the null bitmap was remapped correctly
1318        let expected_null_addrs: Vec<u64> = vec![
1319            RowAddress::new_from_parts(3, 0).into(),
1320            RowAddress::new_from_parts(3, 1).into(),
1321        ];
1322        let actual_null_addrs: Vec<u64> = reloaded_idx
1323            .null_map
1324            .row_addrs()
1325            .unwrap()
1326            .map(u64::from)
1327            .collect();
1328        assert_eq!(
1329            actual_null_addrs, expected_null_addrs,
1330            "Null bitmap not remapped correctly"
1331        );
1332
1333        // Search for value 1 and verify remapped addresses
1334        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
1335        let result = reloaded_idx
1336            .search(&query, &NoOpMetricsCollector)
1337            .await
1338            .unwrap();
1339        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1340            let mut actual: Vec<u64> = row_ids
1341                .true_rows()
1342                .row_addrs()
1343                .unwrap()
1344                .map(u64::from)
1345                .collect();
1346            actual.sort();
1347            let expected: Vec<u64> = vec![
1348                RowAddress::new_from_parts(3, 2).into(),
1349                RowAddress::new_from_parts(3, 3).into(),
1350            ];
1351            assert_eq!(actual, expected, "Value 1 bitmap not remapped correctly");
1352        }
1353
1354        // Search for value 2 and verify remapped addresses
1355        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
1356        let result = reloaded_idx
1357            .search(&query, &NoOpMetricsCollector)
1358            .await
1359            .unwrap();
1360        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1361            let mut actual: Vec<u64> = row_ids
1362                .true_rows()
1363                .row_addrs()
1364                .unwrap()
1365                .map(u64::from)
1366                .collect();
1367            actual.sort();
1368            let expected: Vec<u64> = vec![
1369                RowAddress::new_from_parts(3, 4).into(),
1370                RowAddress::new_from_parts(3, 5).into(),
1371            ];
1372            assert_eq!(actual, expected, "Value 2 bitmap not remapped correctly");
1373        }
1374
1375        // Search for null values
1376        let query = SargableQuery::IsNull();
1377        let result = reloaded_idx
1378            .search(&query, &NoOpMetricsCollector)
1379            .await
1380            .unwrap();
1381        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1382            let mut actual: Vec<u64> = row_ids
1383                .true_rows()
1384                .row_addrs()
1385                .unwrap()
1386                .map(u64::from)
1387                .collect();
1388            actual.sort();
1389            assert_eq!(
1390                actual, expected_null_addrs,
1391                "Null search results not correct"
1392            );
1393        }
1394    }
1395
1396    #[tokio::test]
1397    async fn test_bitmap_null_handling_in_queries() {
1398        // Test that bitmap index correctly returns null_list for queries
1399        let tmpdir = TempObjDir::default();
1400        let store = Arc::new(LanceIndexStore::new(
1401            Arc::new(ObjectStore::local()),
1402            tmpdir.clone(),
1403            Arc::new(LanceCache::no_cache()),
1404        ));
1405
1406        // Create test data: [0, 5, null]
1407        let batch = record_batch!(
1408            ("value", Int64, [Some(0), Some(5), None]),
1409            ("_rowid", UInt64, [0, 1, 2])
1410        )
1411        .unwrap();
1412        let schema = batch.schema();
1413        let stream = stream::once(async move { Ok(batch) });
1414        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1415
1416        // Train and write the bitmap index
1417        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1418            .await
1419            .unwrap();
1420
1421        let cache = LanceCache::with_capacity(1024 * 1024);
1422        let index = BitmapIndex::load(store.clone(), None, &cache)
1423            .await
1424            .unwrap();
1425
1426        // Test 1: Search for value 5 - should return allow=[1], null=[2]
1427        let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
1428        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1429
1430        match result {
1431            SearchResult::Exact(row_ids) => {
1432                let actual_rows: Vec<u64> = row_ids
1433                    .true_rows()
1434                    .row_addrs()
1435                    .unwrap()
1436                    .map(u64::from)
1437                    .collect();
1438                assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 5");
1439
1440                let null_row_ids = row_ids.null_rows();
1441                // Check that null_row_ids contains row 2
1442                assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
1443                let null_rows: Vec<u64> =
1444                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
1445                assert_eq!(null_rows, vec![2], "Should report row 2 as null");
1446            }
1447            _ => panic!("Expected Exact search result"),
1448        }
1449
1450        // Test 2: Search for null values - should return allow=[2], null=None
1451        let query = SargableQuery::IsNull();
1452        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1453
1454        match result {
1455            SearchResult::Exact(row_addrs) => {
1456                let actual_rows: Vec<u64> = row_addrs
1457                    .true_rows()
1458                    .row_addrs()
1459                    .unwrap()
1460                    .map(u64::from)
1461                    .collect();
1462                assert_eq!(
1463                    actual_rows,
1464                    vec![2],
1465                    "IsNull should find row 2 where value is null"
1466                );
1467
1468                let null_row_ids = row_addrs.null_rows();
1469                // When querying FOR nulls, null_row_ids should be None (nulls are the TRUE result)
1470                assert!(
1471                    null_row_ids.is_empty(),
1472                    "null_row_ids should be None for IsNull query"
1473                );
1474            }
1475            _ => panic!("Expected Exact search result"),
1476        }
1477
1478        // Test 3: Range query - should return matching rows and null_list
1479        let query = SargableQuery::Range(
1480            std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
1481            std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
1482        );
1483        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1484
1485        match result {
1486            SearchResult::Exact(row_addrs) => {
1487                let actual_rows: Vec<u64> = row_addrs
1488                    .true_rows()
1489                    .row_addrs()
1490                    .unwrap()
1491                    .map(u64::from)
1492                    .collect();
1493                assert_eq!(actual_rows, vec![0], "Should find row 0 where value == 0");
1494
1495                // Should report row 2 as null
1496                let null_row_ids = row_addrs.null_rows();
1497                assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
1498                let null_rows: Vec<u64> =
1499                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
1500                assert_eq!(null_rows, vec![2], "Should report row 2 as null");
1501            }
1502            _ => panic!("Expected Exact search result"),
1503        }
1504    }
1505}