Skip to main content

lance_index/scalar/
bitmap.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{BTreeMap, HashMap},
7    fmt::Debug,
8    ops::Bound,
9    sync::Arc,
10};
11
12use arrow::array::BinaryBuilder;
13use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array, new_null_array};
14use arrow_schema::{DataType, Field, Schema};
15use async_trait::async_trait;
16use bytes::Bytes;
17use datafusion::physical_plan::SendableRecordBatchStream;
18use datafusion_common::ScalarValue;
19use deepsize::DeepSizeOf;
20use futures::{StreamExt, TryStreamExt, stream};
21use lance_core::utils::mask::RowSetOps;
22use lance_core::{
23    Error, ROW_ID, Result,
24    cache::{CacheKey, LanceCache, WeakLanceCache},
25    error::LanceOptionExt,
26    utils::{
27        mask::{NullableRowAddrSet, RowAddrTreeMap},
28        tokio::get_num_compute_intensive_cpus,
29    },
30};
31use roaring::RoaringBitmap;
32use serde::Serialize;
33use tracing::instrument;
34
35use super::{AnyQuery, IndexStore, ScalarIndex};
36use super::{
37    BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult, btree::OrderableScalarValue,
38};
39use crate::pbold;
40use crate::{Index, IndexType, metrics::MetricsCollector};
41use crate::{
42    frag_reuse::FragReuseIndex,
43    scalar::{
44        CreatedIndex, UpdateCriteria,
45        expression::SargableQueryParser,
46        registry::{
47            DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering,
48            TrainingRequest, VALUE_COLUMN_NAME,
49        },
50    },
51};
52use crate::{scalar::IndexReader, scalar::expression::ScalarQueryParser};
53
/// Name of the index file that stores one `(key, serialized bitmap)` row per value.
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
/// File-metadata key under which the JSON-encoded index statistics are written.
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";

/// Cap on the serialized bitmap bytes buffered into a single record batch:
/// `i32::MAX` minus 1 MiB of headroom.
const MAX_BITMAP_ARRAY_LENGTH: usize = (i32::MAX as usize) - (1 << 20);

/// Number of lookup-file rows requested per read when scanning the index.
const MAX_ROWS_PER_CHUNK: usize = 2048;

/// Version number reported for indexes produced by this module.
const BITMAP_INDEX_VERSION: u32 = 0;
62
63// We only need to open a file reader if we need to load a bitmap. If all
64// bitmaps are cached we don't open it. If we do open it we should only open it once.
65#[derive(Clone)]
66struct LazyIndexReader {
67    index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
68    store: Arc<dyn IndexStore>,
69}
70
71impl std::fmt::Debug for LazyIndexReader {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("LazyIndexReader")
74            .field("store", &self.store)
75            .finish()
76    }
77}
78
79impl LazyIndexReader {
80    fn new(store: Arc<dyn IndexStore>) -> Self {
81        Self {
82            index_reader: Arc::new(tokio::sync::Mutex::new(None)),
83            store,
84        }
85    }
86
87    async fn get(&self) -> Result<Arc<dyn IndexReader>> {
88        let mut reader = self.index_reader.lock().await;
89        if reader.is_none() {
90            let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
91            *reader = Some(index_reader);
92        }
93        Ok(reader.as_ref().unwrap().clone())
94    }
95}
96
97/// A scalar index that stores a bitmap for each possible value
98///
99/// This index works best for low-cardinality columns, where the number of unique values is small.
100/// The bitmap stores a list of row ids where the value is present.
101#[derive(Clone, Debug)]
102pub struct BitmapIndex {
103    /// Maps each unique value to its bitmap location in the index file
104    /// The usize value is the row offset in the bitmap_page_lookup.lance file
105    /// for quickly locating the row and reading it out
106    index_map: BTreeMap<OrderableScalarValue, usize>,
107
108    null_map: Arc<RowAddrTreeMap>,
109
110    value_type: DataType,
111
112    store: Arc<dyn IndexStore>,
113
114    index_cache: WeakLanceCache,
115
116    frag_reuse_index: Option<Arc<FragReuseIndex>>,
117
118    lazy_reader: LazyIndexReader,
119}
120
121#[derive(Debug, Clone)]
122pub struct BitmapKey {
123    value: OrderableScalarValue,
124}
125
126impl CacheKey for BitmapKey {
127    type ValueType = RowAddrTreeMap;
128
129    fn key(&self) -> std::borrow::Cow<'_, str> {
130        format!("{}", self.value.0).into()
131    }
132
133    fn type_name() -> &'static str {
134        "Bitmap"
135    }
136}
137
138impl BitmapIndex {
139    fn new(
140        index_map: BTreeMap<OrderableScalarValue, usize>,
141        null_map: Arc<RowAddrTreeMap>,
142        value_type: DataType,
143        store: Arc<dyn IndexStore>,
144        index_cache: WeakLanceCache,
145        frag_reuse_index: Option<Arc<FragReuseIndex>>,
146    ) -> Self {
147        let lazy_reader = LazyIndexReader::new(store.clone());
148        Self {
149            index_map,
150            null_map,
151            value_type,
152            store,
153            index_cache,
154            frag_reuse_index,
155            lazy_reader,
156        }
157    }
158
159    pub(crate) async fn load(
160        store: Arc<dyn IndexStore>,
161        frag_reuse_index: Option<Arc<FragReuseIndex>>,
162        index_cache: &LanceCache,
163    ) -> Result<Arc<Self>> {
164        let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
165        let total_rows = page_lookup_file.num_rows();
166
167        if total_rows == 0 {
168            let schema = page_lookup_file.schema();
169            let data_type = schema.fields[0].data_type();
170            return Ok(Arc::new(Self::new(
171                BTreeMap::new(),
172                Arc::new(RowAddrTreeMap::default()),
173                data_type,
174                store,
175                WeakLanceCache::from(index_cache),
176                frag_reuse_index,
177            )));
178        }
179
180        let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
181        let mut null_map = Arc::new(RowAddrTreeMap::default());
182        let mut value_type: Option<DataType> = None;
183        let mut null_location: Option<usize> = None;
184        let mut row_offset = 0;
185
186        for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
187            let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
188            let chunk = page_lookup_file
189                .read_range(start_row..end_row, Some(&["keys"]))
190                .await?;
191
192            if chunk.num_rows() == 0 {
193                continue;
194            }
195
196            if value_type.is_none() {
197                value_type = Some(chunk.schema().field(0).data_type().clone());
198            }
199
200            let dict_keys = chunk.column(0);
201
202            for idx in 0..chunk.num_rows() {
203                let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
204
205                if key.0.is_null() {
206                    null_location = Some(row_offset);
207                } else {
208                    index_map.insert(key, row_offset);
209                }
210
211                row_offset += 1;
212            }
213        }
214
215        if let Some(null_loc) = null_location {
216            let batch = page_lookup_file
217                .read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
218                .await?;
219
220            let binary_bitmaps = batch
221                .column(0)
222                .as_any()
223                .downcast_ref::<BinaryArray>()
224                .ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
225            let bitmap_bytes = binary_bitmaps.value(0);
226            let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
227
228            // Apply fragment remapping if needed
229            if let Some(fri) = &frag_reuse_index {
230                bitmap = fri.remap_row_addrs_tree_map(&bitmap);
231            }
232
233            null_map = Arc::new(bitmap);
234        }
235
236        let final_value_type = value_type.expect_ok()?;
237
238        Ok(Arc::new(Self::new(
239            index_map,
240            null_map,
241            final_value_type,
242            store,
243            WeakLanceCache::from(index_cache),
244            frag_reuse_index,
245        )))
246    }
247
248    async fn load_bitmap(
249        &self,
250        key: &OrderableScalarValue,
251        metrics: Option<&dyn MetricsCollector>,
252    ) -> Result<Arc<RowAddrTreeMap>> {
253        if key.0.is_null() {
254            return Ok(self.null_map.clone());
255        }
256
257        let cache_key = BitmapKey { value: key.clone() };
258
259        if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
260            return Ok(cached);
261        }
262
263        // Record that we're loading a partition from disk
264        if let Some(metrics) = metrics {
265            metrics.record_part_load();
266        }
267
268        let row_offset = match self.index_map.get(key) {
269            Some(loc) => *loc,
270            None => return Ok(Arc::new(RowAddrTreeMap::default())),
271        };
272
273        let page_lookup_file = self.lazy_reader.get().await?;
274        let batch = page_lookup_file
275            .read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
276            .await?;
277
278        let binary_bitmaps = batch
279            .column(0)
280            .as_any()
281            .downcast_ref::<BinaryArray>()
282            .ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
283        let bitmap_bytes = binary_bitmaps.value(0); // First (and only) row
284        let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
285
286        if let Some(fri) = &self.frag_reuse_index {
287            bitmap = fri.remap_row_addrs_tree_map(&bitmap);
288        }
289
290        self.index_cache
291            .insert_with_key(&cache_key, Arc::new(bitmap.clone()))
292            .await;
293
294        Ok(Arc::new(bitmap))
295    }
296
297    pub(crate) fn value_type(&self) -> &DataType {
298        &self.value_type
299    }
300
301    /// Loads the current bitmap index into an in-memory value-to-row-id map.
302    pub(crate) async fn load_bitmap_index_state(
303        &self,
304    ) -> Result<HashMap<ScalarValue, RowAddrTreeMap>> {
305        let mut state = HashMap::new();
306
307        for key in self.index_map.keys() {
308            let bitmap = self.load_bitmap(key, None).await?;
309            state.insert(key.0.clone(), (*bitmap).clone());
310        }
311
312        if !self.null_map.is_empty() {
313            let existing_null = new_null_array(&self.value_type, 1);
314            let existing_null = ScalarValue::try_from_array(existing_null.as_ref(), 0)?;
315            state.insert(existing_null, (*self.null_map).clone());
316        }
317
318        Ok(state)
319    }
320}
321
322impl DeepSizeOf for BitmapIndex {
323    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
324        let mut total_size = 0;
325
326        total_size += self.index_map.deep_size_of_children(context);
327        total_size += self.store.deep_size_of_children(context);
328
329        total_size
330    }
331}
332
333#[derive(Serialize)]
334struct BitmapStatistics {
335    num_bitmaps: usize,
336}
337
338#[async_trait]
339impl Index for BitmapIndex {
340    fn as_any(&self) -> &dyn Any {
341        self
342    }
343
344    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
345        self
346    }
347
348    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
349        Err(Error::not_supported_source(
350            "BitmapIndex is not a vector index".into(),
351        ))
352    }
353
354    async fn prewarm(&self) -> Result<()> {
355        let page_lookup_file = self.lazy_reader.get().await?;
356        let total_rows = page_lookup_file.num_rows();
357
358        if total_rows == 0 {
359            return Ok(());
360        }
361
362        for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
363            let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
364            let chunk = page_lookup_file
365                .read_range(start_row..end_row, None)
366                .await?;
367
368            if chunk.num_rows() == 0 {
369                continue;
370            }
371
372            let dict_keys = chunk.column(0);
373            let binary_bitmaps = chunk.column(1);
374            let bitmap_binary_array = binary_bitmaps
375                .as_any()
376                .downcast_ref::<BinaryArray>()
377                .unwrap();
378
379            for idx in 0..chunk.num_rows() {
380                let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
381
382                if key.0.is_null() {
383                    continue;
384                }
385
386                let bitmap_bytes = bitmap_binary_array.value(idx);
387                let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
388
389                if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
390                    bitmap = frag_reuse_index_ref.remap_row_addrs_tree_map(&bitmap);
391                }
392
393                let cache_key = BitmapKey { value: key };
394                self.index_cache
395                    .insert_with_key(&cache_key, Arc::new(bitmap))
396                    .await;
397            }
398        }
399
400        Ok(())
401    }
402
403    fn index_type(&self) -> IndexType {
404        IndexType::Bitmap
405    }
406
407    fn statistics(&self) -> Result<serde_json::Value> {
408        let stats = BitmapStatistics {
409            num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
410        };
411        serde_json::to_value(stats).map_err(|e| {
412            Error::internal(format!(
413                "failed to serialize bitmap index statistics: {}",
414                e
415            ))
416        })
417    }
418
419    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
420        unimplemented!()
421    }
422}
423
424#[async_trait]
425impl ScalarIndex for BitmapIndex {
426    #[instrument(name = "bitmap_search", level = "debug", skip_all)]
427    async fn search(
428        &self,
429        query: &dyn AnyQuery,
430        metrics: &dyn MetricsCollector,
431    ) -> Result<SearchResult> {
432        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
433
434        let (row_ids, null_row_ids) = match query {
435            SargableQuery::Equals(val) => {
436                metrics.record_comparisons(1);
437                if val.is_null() {
438                    // Querying FOR nulls - they are the TRUE result, not NULL result
439                    ((*self.null_map).clone(), None)
440                } else {
441                    let key = OrderableScalarValue(val.clone());
442                    let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
443                    let null_rows = if !self.null_map.is_empty() {
444                        Some((*self.null_map).clone())
445                    } else {
446                        None
447                    };
448                    ((*bitmap).clone(), null_rows)
449                }
450            }
451            SargableQuery::Range(start, end) => {
452                let range_start = match start {
453                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
454                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
455                    Bound::Unbounded => Bound::Unbounded,
456                };
457
458                let range_end = match end {
459                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
460                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
461                    Bound::Unbounded => Bound::Unbounded,
462                };
463
464                // Empty range if lower > upper, or if any bound is excluded and lower >= upper.
465                let empty_range = match (&range_start, &range_end) {
466                    (Bound::Included(lower), Bound::Included(upper)) => lower > upper,
467                    (Bound::Included(lower), Bound::Excluded(upper))
468                    | (Bound::Excluded(lower), Bound::Included(upper))
469                    | (Bound::Excluded(lower), Bound::Excluded(upper)) => lower >= upper,
470                    _ => false,
471                };
472
473                let keys: Vec<_> = if empty_range {
474                    Vec::new()
475                } else {
476                    self.index_map
477                        .range((range_start, range_end))
478                        .map(|(k, _v)| k.clone())
479                        .collect()
480                };
481
482                metrics.record_comparisons(keys.len());
483
484                let result = if keys.is_empty() {
485                    RowAddrTreeMap::default()
486                } else {
487                    let bitmaps: Vec<_> = stream::iter(
488                        keys.into_iter()
489                            .map(|key| async move { self.load_bitmap(&key, None).await }),
490                    )
491                    .buffer_unordered(get_num_compute_intensive_cpus())
492                    .try_collect()
493                    .await?;
494
495                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
496                    RowAddrTreeMap::union_all(&bitmap_refs)
497                };
498
499                let null_rows = if !self.null_map.is_empty() {
500                    Some((*self.null_map).clone())
501                } else {
502                    None
503                };
504                (result, null_rows)
505            }
506            SargableQuery::IsIn(values) => {
507                metrics.record_comparisons(values.len());
508
509                // Collect keys that exist in the index, tracking if we need nulls
510                let mut has_null = false;
511                let keys: Vec<_> = values
512                    .iter()
513                    .filter_map(|val| {
514                        if val.is_null() {
515                            has_null = true;
516                            None
517                        } else {
518                            let key = OrderableScalarValue(val.clone());
519                            if self.index_map.contains_key(&key) {
520                                Some(key)
521                            } else {
522                                None
523                            }
524                        }
525                    })
526                    .collect();
527
528                // Load bitmaps in parallel
529                let mut bitmaps: Vec<_> = stream::iter(
530                    keys.into_iter()
531                        .map(|key| async move { self.load_bitmap(&key, None).await }),
532                )
533                .buffer_unordered(get_num_compute_intensive_cpus())
534                .try_collect()
535                .await?;
536
537                // Add null bitmap if needed
538                if has_null && !self.null_map.is_empty() {
539                    bitmaps.push(self.null_map.clone());
540                }
541
542                let result = if bitmaps.is_empty() {
543                    RowAddrTreeMap::default()
544                } else {
545                    // Convert Arc<RowAddrTreeMap> to &RowAddrTreeMap for union_all
546                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
547                    RowAddrTreeMap::union_all(&bitmap_refs)
548                };
549
550                // If the query explicitly includes null, then nulls are TRUE (not NULL)
551                // Otherwise, nulls remain NULL (unknown)
552                let null_rows = if !has_null && !self.null_map.is_empty() {
553                    Some((*self.null_map).clone())
554                } else {
555                    None
556                };
557                (result, null_rows)
558            }
559            SargableQuery::IsNull() => {
560                metrics.record_comparisons(1);
561                // Querying FOR nulls - they are the TRUE result, not NULL result
562                ((*self.null_map).clone(), None)
563            }
564            SargableQuery::FullTextSearch(_) => {
565                return Err(Error::not_supported_source(
566                    "full text search is not supported for bitmap indexes".into(),
567                ));
568            }
569            SargableQuery::LikePrefix(_) => {
570                return Err(Error::not_supported_source(
571                    "LIKE prefix queries are not supported for bitmap indexes".into(),
572                ));
573            }
574        };
575
576        let selection = NullableRowAddrSet::new(row_ids, null_row_ids.unwrap_or_default());
577        Ok(SearchResult::Exact(selection))
578    }
579
580    fn can_remap(&self) -> bool {
581        true
582    }
583
584    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
585    async fn remap(
586        &self,
587        mapping: &HashMap<u64, Option<u64>>,
588        dest_store: &dyn IndexStore,
589    ) -> Result<CreatedIndex> {
590        let state = self.load_bitmap_index_state().await?;
591        let remapped_state = BitmapIndexPlugin::remap_bitmap_state(state, mapping);
592        BitmapIndexPlugin::write_bitmap_index(remapped_state, dest_store, &self.value_type).await?;
593
594        Ok(CreatedIndex {
595            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
596                .unwrap(),
597            index_version: BITMAP_INDEX_VERSION,
598            files: Some(dest_store.list_files_with_sizes().await?),
599        })
600    }
601
602    /// Add the new data into the index, creating an updated version of the index in `dest_store`
603    async fn update(
604        &self,
605        new_data: SendableRecordBatchStream,
606        dest_store: &dyn IndexStore,
607        _old_data_filter: Option<super::OldIndexDataFilter>,
608    ) -> Result<CreatedIndex> {
609        BitmapIndexPlugin::streaming_build_and_write(new_data, Some(self), dest_store).await?;
610
611        Ok(CreatedIndex {
612            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
613                .unwrap(),
614            index_version: BITMAP_INDEX_VERSION,
615            files: Some(dest_store.list_files_with_sizes().await?),
616        })
617    }
618
619    fn update_criteria(&self) -> UpdateCriteria {
620        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::Values).with_row_id())
621    }
622
623    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
624        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
625    }
626}
627
628/// Buffers serialized (key, bitmap) pairs and flushes them as record batches
629/// to the index file, respecting the MAX_BITMAP_ARRAY_LENGTH limit.
630struct BitmapBatchWriter {
631    file: Box<dyn super::IndexWriter>,
632    keys: Vec<ScalarValue>,
633    serialized: Vec<Vec<u8>>,
634    bytes: usize,
635    num_bitmaps: usize,
636}
637
638impl BitmapBatchWriter {
639    fn new(file: Box<dyn super::IndexWriter>) -> Self {
640        Self {
641            file,
642            keys: Vec::new(),
643            serialized: Vec::new(),
644            bytes: 0,
645            num_bitmaps: 0,
646        }
647    }
648
649    /// Serialize and buffer a single (key, bitmap) pair, flushing the current
650    /// batch to disk if adding it would exceed MAX_BITMAP_ARRAY_LENGTH.
651    async fn emit(&mut self, key: ScalarValue, bitmap: &RowAddrTreeMap) -> Result<()> {
652        let mut buf = Vec::new();
653        bitmap.serialize_into(&mut buf).unwrap();
654        let size = buf.len();
655
656        if self.bytes + size > MAX_BITMAP_ARRAY_LENGTH {
657            self.flush().await?;
658        }
659
660        self.keys.push(key);
661        self.serialized.push(buf);
662        self.bytes += size;
663        self.num_bitmaps += 1;
664        Ok(())
665    }
666
667    /// Write the current batch to disk.
668    async fn flush(&mut self) -> Result<()> {
669        if self.keys.is_empty() {
670            return Ok(());
671        }
672        let keys_array =
673            ScalarValue::iter_to_array(self.keys.drain(..).collect::<Vec<_>>().into_iter())
674                .unwrap();
675        let total_size: usize = self.serialized.iter().map(|b| b.len()).sum();
676        let mut binary_builder = BinaryBuilder::with_capacity(self.serialized.len(), total_size);
677        for b in self.serialized.drain(..) {
678            binary_builder.append_value(&b);
679        }
680        let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
681        let batch = BitmapIndexPlugin::get_batch_from_arrays(keys_array, bitmaps_array)?;
682        self.file.write_record_batch(batch).await?;
683        self.bytes = 0;
684        Ok(())
685    }
686
687    /// Flush any remaining data, write index statistics, and finalize the file.
688    async fn finish(mut self) -> Result<()> {
689        self.flush().await?;
690        let stats_json = serde_json::to_string(&BitmapStatistics {
691            num_bitmaps: self.num_bitmaps,
692        })
693        .map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
694        let mut metadata = HashMap::new();
695        metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
696        self.file.finish_with_metadata(metadata).await?;
697        Ok(())
698    }
699}
700
/// Stateless entry point for building, writing and training bitmap indexes.
#[derive(Default, Debug)]
pub struct BitmapIndexPlugin;
703
704impl BitmapIndexPlugin {
705    fn get_batch_from_arrays(
706        keys: Arc<dyn Array>,
707        binary_bitmaps: Arc<dyn Array>,
708    ) -> Result<RecordBatch> {
709        let schema = Arc::new(Schema::new(vec![
710            Field::new("keys", keys.data_type().clone(), true),
711            Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
712        ]));
713
714        let columns = vec![keys, binary_bitmaps];
715
716        Ok(RecordBatch::try_new(schema, columns)?)
717    }
718
719    async fn write_bitmap_index(
720        state: HashMap<ScalarValue, RowAddrTreeMap>,
721        index_store: &dyn IndexStore,
722        value_type: &DataType,
723    ) -> Result<()> {
724        Self::write_bitmap_index_with_extras(
725            state,
726            index_store,
727            value_type,
728            HashMap::new(),
729            Vec::new(),
730        )
731        .await
732    }
733
734    /// Writes a bitmap index and attaches extra metadata and global buffers.
735    pub(crate) async fn write_bitmap_index_with_extras(
736        state: HashMap<ScalarValue, RowAddrTreeMap>,
737        index_store: &dyn IndexStore,
738        value_type: &DataType,
739        mut metadata: HashMap<String, String>,
740        global_buffers: Vec<(String, Bytes)>,
741    ) -> Result<()> {
742        let num_bitmaps = state.len();
743        let schema = Arc::new(Schema::new(vec![
744            Field::new("keys", value_type.clone(), true),
745            Field::new("bitmaps", DataType::Binary, true),
746        ]));
747
748        let mut bitmap_index_file = index_store
749            .new_index_file(BITMAP_LOOKUP_NAME, schema)
750            .await?;
751
752        for (metadata_key, data) in global_buffers {
753            let buffer_idx = bitmap_index_file.add_global_buffer(data).await?;
754            metadata.insert(metadata_key, buffer_idx.to_string());
755        }
756
757        let mut cur_keys = Vec::new();
758        let mut cur_bitmaps = Vec::new();
759        let mut cur_bytes = 0;
760
761        for (key, bitmap) in state.into_iter() {
762            let mut bytes = Vec::new();
763            bitmap.serialize_into(&mut bytes).unwrap();
764            let bitmap_size = bytes.len();
765
766            if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
767                let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
768                let mut binary_builder = BinaryBuilder::new();
769                for b in &cur_bitmaps {
770                    binary_builder.append_value(b);
771                }
772                let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
773
774                let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
775                bitmap_index_file.write_record_batch(record_batch).await?;
776
777                cur_keys.clear();
778                cur_bitmaps.clear();
779                cur_bytes = 0;
780            }
781
782            cur_keys.push(key);
783            cur_bitmaps.push(bytes);
784            cur_bytes += bitmap_size;
785        }
786
787        // Flush any remaining
788        if !cur_keys.is_empty() {
789            let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
790            let mut binary_builder = BinaryBuilder::new();
791            for b in &cur_bitmaps {
792                binary_builder.append_value(b);
793            }
794            let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
795
796            let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
797            bitmap_index_file.write_record_batch(record_batch).await?;
798        }
799
800        // Finish file with metadata that allows lightweight statistics reads
801        let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps })
802            .map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
803        metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
804
805        bitmap_index_file.finish_with_metadata(metadata).await?;
806
807        Ok(())
808    }
809
810    /// Builds bitmap index state from a `(value, row_id)` stream without writing it.
811    pub(crate) async fn build_bitmap_index_state(
812        mut data_source: SendableRecordBatchStream,
813        mut state: HashMap<ScalarValue, RowAddrTreeMap>,
814    ) -> Result<(HashMap<ScalarValue, RowAddrTreeMap>, DataType)> {
815        let value_type = data_source.schema().field(0).data_type().clone();
816        while let Some(batch) = data_source.try_next().await? {
817            let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
818            let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
819            debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
820
821            let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
822
823            for i in 0..values.len() {
824                let row_id = row_id_column.value(i);
825                let key = ScalarValue::try_from_array(values.as_ref(), i)?;
826                state.entry(key.clone()).or_default().insert(row_id);
827            }
828        }
829
830        Ok((state, value_type))
831    }
832
833    pub async fn train_bitmap_index(
834        data: SendableRecordBatchStream,
835        index_store: &dyn IndexStore,
836    ) -> Result<()> {
837        Self::streaming_build_and_write(data, None, index_store).await
838    }
839
840    /// Builds and writes a bitmap index in a streaming fashion from value-sorted
841    /// input. Only one value's bitmap is in memory at a time, reducing peak memory
842    /// from O(unique_values * avg_bitmap) to O(largest_single_bitmap).
843    ///
844    /// If `old_index` is provided, its existing bitmaps are merged with the new
845    /// data via a sorted merge-join (the old index_map is a BTreeMap, already
846    /// sorted by value).
847    async fn streaming_build_and_write(
848        mut data_source: SendableRecordBatchStream,
849        old_index: Option<&BitmapIndex>,
850        index_store: &dyn IndexStore,
851    ) -> Result<()> {
852        let value_type = data_source.schema().field(0).data_type().clone();
853
854        let schema = Arc::new(Schema::new(vec![
855            Field::new("keys", value_type.clone(), true),
856            Field::new("bitmaps", DataType::Binary, true),
857        ]));
858
859        let index_file = index_store
860            .new_index_file(BITMAP_LOOKUP_NAME, schema)
861            .await?;
862        let mut writer = BitmapBatchWriter::new(index_file);
863
864        // Collect old index keys (already in memory as BTreeMap keys — this is
865        // just a Vec of references, not a copy of the bitmaps themselves).
866        let old_keys: Vec<OrderableScalarValue> = old_index
867            .map(|idx| idx.index_map.keys().cloned().collect())
868            .unwrap_or_default();
869        let mut old_pos: usize = 0;
870
871        // Current value being accumulated from the new data stream.
872        let mut current_key: Option<ScalarValue> = None;
873        let mut current_bitmap = RowAddrTreeMap::default();
874        // Track whether we emitted a null bitmap (old index stores nulls
875        // separately in null_map, not in index_map).
876        let mut emitted_null = false;
877
878        while let Some(batch) = data_source.try_next().await? {
879            let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
880            let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
881            debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
882            let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
883
884            for i in 0..values.len() {
885                let row_id = row_id_column.value(i);
886                let key = ScalarValue::try_from_array(values.as_ref(), i)?;
887
888                match &current_key {
889                    Some(cur) if *cur == key => {
890                        current_bitmap.insert(row_id);
891                    }
892                    _ => {
893                        // Value changed — flush the previous run.
894                        if let Some(prev_key) = current_key.take() {
895                            let mut prev_bitmap = std::mem::take(&mut current_bitmap);
896                            Self::finish_run(
897                                prev_key,
898                                &mut prev_bitmap,
899                                old_index,
900                                &old_keys,
901                                &mut old_pos,
902                                &mut emitted_null,
903                                &mut writer,
904                            )
905                            .await?;
906                        }
907                        current_key = Some(key);
908                        current_bitmap = RowAddrTreeMap::default();
909                        current_bitmap.insert(row_id);
910                    }
911                }
912            }
913        }
914
915        // Flush the last accumulated run from new data.
916        if let Some(last_key) = current_key.take() {
917            let mut last_bitmap = std::mem::take(&mut current_bitmap);
918            Self::finish_run(
919                last_key,
920                &mut last_bitmap,
921                old_index,
922                &old_keys,
923                &mut old_pos,
924                &mut emitted_null,
925                &mut writer,
926            )
927            .await?;
928        }
929
930        // Emit any remaining old-only entries.
931        if let Some(idx) = old_index {
932            while old_pos < old_keys.len() {
933                let old_bitmap = idx.load_bitmap(&old_keys[old_pos], None).await?;
934                writer
935                    .emit(old_keys[old_pos].0.clone(), &old_bitmap)
936                    .await?;
937                old_pos += 1;
938            }
939        }
940
941        // Emit old null bitmap if we didn't already merge it with new nulls.
942        if !emitted_null
943            && let Some(idx) = old_index
944            && !idx.null_map.is_empty()
945        {
946            let null_key = new_null_array(&value_type, 1);
947            let null_key = ScalarValue::try_from_array(null_key.as_ref(), 0)?;
948            writer.emit(null_key, &idx.null_map).await?;
949        }
950
951        writer.finish().await?;
952
953        Ok(())
954    }
955
956    /// Flush a completed value-run from the new data stream, emitting any
957    /// old-only entries that sort before it and merging the old bitmap if the
958    /// key exists in both old and new.
959    async fn finish_run(
960        key: ScalarValue,
961        bitmap: &mut RowAddrTreeMap,
962        old_index: Option<&BitmapIndex>,
963        old_keys: &[OrderableScalarValue],
964        old_pos: &mut usize,
965        emitted_null: &mut bool,
966        writer: &mut BitmapBatchWriter,
967    ) -> Result<()> {
968        if key.is_null() {
969            // Null values are stored separately in the old index's null_map.
970            if let Some(idx) = old_index
971                && !idx.null_map.is_empty()
972            {
973                *bitmap |= &*idx.null_map;
974            }
975            *emitted_null = true;
976            writer.emit(key, bitmap).await?;
977        } else if let Some(idx) = old_index {
978            let orderable = OrderableScalarValue(key.clone());
979
980            // Emit old-only entries that sort before this key.
981            while *old_pos < old_keys.len() && old_keys[*old_pos] < orderable {
982                let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
983                writer
984                    .emit(old_keys[*old_pos].0.clone(), &old_bitmap)
985                    .await?;
986                *old_pos += 1;
987            }
988
989            // If the old index also has this key, merge its bitmap.
990            if *old_pos < old_keys.len() && old_keys[*old_pos] == orderable {
991                let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
992                *bitmap |= &*old_bitmap;
993                *old_pos += 1;
994            }
995
996            writer.emit(key, bitmap).await?;
997        } else {
998            writer.emit(key, bitmap).await?;
999        }
1000        Ok(())
1001    }
1002
1003    /// Remaps every bitmap in a materialized bitmap-index state using row-id mappings.
1004    pub(crate) fn remap_bitmap_state(
1005        state: HashMap<ScalarValue, RowAddrTreeMap>,
1006        mapping: &HashMap<u64, Option<u64>>,
1007    ) -> HashMap<ScalarValue, RowAddrTreeMap> {
1008        state
1009            .into_iter()
1010            .map(|(key, bitmap)| {
1011                let remapped_bitmap =
1012                    RowAddrTreeMap::from_iter(bitmap.row_addrs().unwrap().filter_map(|addr| {
1013                        let addr_as_u64 = u64::from(addr);
1014                        mapping
1015                            .get(&addr_as_u64)
1016                            .copied()
1017                            .unwrap_or(Some(addr_as_u64))
1018                    }));
1019                (key, remapped_bitmap)
1020            })
1021            .collect()
1022    }
1023}
1024
1025#[async_trait]
1026impl ScalarIndexPlugin for BitmapIndexPlugin {
1027    fn name(&self) -> &str {
1028        "Bitmap"
1029    }
1030
1031    fn new_training_request(
1032        &self,
1033        _params: &str,
1034        field: &Field,
1035    ) -> Result<Box<dyn TrainingRequest>> {
1036        if field.data_type().is_nested() {
1037            return Err(Error::invalid_input_source(
1038                "A bitmap index can only be created on a non-nested field.".into(),
1039            ));
1040        }
1041        Ok(Box::new(DefaultTrainingRequest::new(
1042            TrainingCriteria::new(TrainingOrdering::Values).with_row_id(),
1043        )))
1044    }
1045
1046    fn provides_exact_answer(&self) -> bool {
1047        true
1048    }
1049
1050    fn version(&self) -> u32 {
1051        BITMAP_INDEX_VERSION
1052    }
1053
1054    fn new_query_parser(
1055        &self,
1056        index_name: String,
1057        _index_details: &prost_types::Any,
1058    ) -> Option<Box<dyn ScalarQueryParser>> {
1059        Some(Box::new(SargableQueryParser::new(index_name, false)))
1060    }
1061
1062    async fn train_index(
1063        &self,
1064        data: SendableRecordBatchStream,
1065        index_store: &dyn IndexStore,
1066        _request: Box<dyn TrainingRequest>,
1067        fragment_ids: Option<Vec<u32>>,
1068        _progress: Arc<dyn crate::progress::IndexBuildProgress>,
1069    ) -> Result<CreatedIndex> {
1070        if fragment_ids.is_some() {
1071            return Err(Error::invalid_input_source(
1072                "Bitmap index does not support fragment training".into(),
1073            ));
1074        }
1075
1076        Self::train_bitmap_index(data, index_store).await?;
1077        Ok(CreatedIndex {
1078            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
1079                .unwrap(),
1080            index_version: BITMAP_INDEX_VERSION,
1081            files: Some(index_store.list_files_with_sizes().await?),
1082        })
1083    }
1084
1085    /// Load an index from storage
1086    async fn load_index(
1087        &self,
1088        index_store: Arc<dyn IndexStore>,
1089        _index_details: &prost_types::Any,
1090        frag_reuse_index: Option<Arc<FragReuseIndex>>,
1091        cache: &LanceCache,
1092    ) -> Result<Arc<dyn ScalarIndex>> {
1093        Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
1094    }
1095
1096    async fn load_statistics(
1097        &self,
1098        index_store: Arc<dyn IndexStore>,
1099        _index_details: &prost_types::Any,
1100    ) -> Result<Option<serde_json::Value>> {
1101        let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
1102        if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
1103            let stats = serde_json::from_str(value).map_err(|e| {
1104                Error::internal(format!("failed to parse bitmap statistics metadata: {e}"))
1105            })?;
1106            Ok(Some(stats))
1107        } else {
1108            Ok(None)
1109        }
1110    }
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115    use super::*;
1116    use crate::metrics::NoOpMetricsCollector;
1117    use crate::scalar::lance_format::LanceIndexStore;
1118    use arrow_array::{RecordBatch, StringArray, UInt64Array, record_batch};
1119    use arrow_schema::{DataType, Field, Schema};
1120
1121    /// Sort a (value, row_id) RecordBatch by the value column so that unit tests
1122    /// match the ordering the production scanner applies via TrainingOrdering::Values.
1123    fn sort_batch_by_value(batch: &RecordBatch) -> RecordBatch {
1124        use arrow::compute::SortOptions;
1125        let values = batch.column(0);
1126        let row_ids = batch.column(1);
1127        let options = SortOptions {
1128            descending: false,
1129            nulls_first: true,
1130        };
1131        let indices = arrow::compute::sort_to_indices(values, Some(options), None).unwrap();
1132        let sorted_values = arrow::compute::take(values.as_ref(), &indices, None).unwrap();
1133        let sorted_row_ids = arrow::compute::take(row_ids.as_ref(), &indices, None).unwrap();
1134        RecordBatch::try_new(batch.schema(), vec![sorted_values, sorted_row_ids]).unwrap()
1135    }
1136    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1137    use futures::stream;
1138    use lance_core::utils::mask::RowSetOps;
1139    use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
1140    use lance_io::object_store::ObjectStore;
1141    use std::collections::HashMap;
1142
1143    #[tokio::test]
1144    async fn test_bitmap_lazy_loading_and_cache() {
1145        // Create a temporary directory for the index
1146        let tmpdir = TempObjDir::default();
1147        let store = Arc::new(LanceIndexStore::new(
1148            Arc::new(ObjectStore::local()),
1149            tmpdir.clone(),
1150            Arc::new(LanceCache::no_cache()),
1151        ));
1152
1153        // Create test data with low cardinality column
1154        let colors = vec![
1155            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1156            "red", "red", "blue", "green", "yellow",
1157        ];
1158
1159        let row_ids = (0u64..15u64).collect::<Vec<_>>();
1160
1161        let schema = Arc::new(Schema::new(vec![
1162            Field::new("value", DataType::Utf8, false),
1163            Field::new("_rowid", DataType::UInt64, false),
1164        ]));
1165
1166        let batch = RecordBatch::try_new(
1167            schema.clone(),
1168            vec![
1169                Arc::new(StringArray::from(colors.clone())),
1170                Arc::new(UInt64Array::from(row_ids.clone())),
1171            ],
1172        )
1173        .unwrap();
1174
1175        let batch = sort_batch_by_value(&batch);
1176        let stream = stream::once(async move { Ok(batch) });
1177        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1178
1179        // Train and write the bitmap index
1180        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1181            .await
1182            .unwrap();
1183
1184        // Create a cache with limited capacity
1185        let cache = LanceCache::with_capacity(1024 * 1024); // 1MB cache
1186
1187        // Load the index (should only load metadata, not bitmaps)
1188        let index = BitmapIndex::load(store.clone(), None, &cache)
1189            .await
1190            .unwrap();
1191
1192        assert_eq!(index.index_map.len(), 4); // 4 non-null unique values (red, blue, green, yellow)
1193        assert!(index.null_map.is_empty()); // No nulls in test data
1194
1195        // Test 1: Search for "red"
1196        let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
1197        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1198
1199        // Verify results
1200        let expected_red_rows = vec![0u64, 3, 6, 10, 11];
1201        if let SearchResult::Exact(row_ids) = result {
1202            let mut actual: Vec<u64> = row_ids
1203                .true_rows()
1204                .row_addrs()
1205                .unwrap()
1206                .map(|id| id.into())
1207                .collect();
1208            actual.sort();
1209            assert_eq!(actual, expected_red_rows);
1210        } else {
1211            panic!("Expected exact search result");
1212        }
1213
1214        // Test 2: Search for "red" again - should hit cache
1215        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1216        if let SearchResult::Exact(row_ids) = result {
1217            let mut actual: Vec<u64> = row_ids
1218                .true_rows()
1219                .row_addrs()
1220                .unwrap()
1221                .map(|id| id.into())
1222                .collect();
1223            actual.sort();
1224            assert_eq!(actual, expected_red_rows);
1225        }
1226
1227        // Test 3: Range query
1228        let query = SargableQuery::Range(
1229            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
1230            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
1231        );
1232        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1233
1234        let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
1235        if let SearchResult::Exact(row_ids) = result {
1236            let mut actual: Vec<u64> = row_ids
1237                .true_rows()
1238                .row_addrs()
1239                .unwrap()
1240                .map(|id| id.into())
1241                .collect();
1242            actual.sort();
1243            assert_eq!(actual, expected_range_rows);
1244        }
1245
1246        // Test 3b: Inverted range query should return empty result
1247        let query = SargableQuery::Range(
1248            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
1249            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
1250        );
1251        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1252        if let SearchResult::Exact(row_ids) = result {
1253            assert!(row_ids.true_rows().is_empty());
1254        } else {
1255            panic!("Expected exact search result");
1256        }
1257
1258        // Test 4: IsIn query
1259        let query = SargableQuery::IsIn(vec![
1260            ScalarValue::Utf8(Some("red".to_string())),
1261            ScalarValue::Utf8(Some("yellow".to_string())),
1262        ]);
1263        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1264
1265        let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
1266        if let SearchResult::Exact(row_ids) = result {
1267            let mut actual: Vec<u64> = row_ids
1268                .true_rows()
1269                .row_addrs()
1270                .unwrap()
1271                .map(|id| id.into())
1272                .collect();
1273            actual.sort();
1274            assert_eq!(actual, expected_in_rows);
1275        }
1276    }
1277
1278    #[tokio::test]
1279    #[ignore]
1280    async fn test_big_bitmap_index() {
1281        // WARNING: This test allocates a huge state to force overflow over int32 on BinaryArray
1282        // You must run it only on a machine with enough resources (or skip it normally).
1283        use super::{BITMAP_LOOKUP_NAME, BitmapIndex};
1284        use crate::scalar::IndexStore;
1285        use crate::scalar::lance_format::LanceIndexStore;
1286        use arrow_schema::DataType;
1287        use datafusion_common::ScalarValue;
1288        use lance_core::cache::LanceCache;
1289        use lance_core::utils::mask::RowAddrTreeMap;
1290        use lance_io::object_store::ObjectStore;
1291        use std::collections::HashMap;
1292        use std::sync::Arc;
1293
1294        // Adjust these numbers so that:
1295        //     m * (serialized size per bitmap) > 2^31 bytes.
1296        //
1297        // For example, if we assume each bitmap serializes to ~1000 bytes,
1298        // you need m > 2.1e6.
1299        let m: u32 = 2_500_000;
1300        let per_bitmap_size = 1000; // assumed bytes per bitmap
1301
1302        let mut state = HashMap::new();
1303        for i in 0..m {
1304            // Create a bitmap that contains, say, 1000 row IDs.
1305            let bitmap = RowAddrTreeMap::from_iter(0..per_bitmap_size);
1306
1307            let key = ScalarValue::UInt32(Some(i));
1308            state.insert(key, bitmap);
1309        }
1310
1311        // Create a temporary store.
1312        let tmpdir = TempObjDir::default();
1313        let test_store = LanceIndexStore::new(
1314            Arc::new(ObjectStore::local()),
1315            tmpdir.clone(),
1316            Arc::new(LanceCache::no_cache()),
1317        );
1318
1319        // This call should never trigger a "byte array offset overflow" error since now the code supports
1320        // read by chunks
1321        let result =
1322            BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;
1323
1324        assert!(
1325            result.is_ok(),
1326            "Failed to write bitmap index: {:?}",
1327            result.err()
1328        );
1329
1330        // Verify the index file exists
1331        let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
1332        assert!(
1333            index_file.is_ok(),
1334            "Failed to open index file: {:?}",
1335            index_file.err()
1336        );
1337        let index_file = index_file.unwrap();
1338
1339        // Print stats about the index file
1340        tracing::info!(
1341            "Index file contains {} rows in total",
1342            index_file.num_rows()
1343        );
1344
1345        // Load the index using BitmapIndex::load
1346        tracing::info!("Loading index from disk...");
1347        let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
1348            .await
1349            .expect("Failed to load bitmap index");
1350
1351        // Verify the loaded index has the correct number of entries
1352        assert_eq!(
1353            loaded_index.index_map.len(),
1354            m as usize,
1355            "Loaded index has incorrect number of keys (expected {}, got {})",
1356            m,
1357            loaded_index.index_map.len()
1358        );
1359
1360        // Manually verify specific keys without using search()
1361        let test_keys = [0, m / 2, m - 1]; // Beginning, middle, and end
1362        for &key_val in &test_keys {
1363            let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
1364            // Load the bitmap for this key
1365            let bitmap = loaded_index
1366                .load_bitmap(&key, None)
1367                .await
1368                .unwrap_or_else(|_| panic!("Key {} should exist", key_val));
1369
1370            // Convert RowAddrTreeMap to a vector for easier assertion
1371            let row_addrs: Vec<u64> = bitmap.row_addrs().unwrap().map(u64::from).collect();
1372
1373            // Verify length
1374            assert_eq!(
1375                row_addrs.len(),
1376                per_bitmap_size as usize,
1377                "Bitmap for key {} has wrong size",
1378                key_val
1379            );
1380
1381            // Verify first few and last few elements
1382            for i in 0..5.min(per_bitmap_size) {
1383                assert!(
1384                    row_addrs.contains(&i),
1385                    "Bitmap for key {} should contain row_id {}",
1386                    key_val,
1387                    i
1388                );
1389            }
1390
1391            for i in (per_bitmap_size - 5)..per_bitmap_size {
1392                assert!(
1393                    row_addrs.contains(&i),
1394                    "Bitmap for key {} should contain row_id {}",
1395                    key_val,
1396                    i
1397                );
1398            }
1399
1400            // Verify exact range
1401            let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
1402            assert_eq!(
1403                row_addrs, expected_range,
1404                "Bitmap for key {} doesn't contain expected values",
1405                key_val
1406            );
1407
1408            tracing::info!(
1409                "✓ Verified bitmap for key {}: {} rows as expected",
1410                key_val,
1411                row_addrs.len()
1412            );
1413        }
1414
1415        tracing::info!("Test successful! Index properly contains {} keys", m);
1416    }
1417
1418    #[tokio::test]
1419    async fn test_bitmap_prewarm() {
1420        // Create a temporary directory for the index
1421        let tmpdir = TempObjDir::default();
1422        let store = Arc::new(LanceIndexStore::new(
1423            Arc::new(ObjectStore::local()),
1424            tmpdir.clone(),
1425            Arc::new(LanceCache::no_cache()),
1426        ));
1427
1428        // Create test data with low cardinality
1429        let colors = vec![
1430            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1431            "red", "red", "blue", "green", "yellow",
1432        ];
1433
1434        let row_ids = (0u64..15u64).collect::<Vec<_>>();
1435
1436        let schema = Arc::new(Schema::new(vec![
1437            Field::new("value", DataType::Utf8, false),
1438            Field::new("_rowid", DataType::UInt64, false),
1439        ]));
1440
1441        let batch = RecordBatch::try_new(
1442            schema.clone(),
1443            vec![
1444                Arc::new(StringArray::from(colors.clone())),
1445                Arc::new(UInt64Array::from(row_ids.clone())),
1446            ],
1447        )
1448        .unwrap();
1449
1450        let batch = sort_batch_by_value(&batch);
1451        let stream = stream::once(async move { Ok(batch) });
1452        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1453
1454        // Train and write the bitmap index
1455        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1456            .await
1457            .unwrap();
1458
1459        // Create a cache with metrics tracking
1460        let cache = LanceCache::with_capacity(1024 * 1024); // 1MB cache
1461
1462        // Load the index (should only load metadata, not bitmaps)
1463        let index = BitmapIndex::load(store.clone(), None, &cache)
1464            .await
1465            .unwrap();
1466
1467        // Verify no bitmaps are cached yet
1468        let cache_key_red = BitmapKey {
1469            value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
1470        };
1471        let cache_key_blue = BitmapKey {
1472            value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
1473        };
1474
1475        assert!(
1476            cache
1477                .get_with_key::<BitmapKey>(&cache_key_red)
1478                .await
1479                .is_none()
1480        );
1481        assert!(
1482            cache
1483                .get_with_key::<BitmapKey>(&cache_key_blue)
1484                .await
1485                .is_none()
1486        );
1487
1488        // Call prewarm
1489        index.prewarm().await.unwrap();
1490
1491        // Verify all bitmaps are now cached
1492        assert!(
1493            cache
1494                .get_with_key::<BitmapKey>(&cache_key_red)
1495                .await
1496                .is_some()
1497        );
1498        assert!(
1499            cache
1500                .get_with_key::<BitmapKey>(&cache_key_blue)
1501                .await
1502                .is_some()
1503        );
1504
1505        // Verify cached bitmaps have correct content
1506        let cached_red = cache
1507            .get_with_key::<BitmapKey>(&cache_key_red)
1508            .await
1509            .unwrap();
1510        let red_rows: Vec<u64> = cached_red.row_addrs().unwrap().map(u64::from).collect();
1511        assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);
1512
1513        // Call prewarm again - should be idempotent
1514        index.prewarm().await.unwrap();
1515
1516        // Verify cache still contains the same items
1517        let cached_red_2 = cache
1518            .get_with_key::<BitmapKey>(&cache_key_red)
1519            .await
1520            .unwrap();
1521        let red_rows_2: Vec<u64> = cached_red_2.row_addrs().unwrap().map(u64::from).collect();
1522        assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
1523    }
1524
1525    #[tokio::test]
1526    async fn test_remap_bitmap_with_null() {
1527        use arrow_array::UInt32Array;
1528
1529        // Create a temporary store.
1530        let tmpdir = TempObjDir::default();
1531        let test_store = Arc::new(LanceIndexStore::new(
1532            Arc::new(ObjectStore::local()),
1533            tmpdir.clone(),
1534            Arc::new(LanceCache::no_cache()),
1535        ));
1536
1537        // Create test data that simulates:
1538        // frag 1 - { 0: null, 1: null, 2: 1 }
1539        // frag 2 - { 0: 1, 1: 2, 2: 2 }
1540        // We'll create this data with specific row addresses
1541        let values = vec![
1542            None,       // row 0: null (will be at address (1,0))
1543            None,       // row 1: null (will be at address (1,1))
1544            Some(1u32), // row 2: 1    (will be at address (1,2))
1545            Some(1u32), // row 3: 1    (will be at address (2,0))
1546            Some(2u32), // row 4: 2    (will be at address (2,1))
1547            Some(2u32), // row 5: 2    (will be at address (2,2))
1548        ];
1549
1550        // Create row IDs with specific fragment addresses
1551        let row_ids: Vec<u64> = vec![
1552            RowAddress::new_from_parts(1, 0).into(),
1553            RowAddress::new_from_parts(1, 1).into(),
1554            RowAddress::new_from_parts(1, 2).into(),
1555            RowAddress::new_from_parts(2, 0).into(),
1556            RowAddress::new_from_parts(2, 1).into(),
1557            RowAddress::new_from_parts(2, 2).into(),
1558        ];
1559
1560        let schema = Arc::new(Schema::new(vec![
1561            Field::new("value", DataType::UInt32, true),
1562            Field::new("_rowid", DataType::UInt64, false),
1563        ]));
1564
1565        let batch = RecordBatch::try_new(
1566            schema.clone(),
1567            vec![
1568                Arc::new(UInt32Array::from(values)),
1569                Arc::new(UInt64Array::from(row_ids)),
1570            ],
1571        )
1572        .unwrap();
1573
1574        let stream = stream::once(async move { Ok(batch) });
1575        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1576
1577        // Create the bitmap index
1578        BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
1579            .await
1580            .unwrap();
1581
1582        // Load the index
1583        let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1584            .await
1585            .expect("Failed to load bitmap index");
1586
1587        // Verify initial state
1588        assert_eq!(index.index_map.len(), 2); // 2 non-null values (1 and 2)
1589        assert!(!index.null_map.is_empty()); // Should have null values
1590
1591        // Create a remap that simulates compaction of frags 1 and 2 into frag 3
1592        let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
1593        row_addr_map.insert(
1594            RowAddress::new_from_parts(1, 0).into(),
1595            Some(RowAddress::new_from_parts(3, 0).into()),
1596        );
1597        row_addr_map.insert(
1598            RowAddress::new_from_parts(1, 1).into(),
1599            Some(RowAddress::new_from_parts(3, 1).into()),
1600        );
1601        row_addr_map.insert(
1602            RowAddress::new_from_parts(1, 2).into(),
1603            Some(RowAddress::new_from_parts(3, 2).into()),
1604        );
1605        row_addr_map.insert(
1606            RowAddress::new_from_parts(2, 0).into(),
1607            Some(RowAddress::new_from_parts(3, 3).into()),
1608        );
1609        row_addr_map.insert(
1610            RowAddress::new_from_parts(2, 1).into(),
1611            Some(RowAddress::new_from_parts(3, 4).into()),
1612        );
1613        row_addr_map.insert(
1614            RowAddress::new_from_parts(2, 2).into(),
1615            Some(RowAddress::new_from_parts(3, 5).into()),
1616        );
1617
1618        // Perform remap
1619        index
1620            .remap(&row_addr_map, test_store.as_ref())
1621            .await
1622            .unwrap();
1623
1624        // Reload and check
1625        let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
1626            .await
1627            .expect("Failed to load remapped bitmap index");
1628
1629        // Verify the null bitmap was remapped correctly
1630        let expected_null_addrs: Vec<u64> = vec![
1631            RowAddress::new_from_parts(3, 0).into(),
1632            RowAddress::new_from_parts(3, 1).into(),
1633        ];
1634        let actual_null_addrs: Vec<u64> = reloaded_idx
1635            .null_map
1636            .row_addrs()
1637            .unwrap()
1638            .map(u64::from)
1639            .collect();
1640        assert_eq!(
1641            actual_null_addrs, expected_null_addrs,
1642            "Null bitmap not remapped correctly"
1643        );
1644
1645        // Search for value 1 and verify remapped addresses
1646        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
1647        let result = reloaded_idx
1648            .search(&query, &NoOpMetricsCollector)
1649            .await
1650            .unwrap();
1651        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1652            let mut actual: Vec<u64> = row_ids
1653                .true_rows()
1654                .row_addrs()
1655                .unwrap()
1656                .map(u64::from)
1657                .collect();
1658            actual.sort();
1659            let expected: Vec<u64> = vec![
1660                RowAddress::new_from_parts(3, 2).into(),
1661                RowAddress::new_from_parts(3, 3).into(),
1662            ];
1663            assert_eq!(actual, expected, "Value 1 bitmap not remapped correctly");
1664        }
1665
1666        // Search for value 2 and verify remapped addresses
1667        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
1668        let result = reloaded_idx
1669            .search(&query, &NoOpMetricsCollector)
1670            .await
1671            .unwrap();
1672        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1673            let mut actual: Vec<u64> = row_ids
1674                .true_rows()
1675                .row_addrs()
1676                .unwrap()
1677                .map(u64::from)
1678                .collect();
1679            actual.sort();
1680            let expected: Vec<u64> = vec![
1681                RowAddress::new_from_parts(3, 4).into(),
1682                RowAddress::new_from_parts(3, 5).into(),
1683            ];
1684            assert_eq!(actual, expected, "Value 2 bitmap not remapped correctly");
1685        }
1686
1687        // Search for null values
1688        let query = SargableQuery::IsNull();
1689        let result = reloaded_idx
1690            .search(&query, &NoOpMetricsCollector)
1691            .await
1692            .unwrap();
1693        if let crate::scalar::SearchResult::Exact(row_ids) = result {
1694            let mut actual: Vec<u64> = row_ids
1695                .true_rows()
1696                .row_addrs()
1697                .unwrap()
1698                .map(u64::from)
1699                .collect();
1700            actual.sort();
1701            assert_eq!(
1702                actual, expected_null_addrs,
1703                "Null search results not correct"
1704            );
1705        }
1706    }
1707
1708    #[tokio::test]
1709    async fn test_bitmap_null_handling_in_queries() {
1710        // Verify that search() separates matching rows (true_rows) from null rows (null_rows)
1711        let tmpdir = TempObjDir::default();
1712        let store = Arc::new(LanceIndexStore::new(
1713            Arc::new(ObjectStore::local()),
1714            tmpdir.clone(),
1715            Arc::new(LanceCache::no_cache()),
1716        ));
1717
1718        // Create test data: values [0, 5, null] stored under row ids [0, 1, 2]
1719        let batch = record_batch!(
1720            ("value", Int64, [Some(0), Some(5), None]),
1721            ("_rowid", UInt64, [0, 1, 2])
1722        )
1723        .unwrap();
1724        let schema = batch.schema();
1725        let stream = stream::once(async move { Ok(batch) });
1726        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1727
1728        // Train the bitmap index from the single-batch stream and write it to the store
1729        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1730            .await
1731            .unwrap();
1732
1733        let cache = LanceCache::with_capacity(1024 * 1024);
1734        let index = BitmapIndex::load(store.clone(), None, &cache)
1735            .await
1736            .unwrap();
1737
1738        // Test 1: Equals(5) - should return true rows = [1], null rows = [2]
1739        let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
1740        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1741
1742        match result {
1743            SearchResult::Exact(row_ids) => {
1744                let actual_rows: Vec<u64> = row_ids
1745                    .true_rows()
1746                    .row_addrs()
1747                    .unwrap()
1748                    .map(u64::from)
1749                    .collect();
1750                assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 5");
1751
1752                let null_row_ids = row_ids.null_rows();
1753                // The null set must be non-empty and contain exactly row 2
1754                assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
1755                let null_rows: Vec<u64> =
1756                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
1757                assert_eq!(null_rows, vec![2], "Should report row 2 as null");
1758            }
1759            _ => panic!("Expected Exact search result"),
1760        }
1761
1762        // Test 2: IsNull - should return true rows = [2] and an empty null set
1763        let query = SargableQuery::IsNull();
1764        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1765
1766        match result {
1767            SearchResult::Exact(row_addrs) => {
1768                let actual_rows: Vec<u64> = row_addrs
1769                    .true_rows()
1770                    .row_addrs()
1771                    .unwrap()
1772                    .map(u64::from)
1773                    .collect();
1774                assert_eq!(
1775                    actual_rows,
1776                    vec![2],
1777                    "IsNull should find row 2 where value is null"
1778                );
1779
1780                let null_row_ids = row_addrs.null_rows();
1781                // When querying FOR nulls, the null set must be empty (nulls are the TRUE result)
1782                assert!(
1783                    null_row_ids.is_empty(),
1784                    "null_row_ids should be None for IsNull query"
1785                );
1786            }
1787            _ => panic!("Expected Exact search result"),
1788        }
1789
1790        // Test 3: inclusive Range [0, 3] - should return true rows = [0], null rows = [2]
1791        let query = SargableQuery::Range(
1792            std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
1793            std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
1794        );
1795        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1796
1797        match result {
1798            SearchResult::Exact(row_addrs) => {
1799                let actual_rows: Vec<u64> = row_addrs
1800                    .true_rows()
1801                    .row_addrs()
1802                    .unwrap()
1803                    .map(u64::from)
1804                    .collect();
1805                assert_eq!(actual_rows, vec![0], "Should find row 0 where value == 0");
1806
1807                // Row 2 (the null value) must still be reported in the null set
1808                let null_row_ids = row_addrs.null_rows();
1809                assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
1810                let null_rows: Vec<u64> =
1811                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
1812                assert_eq!(null_rows, vec![2], "Should report row 2 as null");
1813            }
1814            _ => panic!("Expected Exact search result"),
1815        }
1816    }
1817}