Skip to main content

lance_index/scalar/
bitmap.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    cmp::Reverse,
7    collections::{BTreeMap, BinaryHeap, HashMap},
8    fmt::Debug,
9    ops::Bound,
10    sync::Arc,
11};
12
13use arrow::array::BinaryBuilder;
14use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array, new_null_array};
15use arrow_schema::{DataType, Field, Schema};
16use async_trait::async_trait;
17use bytes::Bytes;
18use datafusion::physical_plan::SendableRecordBatchStream;
19use datafusion_common::ScalarValue;
20use deepsize::DeepSizeOf;
21use futures::{StreamExt, TryStreamExt, stream};
22use lance_arrow::ipc::{
23    read_ipc_stream_single_at, read_len_prefixed_bytes_at, write_ipc_stream,
24    write_len_prefixed_bytes,
25};
26use lance_core::utils::mask::RowSetOps;
27use lance_core::{
28    Error, ROW_ID, Result,
29    cache::{CacheCodec, CacheCodecImpl, CacheKey, LanceCache, WeakLanceCache},
30    error::LanceOptionExt,
31    utils::{
32        mask::{NullableRowAddrSet, RowAddrTreeMap},
33        tokio::get_num_compute_intensive_cpus,
34    },
35};
36use lance_io::object_store::ObjectStore;
37use object_store::path::Path;
38use roaring::RoaringBitmap;
39use serde::{Deserialize, Serialize};
40use tracing::{instrument, warn};
41
42use super::{AnyQuery, IndexStore, ScalarIndex};
43use super::{
44    BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult, btree::OrderableScalarValue,
45};
46use crate::pbold;
47use crate::{Index, IndexType, metrics::MetricsCollector};
48use crate::{
49    frag_reuse::FragReuseIndex,
50    progress::IndexBuildProgress,
51    scalar::{
52        CreatedIndex, UpdateCriteria,
53        expression::SargableQueryParser,
54        registry::{
55            ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest,
56            VALUE_COLUMN_NAME,
57        },
58    },
59};
60use crate::{scalar::IndexReader, scalar::expression::ScalarQueryParser};
61
/// Name of the index file that stores the (key, bitmap) lookup rows.
pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
/// Metadata key under which the JSON-encoded index statistics are written
/// when the index file is finalized.
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";
// A shard file name is `<prefix><partition id><suffix>`.
const BITMAP_PART_LOOKUP_PREFIX: &str = "part_";
const BITMAP_PART_LOOKUP_SUFFIX: &str = "_bitmap_page_lookup.lance";
// Tags OR-ed into the low bits of a partition id (the id itself occupies the
// high 32 bits). The explicit tag is used when the caller supplies a shard id;
// the implicit tag is presumably used when the id is derived from a fragment
// id — confirm against `bitmap_shard_partition_id`.
const EXPLICIT_SHARD_ID_TAG: u64 = 0;
const IMPLICIT_FRAGMENT_ID_TAG: u64 = 1;

// Upper bound on the serialized bitmap bytes buffered into a single record
// batch before the writer flushes.
const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024; // leave headroom

// Number of lookup-file rows read per chunk while prewarming the cache.
const MAX_ROWS_PER_CHUNK: usize = 2 * 1024;
// Smaller than MAX_ROWS_PER_CHUNK to bound the per-cursor in-memory batch
// footprint during a k-way merge (N cursors × chunk), while still amortising
// I/O over a reasonable number of rows per read.
const MERGE_ROWS_PER_CHUNK: usize = 512;

// Version reported in `CreatedIndex::index_version` for indexes written here.
const BITMAP_INDEX_VERSION: u32 = 0;
78
/// Shared, lazily-opened handle to the bitmap lookup file.
///
/// We only need to open a file reader if we need to load a bitmap. If all
/// bitmaps are cached we don't open it. If we do open it we should only open it once.
#[derive(Clone)]
struct LazyIndexReader {
    /// The opened reader, or `None` until the first successful `get`.
    /// Clones of this struct share the same slot through the `Arc`.
    index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
    /// Store used to open the lookup file on first use.
    store: Arc<dyn IndexStore>,
}
86
87impl std::fmt::Debug for LazyIndexReader {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_struct("LazyIndexReader")
90            .field("store", &self.store)
91            .finish()
92    }
93}
94
95impl LazyIndexReader {
96    fn new(store: Arc<dyn IndexStore>) -> Self {
97        Self {
98            index_reader: Arc::new(tokio::sync::Mutex::new(None)),
99            store,
100        }
101    }
102
103    async fn get(&self) -> Result<Arc<dyn IndexReader>> {
104        let mut reader = self.index_reader.lock().await;
105        if reader.is_none() {
106            let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?;
107            *reader = Some(index_reader);
108        }
109        Ok(reader.as_ref().unwrap().clone())
110    }
111}
112
/// A scalar index that stores a bitmap for each possible value
///
/// This index works best for low-cardinality columns, where the number of unique values is small.
/// The bitmap stores a list of row ids where the value is present.
#[derive(Clone, Debug)]
pub struct BitmapIndex {
    /// Maps each unique value to its bitmap location in the index file
    /// The usize value is the row offset in the bitmap_page_lookup.lance file
    /// for quickly locating the row and reading it out
    index_map: BTreeMap<OrderableScalarValue, usize>,

    /// Bitmap of the rows whose value is null. It is read eagerly when the
    /// index is loaded, so the null key never appears in `index_map`.
    null_map: Arc<RowAddrTreeMap>,

    /// Data type of the indexed values (the type of the `keys` column).
    value_type: DataType,

    /// Store that holds the index files.
    store: Arc<dyn IndexStore>,

    /// Cache for bitmaps that have already been read from the lookup file.
    index_cache: WeakLanceCache,

    /// When present, used to remap row addresses of every bitmap read from disk.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,

    /// Lazily opened reader for the lookup file, shared by all bitmap loads.
    lazy_reader: LazyIndexReader,
}
136
137#[derive(Debug, Clone)]
138pub struct BitmapKey {
139    value: OrderableScalarValue,
140}
141
142impl CacheKey for BitmapKey {
143    type ValueType = RowAddrTreeMap;
144
145    fn key(&self) -> std::borrow::Cow<'_, str> {
146        format!("{}", self.value.0).into()
147    }
148
149    fn type_name() -> &'static str {
150        "Bitmap"
151    }
152
153    fn codec() -> Option<CacheCodec> {
154        Some(CacheCodec::from_impl::<RowAddrTreeMap>())
155    }
156}
157
/// The serializable state of a [`BitmapIndex`].
///
/// `BitmapIndex` holds non-serializable infrastructure (an `IndexStore`, a
/// cache handle, a lazy reader, a fragment-reuse index). `BitmapIndexState`
/// captures just the data needed to rebuild it: the value→file-offset map,
/// the null bitmap, and the value type. The infrastructure is supplied again
/// by the caller when the state is turned back into an index.
#[derive(Debug, Clone)]
pub struct BitmapIndexState {
    /// Value-to-row-offset lookup, encoded as an Arrow `RecordBatch` so we can
    /// reuse the existing IPC utilities for zero-copy round trips.
    ///
    /// Schema: `keys: <value_type>`, `offsets: UInt64`. Each offset is the row
    /// of that key's bitmap in the lookup file. Iteration order of
    /// `index_map` is preserved on serialize and the `BTreeMap` resorts the
    /// entries on deserialize, so the wire form does not need to be sorted.
    lookup_batch: RecordBatch,
    /// Already-remapped null bitmap (remapping is applied during load, so the
    /// cached state matches the in-memory representation).
    null_map: Arc<RowAddrTreeMap>,
    /// Cached separately from the schema for the empty-index case where the
    /// `lookup_batch` is empty but we still need to remember the column type.
    value_type: DataType,
}
180
181impl DeepSizeOf for BitmapIndexState {
182    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
183        self.lookup_batch.get_array_memory_size() + self.null_map.deep_size_of_children(context)
184    }
185}
186
187impl BitmapIndexState {
188    pub(crate) fn from_index(index: &BitmapIndex) -> Result<Self> {
189        Ok(Self {
190            lookup_batch: build_lookup_batch(&index.index_map, &index.value_type)?,
191            null_map: index.null_map.clone(),
192            value_type: index.value_type.clone(),
193        })
194    }
195
196    pub(crate) fn into_bitmap_index(
197        self,
198        store: Arc<dyn IndexStore>,
199        index_cache: &LanceCache,
200        frag_reuse_index: Option<Arc<FragReuseIndex>>,
201    ) -> Result<Arc<BitmapIndex>> {
202        let index_map = parse_lookup_batch(&self.lookup_batch)?;
203        Ok(Arc::new(BitmapIndex::new(
204            index_map,
205            self.null_map,
206            self.value_type,
207            store,
208            WeakLanceCache::from(index_cache),
209            frag_reuse_index,
210        )))
211    }
212}
213
214fn build_lookup_batch(
215    index_map: &BTreeMap<OrderableScalarValue, usize>,
216    value_type: &DataType,
217) -> Result<RecordBatch> {
218    let keys = if index_map.is_empty() {
219        arrow_array::new_empty_array(value_type)
220    } else {
221        ScalarValue::iter_to_array(index_map.keys().map(|k| k.0.clone()))?
222    };
223    let offsets = Arc::new(UInt64Array::from_iter_values(
224        index_map.values().map(|v| *v as u64),
225    ));
226    let schema = Arc::new(Schema::new(vec![
227        Field::new("keys", value_type.clone(), true),
228        Field::new("offsets", DataType::UInt64, false),
229    ]));
230    Ok(RecordBatch::try_new(schema, vec![keys, offsets])?)
231}
232
233fn parse_lookup_batch(batch: &RecordBatch) -> Result<BTreeMap<OrderableScalarValue, usize>> {
234    let keys = batch.column(0);
235    let offsets = batch
236        .column(1)
237        .as_any()
238        .downcast_ref::<UInt64Array>()
239        .ok_or_else(|| {
240            Error::internal("BitmapIndexState: expected UInt64 offsets column".to_string())
241        })?;
242    let mut index_map = BTreeMap::new();
243    for idx in 0..batch.num_rows() {
244        let value = OrderableScalarValue(ScalarValue::try_from_array(keys, idx)?);
245        index_map.insert(value, offsets.value(idx) as usize);
246    }
247    Ok(index_map)
248}
249
250impl CacheCodecImpl for BitmapIndexState {
251    /// Wire format:
252    /// ```text
253    /// [u64 null_map_len][null_map bytes]
254    /// [arrow IPC stream: (keys: <value_type>, offsets: UInt64)]
255    /// ```
256    /// The value type is recovered from the IPC stream schema.
257    fn serialize(&self, writer: &mut dyn std::io::Write) -> Result<()> {
258        let mut null_bytes = Vec::with_capacity(self.null_map.serialized_size());
259        self.null_map.serialize_into(&mut null_bytes)?;
260        write_len_prefixed_bytes(writer, &null_bytes)?;
261        write_ipc_stream(&self.lookup_batch, writer)?;
262        Ok(())
263    }
264
265    fn deserialize(data: &bytes::Bytes) -> Result<Self> {
266        let mut offset = 0;
267        let null_bytes = read_len_prefixed_bytes_at(data, &mut offset)?;
268        let null_map = Arc::new(RowAddrTreeMap::deserialize_from(null_bytes.as_ref())?);
269        let lookup_batch = read_ipc_stream_single_at(data, &mut offset)?;
270        let value_type = lookup_batch.schema().field(0).data_type().clone();
271        Ok(Self {
272            lookup_batch,
273            null_map,
274            value_type,
275        })
276    }
277}
278
279/// Cache key for a [`BitmapIndexState`]. The cache is already namespaced
280/// per-index by the caller, so a constant key suffices.
281struct BitmapIndexStateKey;
282
283impl CacheKey for BitmapIndexStateKey {
284    type ValueType = BitmapIndexState;
285
286    fn key(&self) -> std::borrow::Cow<'_, str> {
287        "state".into()
288    }
289
290    fn type_name() -> &'static str {
291        "BitmapIndexState"
292    }
293
294    fn codec() -> Option<CacheCodec> {
295        Some(CacheCodec::from_impl::<BitmapIndexState>())
296    }
297}
298
299impl BitmapIndex {
300    fn new(
301        index_map: BTreeMap<OrderableScalarValue, usize>,
302        null_map: Arc<RowAddrTreeMap>,
303        value_type: DataType,
304        store: Arc<dyn IndexStore>,
305        index_cache: WeakLanceCache,
306        frag_reuse_index: Option<Arc<FragReuseIndex>>,
307    ) -> Self {
308        let lazy_reader = LazyIndexReader::new(store.clone());
309        Self {
310            index_map,
311            null_map,
312            value_type,
313            store,
314            index_cache,
315            frag_reuse_index,
316            lazy_reader,
317        }
318    }
319
320    pub(crate) async fn load(
321        store: Arc<dyn IndexStore>,
322        frag_reuse_index: Option<Arc<FragReuseIndex>>,
323        index_cache: &LanceCache,
324    ) -> Result<Arc<Self>> {
325        let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
326        let total_rows = page_lookup_file.num_rows();
327
328        if total_rows == 0 {
329            let schema = page_lookup_file.schema();
330            let data_type = schema.fields[0].data_type();
331            return Ok(Arc::new(Self::new(
332                BTreeMap::new(),
333                Arc::new(RowAddrTreeMap::default()),
334                data_type,
335                store,
336                WeakLanceCache::from(index_cache),
337                frag_reuse_index,
338            )));
339        }
340
341        let mut index_map: BTreeMap<OrderableScalarValue, usize> = BTreeMap::new();
342        let mut null_map = Arc::new(RowAddrTreeMap::default());
343        let mut null_location: Option<usize> = None;
344        let value_type = page_lookup_file.schema().fields[0].data_type();
345
346        // Stream keys in bounded batches to avoid loading the entire keys
347        // column into memory at once.
348        let mut keys_stream = page_lookup_file
349            .read_range_stream(0..total_rows, Some(&["keys"]))
350            .await?;
351        let mut row_offset: usize = 0;
352        while let Some(keys_batch) = keys_stream.try_next().await? {
353            let dict_keys = keys_batch.column(0);
354            for idx in 0..keys_batch.num_rows() {
355                let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
356                if key.0.is_null() {
357                    null_location = Some(row_offset);
358                } else {
359                    index_map.insert(key, row_offset);
360                }
361                row_offset += 1;
362            }
363        }
364
365        if let Some(null_loc) = null_location {
366            let batch = page_lookup_file
367                .read_range(null_loc..null_loc + 1, Some(&["bitmaps"]))
368                .await?;
369
370            let binary_bitmaps = batch
371                .column(0)
372                .as_any()
373                .downcast_ref::<BinaryArray>()
374                .ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
375            let bitmap_bytes = binary_bitmaps.value(0);
376            let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
377
378            // Apply fragment remapping if needed
379            if let Some(fri) = &frag_reuse_index {
380                bitmap = fri.remap_row_addrs_tree_map(&bitmap);
381            }
382
383            null_map = Arc::new(bitmap);
384        }
385
386        Ok(Arc::new(Self::new(
387            index_map,
388            null_map,
389            value_type,
390            store,
391            WeakLanceCache::from(index_cache),
392            frag_reuse_index,
393        )))
394    }
395
396    async fn load_bitmap(
397        &self,
398        key: &OrderableScalarValue,
399        metrics: Option<&dyn MetricsCollector>,
400    ) -> Result<Arc<RowAddrTreeMap>> {
401        if key.0.is_null() {
402            return Ok(self.null_map.clone());
403        }
404
405        let cache_key = BitmapKey { value: key.clone() };
406
407        if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
408            return Ok(cached);
409        }
410
411        // Record that we're loading a partition from disk
412        if let Some(metrics) = metrics {
413            metrics.record_part_load();
414        }
415
416        let row_offset = match self.index_map.get(key) {
417            Some(loc) => *loc,
418            None => return Ok(Arc::new(RowAddrTreeMap::default())),
419        };
420
421        let page_lookup_file = self.lazy_reader.get().await?;
422        let batch = page_lookup_file
423            .read_range(row_offset..row_offset + 1, Some(&["bitmaps"]))
424            .await?;
425
426        let binary_bitmaps = batch
427            .column(0)
428            .as_any()
429            .downcast_ref::<BinaryArray>()
430            .ok_or_else(|| Error::internal("Invalid bitmap column type".to_string()))?;
431        let bitmap_bytes = binary_bitmaps.value(0); // First (and only) row
432        let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
433
434        if let Some(fri) = &self.frag_reuse_index {
435            bitmap = fri.remap_row_addrs_tree_map(&bitmap);
436        }
437
438        self.index_cache
439            .insert_with_key(&cache_key, Arc::new(bitmap.clone()))
440            .await;
441
442        Ok(Arc::new(bitmap))
443    }
444
    /// Returns the data type of the values covered by this index.
    pub(crate) fn value_type(&self) -> &DataType {
        &self.value_type
    }
448
449    /// Loads the current bitmap index into an in-memory value-to-row-id map.
450    pub(crate) async fn load_bitmap_index_state(
451        &self,
452    ) -> Result<HashMap<ScalarValue, RowAddrTreeMap>> {
453        let mut state = HashMap::new();
454
455        for key in self.index_map.keys() {
456            let bitmap = self.load_bitmap(key, None).await?;
457            state.insert(key.0.clone(), (*bitmap).clone());
458        }
459
460        if !self.null_map.is_empty() {
461            let existing_null = new_null_array(&self.value_type, 1);
462            let existing_null = ScalarValue::try_from_array(existing_null.as_ref(), 0)?;
463            state.insert(existing_null, (*self.null_map).clone());
464        }
465
466        Ok(state)
467    }
468}
469
470impl DeepSizeOf for BitmapIndex {
471    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
472        let mut total_size = 0;
473
474        total_size += self.index_map.deep_size_of_children(context);
475        total_size += self.store.deep_size_of_children(context);
476
477        total_size
478    }
479}
480
/// Statistics reported for a bitmap index, serialized as JSON.
#[derive(Serialize)]
struct BitmapStatistics {
    /// Number of bitmaps in the index.
    num_bitmaps: usize,
}
485
/// User-supplied parameters for building a bitmap index.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BitmapParameters {
    /// Optional shard identifier for distributed bitmap builds spanning
    /// multiple fragments.
    pub shard_id: Option<u32>,
}
492
493struct BitmapTrainingRequest {
494    parameters: BitmapParameters,
495    criteria: TrainingCriteria,
496}
497
498impl BitmapTrainingRequest {
499    fn new(parameters: BitmapParameters) -> Self {
500        Self {
501            parameters,
502            criteria: TrainingCriteria::new(TrainingOrdering::Values).with_row_id(),
503        }
504    }
505}
506
507impl TrainingRequest for BitmapTrainingRequest {
508    fn as_any(&self) -> &dyn std::any::Any {
509        self
510    }
511
512    fn criteria(&self) -> &TrainingCriteria {
513        &self.criteria
514    }
515}
516
517#[async_trait]
518impl Index for BitmapIndex {
519    fn as_any(&self) -> &dyn Any {
520        self
521    }
522
523    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
524        self
525    }
526
527    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
528        Err(Error::not_supported_source(
529            "BitmapIndex is not a vector index".into(),
530        ))
531    }
532
533    async fn prewarm(&self) -> Result<()> {
534        let page_lookup_file = self.lazy_reader.get().await?;
535        let total_rows = page_lookup_file.num_rows();
536
537        if total_rows == 0 {
538            return Ok(());
539        }
540
541        for start_row in (0..total_rows).step_by(MAX_ROWS_PER_CHUNK) {
542            let end_row = (start_row + MAX_ROWS_PER_CHUNK).min(total_rows);
543            let chunk = page_lookup_file
544                .read_range(start_row..end_row, None)
545                .await?;
546
547            if chunk.num_rows() == 0 {
548                continue;
549            }
550
551            let dict_keys = chunk.column(0);
552            let binary_bitmaps = chunk.column(1);
553            let bitmap_binary_array = binary_bitmaps
554                .as_any()
555                .downcast_ref::<BinaryArray>()
556                .unwrap();
557
558            for idx in 0..chunk.num_rows() {
559                let key = OrderableScalarValue(ScalarValue::try_from_array(dict_keys, idx)?);
560
561                if key.0.is_null() {
562                    continue;
563                }
564
565                let bitmap_bytes = bitmap_binary_array.value(idx);
566                let mut bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap();
567
568                if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
569                    bitmap = frag_reuse_index_ref.remap_row_addrs_tree_map(&bitmap);
570                }
571
572                let cache_key = BitmapKey { value: key };
573                self.index_cache
574                    .insert_with_key(&cache_key, Arc::new(bitmap))
575                    .await;
576            }
577        }
578
579        Ok(())
580    }
581
582    fn index_type(&self) -> IndexType {
583        IndexType::Bitmap
584    }
585
586    fn statistics(&self) -> Result<serde_json::Value> {
587        let stats = BitmapStatistics {
588            num_bitmaps: self.index_map.len() + if !self.null_map.is_empty() { 1 } else { 0 },
589        };
590        serde_json::to_value(stats).map_err(|e| {
591            Error::internal(format!(
592                "failed to serialize bitmap index statistics: {}",
593                e
594            ))
595        })
596    }
597
598    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
599        unimplemented!()
600    }
601}
602
603#[async_trait]
604impl ScalarIndex for BitmapIndex {
605    #[instrument(name = "bitmap_search", level = "debug", skip_all)]
606    async fn search(
607        &self,
608        query: &dyn AnyQuery,
609        metrics: &dyn MetricsCollector,
610    ) -> Result<SearchResult> {
611        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
612
613        let (row_ids, null_row_ids) = match query {
614            SargableQuery::Equals(val) => {
615                metrics.record_comparisons(1);
616                if val.is_null() {
617                    // Querying FOR nulls - they are the TRUE result, not NULL result
618                    ((*self.null_map).clone(), None)
619                } else {
620                    let key = OrderableScalarValue(val.clone());
621                    let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
622                    let null_rows = if !self.null_map.is_empty() {
623                        Some((*self.null_map).clone())
624                    } else {
625                        None
626                    };
627                    ((*bitmap).clone(), null_rows)
628                }
629            }
630            SargableQuery::Range(start, end) => {
631                let range_start = match start {
632                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
633                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
634                    Bound::Unbounded => Bound::Unbounded,
635                };
636
637                let range_end = match end {
638                    Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
639                    Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
640                    Bound::Unbounded => Bound::Unbounded,
641                };
642
643                // Empty range if lower > upper, or if any bound is excluded and lower >= upper.
644                let empty_range = match (&range_start, &range_end) {
645                    (Bound::Included(lower), Bound::Included(upper)) => lower > upper,
646                    (Bound::Included(lower), Bound::Excluded(upper))
647                    | (Bound::Excluded(lower), Bound::Included(upper))
648                    | (Bound::Excluded(lower), Bound::Excluded(upper)) => lower >= upper,
649                    _ => false,
650                };
651
652                let keys: Vec<_> = if empty_range {
653                    Vec::new()
654                } else {
655                    self.index_map
656                        .range((range_start, range_end))
657                        .map(|(k, _v)| k.clone())
658                        .collect()
659                };
660
661                metrics.record_comparisons(keys.len());
662
663                let result = if keys.is_empty() {
664                    RowAddrTreeMap::default()
665                } else {
666                    let bitmaps: Vec<_> = stream::iter(
667                        keys.into_iter()
668                            .map(|key| async move { self.load_bitmap(&key, None).await }),
669                    )
670                    .buffer_unordered(get_num_compute_intensive_cpus())
671                    .try_collect()
672                    .await?;
673
674                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
675                    RowAddrTreeMap::union_all(&bitmap_refs)
676                };
677
678                let null_rows = if !self.null_map.is_empty() {
679                    Some((*self.null_map).clone())
680                } else {
681                    None
682                };
683                (result, null_rows)
684            }
685            SargableQuery::IsIn(values) => {
686                metrics.record_comparisons(values.len());
687
688                // Collect keys that exist in the index, tracking if we need nulls
689                let mut has_null = false;
690                let keys: Vec<_> = values
691                    .iter()
692                    .filter_map(|val| {
693                        if val.is_null() {
694                            has_null = true;
695                            None
696                        } else {
697                            let key = OrderableScalarValue(val.clone());
698                            if self.index_map.contains_key(&key) {
699                                Some(key)
700                            } else {
701                                None
702                            }
703                        }
704                    })
705                    .collect();
706
707                // Load bitmaps in parallel
708                let mut bitmaps: Vec<_> = stream::iter(
709                    keys.into_iter()
710                        .map(|key| async move { self.load_bitmap(&key, None).await }),
711                )
712                .buffer_unordered(get_num_compute_intensive_cpus())
713                .try_collect()
714                .await?;
715
716                // Add null bitmap if needed
717                if has_null && !self.null_map.is_empty() {
718                    bitmaps.push(self.null_map.clone());
719                }
720
721                let result = if bitmaps.is_empty() {
722                    RowAddrTreeMap::default()
723                } else {
724                    // Convert Arc<RowAddrTreeMap> to &RowAddrTreeMap for union_all
725                    let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
726                    RowAddrTreeMap::union_all(&bitmap_refs)
727                };
728
729                // If the query explicitly includes null, then nulls are TRUE (not NULL)
730                // Otherwise, nulls remain NULL (unknown)
731                let null_rows = if !has_null && !self.null_map.is_empty() {
732                    Some((*self.null_map).clone())
733                } else {
734                    None
735                };
736                (result, null_rows)
737            }
738            SargableQuery::IsNull() => {
739                metrics.record_comparisons(1);
740                // Querying FOR nulls - they are the TRUE result, not NULL result
741                ((*self.null_map).clone(), None)
742            }
743            SargableQuery::FullTextSearch(_) => {
744                return Err(Error::not_supported_source(
745                    "full text search is not supported for bitmap indexes".into(),
746                ));
747            }
748            SargableQuery::LikePrefix(_) => {
749                return Err(Error::not_supported_source(
750                    "LIKE prefix queries are not supported for bitmap indexes".into(),
751                ));
752            }
753        };
754
755        let selection = NullableRowAddrSet::new(row_ids, null_row_ids.unwrap_or_default());
756        Ok(SearchResult::Exact(selection))
757    }
758
    /// Bitmap indexes support remapping of row ids (see `remap`).
    fn can_remap(&self) -> bool {
        true
    }
762
763    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
764    async fn remap(
765        &self,
766        mapping: &HashMap<u64, Option<u64>>,
767        dest_store: &dyn IndexStore,
768    ) -> Result<CreatedIndex> {
769        let state = self.load_bitmap_index_state().await?;
770        let remapped_state = BitmapIndexPlugin::remap_bitmap_state(state, mapping);
771        BitmapIndexPlugin::write_bitmap_index(remapped_state, dest_store, &self.value_type).await?;
772
773        Ok(CreatedIndex {
774            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
775                .unwrap(),
776            index_version: BITMAP_INDEX_VERSION,
777            files: Some(dest_store.list_files_with_sizes().await?),
778        })
779    }
780
781    /// Add the new data into the index, creating an updated version of the index in `dest_store`
782    async fn update(
783        &self,
784        new_data: SendableRecordBatchStream,
785        dest_store: &dyn IndexStore,
786        _old_data_filter: Option<super::OldIndexDataFilter>,
787    ) -> Result<CreatedIndex> {
788        BitmapIndexPlugin::streaming_build_and_write(
789            new_data,
790            Some(self),
791            dest_store,
792            BITMAP_LOOKUP_NAME,
793        )
794        .await?;
795
796        Ok(CreatedIndex {
797            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
798                .unwrap(),
799            index_version: BITMAP_INDEX_VERSION,
800            files: Some(dest_store.list_files_with_sizes().await?),
801        })
802    }
803
804    fn update_criteria(&self) -> UpdateCriteria {
805        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::Values).with_row_id())
806    }
807
808    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
809        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap))
810    }
811}
812
/// Buffers serialized (key, bitmap) pairs and flushes them as record batches
/// to the index file, respecting the MAX_BITMAP_ARRAY_LENGTH limit.
struct BitmapBatchWriter {
    /// Destination index file.
    file: Box<dyn super::IndexWriter>,
    /// Keys buffered since the last flush, parallel to `serialized`.
    keys: Vec<ScalarValue>,
    /// Serialized bitmaps buffered since the last flush.
    serialized: Vec<Vec<u8>>,
    /// Total length in bytes of the buffers in `serialized`.
    bytes: usize,
    /// Number of bitmaps emitted over the writer's lifetime; written to the
    /// file metadata as a statistic on finish.
    num_bitmaps: usize,
}
822
823impl BitmapBatchWriter {
824    fn new(file: Box<dyn super::IndexWriter>) -> Self {
825        Self {
826            file,
827            keys: Vec::new(),
828            serialized: Vec::new(),
829            bytes: 0,
830            num_bitmaps: 0,
831        }
832    }
833
834    /// Serialize and buffer a single (key, bitmap) pair, flushing the current
835    /// batch to disk if adding it would exceed MAX_BITMAP_ARRAY_LENGTH.
836    async fn emit(&mut self, key: ScalarValue, bitmap: &RowAddrTreeMap) -> Result<()> {
837        let mut buf = Vec::new();
838        bitmap.serialize_into(&mut buf).unwrap();
839        let size = buf.len();
840
841        if self.bytes + size > MAX_BITMAP_ARRAY_LENGTH {
842            self.flush().await?;
843        }
844
845        self.keys.push(key);
846        self.serialized.push(buf);
847        self.bytes += size;
848        self.num_bitmaps += 1;
849        Ok(())
850    }
851
852    /// Write the current batch to disk.
853    async fn flush(&mut self) -> Result<()> {
854        if self.keys.is_empty() {
855            return Ok(());
856        }
857        let keys_array =
858            ScalarValue::iter_to_array(self.keys.drain(..).collect::<Vec<_>>().into_iter())
859                .unwrap();
860        let total_size: usize = self.serialized.iter().map(|b| b.len()).sum();
861        let mut binary_builder = BinaryBuilder::with_capacity(self.serialized.len(), total_size);
862        for b in self.serialized.drain(..) {
863            binary_builder.append_value(&b);
864        }
865        let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
866        let batch = BitmapIndexPlugin::get_batch_from_arrays(keys_array, bitmaps_array)?;
867        self.file.write_record_batch(batch).await?;
868        self.bytes = 0;
869        Ok(())
870    }
871
872    /// Flush any remaining data, write index statistics, and finalize the file.
873    async fn finish(mut self) -> Result<()> {
874        self.flush().await?;
875        let stats_json = serde_json::to_string(&BitmapStatistics {
876            num_bitmaps: self.num_bitmaps,
877        })
878        .map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
879        let mut metadata = HashMap::new();
880        metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
881        self.file.finish_with_metadata(metadata).await?;
882        Ok(())
883    }
884}
885
886fn bitmap_shard_file_name(partition_id: u64) -> String {
887    format!("{BITMAP_PART_LOOKUP_PREFIX}{partition_id}{BITMAP_PART_LOOKUP_SUFFIX}")
888}
889
/// Packs `id` into the upper 32 bits of a partition id and ORs `tag` into
/// the result.
fn tagged_bitmap_partition_id(id: u32, tag: u64) -> u64 {
    let high_bits = u64::from(id) << 32;
    high_bits | tag
}
893
894fn bitmap_shard_partition_id(fragment_ids: &[u32], shard_id: Option<u32>) -> Result<u64> {
895    if fragment_ids.is_empty() {
896        return Err(Error::invalid_input(
897            "Bitmap shard build requires at least one fragment id".to_string(),
898        ));
899    }
900
901    if let Some(shard_id) = shard_id {
902        return Ok(tagged_bitmap_partition_id(shard_id, EXPLICIT_SHARD_ID_TAG));
903    }
904
905    let [fragment_id] = fragment_ids else {
906        return Err(Error::invalid_input(format!(
907            "Bitmap distributed build over multiple fragments requires an explicit shard_id. \
908             Received {} fragment ids: {:?}. Please assign mutually exclusive shard_id values \
909             to disjoint fragment groups.",
910            fragment_ids.len(),
911            fragment_ids
912        )));
913    };
914
915    Ok(tagged_bitmap_partition_id(
916        *fragment_id,
917        IMPLICIT_FRAGMENT_ID_TAG,
918    ))
919}
920
921fn extract_bitmap_shard_id(filename: &str) -> Result<u64> {
922    let partition_id = filename
923        .strip_prefix(BITMAP_PART_LOOKUP_PREFIX)
924        .and_then(|name| name.strip_suffix(BITMAP_PART_LOOKUP_SUFFIX))
925        .ok_or_else(|| {
926            Error::internal(format!("Invalid bitmap shard file name format: {filename}"))
927        })?;
928    partition_id.parse::<u64>().map_err(|_| {
929        Error::internal(format!(
930            "Failed to parse bitmap partition id from file name: {filename}"
931        ))
932    })
933}
934
935fn deserialize_bitmap(bitmap_bytes: &[u8], file_name: &str) -> Result<RowAddrTreeMap> {
936    RowAddrTreeMap::deserialize_from(bitmap_bytes).map_err(|error| {
937        Error::corrupt_file(
938            Path::from(file_name),
939            format!("Failed to deserialize bitmap bytes: {error}"),
940        )
941    })
942}
943
944async fn new_bitmap_batch_writer(
945    index_store: &dyn IndexStore,
946    file_name: &str,
947    value_type: &DataType,
948) -> Result<BitmapBatchWriter> {
949    let schema = Arc::new(Schema::new(vec![
950        Field::new("keys", value_type.clone(), true),
951        Field::new("bitmaps", DataType::Binary, true),
952    ]));
953    let index_file = index_store.new_index_file(file_name, schema).await?;
954    Ok(BitmapBatchWriter::new(index_file))
955}
956
957#[derive(Clone, Debug, Eq, PartialEq)]
958struct BitmapHeapItem {
959    key: OrderableScalarValue,
960    shard_idx: usize,
961}
962
963impl Ord for BitmapHeapItem {
964    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
965        self.key
966            .cmp(&other.key)
967            .then_with(|| self.shard_idx.cmp(&other.shard_idx))
968    }
969}
970
971impl PartialOrd for BitmapHeapItem {
972    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
973        Some(self.cmp(other))
974    }
975}
976
977struct BitmapShardCursor {
978    file_name: String,
979    reader: Arc<dyn IndexReader>,
980    total_rows: usize,
981    next_row_offset: usize,
982    batch: Option<RecordBatch>,
983    batch_row_idx: usize,
984}
985
986impl BitmapShardCursor {
987    async fn try_new(file_name: String, reader: Arc<dyn IndexReader>) -> Result<Option<Self>> {
988        let total_rows = reader.num_rows();
989        if total_rows == 0 {
990            return Ok(None);
991        }
992
993        let mut cursor = Self {
994            file_name,
995            reader,
996            total_rows,
997            next_row_offset: 0,
998            batch: None,
999            batch_row_idx: 0,
1000        };
1001        if cursor.advance().await? {
1002            Ok(Some(cursor))
1003        } else {
1004            Ok(None)
1005        }
1006    }
1007
1008    fn peek_key(&self) -> Result<OrderableScalarValue> {
1009        let batch = self.batch.as_ref().ok_or_else(|| {
1010            Error::internal(format!(
1011                "Bitmap shard {} has no active batch",
1012                self.file_name
1013            ))
1014        })?;
1015        let key = ScalarValue::try_from_array(batch.column(0), self.batch_row_idx)?;
1016        Ok(OrderableScalarValue(key))
1017    }
1018
1019    fn take_current(&mut self) -> Result<(ScalarValue, RowAddrTreeMap)> {
1020        let batch = self.batch.as_ref().ok_or_else(|| {
1021            Error::internal(format!(
1022                "Bitmap shard {} has no active batch",
1023                self.file_name
1024            ))
1025        })?;
1026        let keys = batch.column(0);
1027        let binary_bitmaps = batch
1028            .column(1)
1029            .as_any()
1030            .downcast_ref::<BinaryArray>()
1031            .ok_or_else(|| {
1032                Error::corrupt_file(
1033                    Path::from(self.file_name.as_str()),
1034                    "Bitmap shard batch has non-binary bitmap column".to_string(),
1035                )
1036            })?;
1037        let key = ScalarValue::try_from_array(keys, self.batch_row_idx)?;
1038        let bitmap = deserialize_bitmap(binary_bitmaps.value(self.batch_row_idx), &self.file_name)?;
1039        self.batch_row_idx += 1;
1040        Ok((key, bitmap))
1041    }
1042
1043    async fn advance(&mut self) -> Result<bool> {
1044        loop {
1045            if let Some(batch) = &self.batch
1046                && self.batch_row_idx < batch.num_rows()
1047            {
1048                return Ok(true);
1049            }
1050
1051            if self.next_row_offset >= self.total_rows {
1052                self.batch = None;
1053                return Ok(false);
1054            }
1055
1056            let end_row = (self.next_row_offset + MERGE_ROWS_PER_CHUNK).min(self.total_rows);
1057            let batch = self
1058                .reader
1059                .read_range(self.next_row_offset..end_row, None)
1060                .await?;
1061            self.next_row_offset = end_row;
1062            self.batch = Some(batch);
1063            self.batch_row_idx = 0;
1064        }
1065    }
1066}
1067
1068async fn advance_cursor_and_push(
1069    cursors: &mut [BitmapShardCursor],
1070    heap: &mut BinaryHeap<Reverse<BitmapHeapItem>>,
1071    shard_idx: usize,
1072) -> Result<()> {
1073    if cursors[shard_idx].advance().await? {
1074        heap.push(Reverse(BitmapHeapItem {
1075            key: cursors[shard_idx].peek_key()?,
1076            shard_idx,
1077        }));
1078    }
1079    Ok(())
1080}
1081
1082async fn drain_same_key_bitmaps(
1083    cursors: &mut [BitmapShardCursor],
1084    heap: &mut BinaryHeap<Reverse<BitmapHeapItem>>,
1085    item: BitmapHeapItem,
1086) -> Result<(ScalarValue, RowAddrTreeMap)> {
1087    let (key, mut merged_bitmap) = cursors[item.shard_idx].take_current()?;
1088    let merged_key = OrderableScalarValue(key);
1089    advance_cursor_and_push(cursors, heap, item.shard_idx).await?;
1090
1091    loop {
1092        let Some(Reverse(next_item)) = heap.peek() else {
1093            break;
1094        };
1095        if next_item.key != merged_key {
1096            break;
1097        }
1098
1099        let shard_idx = next_item.shard_idx;
1100        let _ = heap.pop();
1101        let (_, bitmap) = cursors[shard_idx].take_current()?;
1102        merged_bitmap |= &bitmap;
1103        advance_cursor_and_push(cursors, heap, shard_idx).await?;
1104    }
1105
1106    Ok((merged_key.0, merged_bitmap))
1107}
1108
1109async fn list_bitmap_shard_files(
1110    object_store: &ObjectStore,
1111    index_dir: &Path,
1112    progress: &dyn IndexBuildProgress,
1113) -> Result<Vec<String>> {
1114    let mut shard_files = Vec::new();
1115    let mut list_stream = object_store.list(Some(index_dir.clone()));
1116    while let Some(item) = list_stream.next().await {
1117        match item {
1118            Ok(meta) => {
1119                let file_name = meta.location.filename().unwrap_or_default();
1120                if file_name.starts_with(BITMAP_PART_LOOKUP_PREFIX)
1121                    && file_name.ends_with(BITMAP_PART_LOOKUP_SUFFIX)
1122                {
1123                    shard_files.push(file_name.to_string());
1124                    progress
1125                        .stage_progress("scan_bitmap_shards", shard_files.len() as u64)
1126                        .await?;
1127                }
1128            }
1129            Err(err) => {
1130                return Err(Error::io(format!(
1131                    "Failed to list bitmap shard files in {}: {err}",
1132                    index_dir
1133                )));
1134            }
1135        }
1136    }
1137    let mut shard_files = shard_files
1138        .into_iter()
1139        .map(|file_name| extract_bitmap_shard_id(&file_name).map(|shard_id| (shard_id, file_name)))
1140        .collect::<Result<Vec<_>>>()?;
1141    shard_files.sort_unstable_by_key(|(shard_id, _)| *shard_id);
1142    let shard_files = shard_files
1143        .into_iter()
1144        .map(|(_, file_name)| file_name)
1145        .collect::<Vec<_>>();
1146    if shard_files.is_empty() {
1147        return Err(Error::invalid_input(format!(
1148            "No bitmap shard files found in index directory: {}; \
1149             call build_index for each fragment before calling merge_index_metadata",
1150            index_dir
1151        )));
1152    }
1153    Ok(shard_files)
1154}
1155
1156async fn cleanup_bitmap_shard_files(store: &dyn IndexStore, shard_files: &[String]) {
1157    for file_name in shard_files {
1158        if let Err(error) = store.delete_index_file(file_name).await {
1159            warn!(
1160                "Failed to delete bitmap shard file '{}': {}. \
1161                 This does not affect the merged bitmap index, but the shard file \
1162                 may need manual cleanup.",
1163                file_name, error
1164            );
1165        }
1166    }
1167}
1168
/// Stateless plugin type for bitmap scalar indices.
///
/// All functionality lives in its inherent associated functions and in its
/// `ScalarIndexPlugin` implementation.
#[derive(Default, Debug)]
pub struct BitmapIndexPlugin;
1171
1172impl BitmapIndexPlugin {
1173    fn get_batch_from_arrays(
1174        keys: Arc<dyn Array>,
1175        binary_bitmaps: Arc<dyn Array>,
1176    ) -> Result<RecordBatch> {
1177        let schema = Arc::new(Schema::new(vec![
1178            Field::new("keys", keys.data_type().clone(), true),
1179            Field::new("bitmaps", binary_bitmaps.data_type().clone(), true),
1180        ]));
1181
1182        let columns = vec![keys, binary_bitmaps];
1183
1184        Ok(RecordBatch::try_new(schema, columns)?)
1185    }
1186
1187    async fn write_bitmap_index(
1188        state: HashMap<ScalarValue, RowAddrTreeMap>,
1189        index_store: &dyn IndexStore,
1190        value_type: &DataType,
1191    ) -> Result<()> {
1192        Self::write_bitmap_index_with_extras(
1193            state,
1194            index_store,
1195            value_type,
1196            HashMap::new(),
1197            Vec::new(),
1198        )
1199        .await
1200    }
1201
1202    /// Writes a bitmap index and attaches extra metadata and global buffers.
1203    pub(crate) async fn write_bitmap_index_with_extras(
1204        state: HashMap<ScalarValue, RowAddrTreeMap>,
1205        index_store: &dyn IndexStore,
1206        value_type: &DataType,
1207        mut metadata: HashMap<String, String>,
1208        global_buffers: Vec<(String, Bytes)>,
1209    ) -> Result<()> {
1210        let num_bitmaps = state.len();
1211        let schema = Arc::new(Schema::new(vec![
1212            Field::new("keys", value_type.clone(), true),
1213            Field::new("bitmaps", DataType::Binary, true),
1214        ]));
1215
1216        let mut bitmap_index_file = index_store
1217            .new_index_file(BITMAP_LOOKUP_NAME, schema)
1218            .await?;
1219
1220        for (metadata_key, data) in global_buffers {
1221            let buffer_idx = bitmap_index_file.add_global_buffer(data).await?;
1222            metadata.insert(metadata_key, buffer_idx.to_string());
1223        }
1224
1225        let mut cur_keys = Vec::new();
1226        let mut cur_bitmaps = Vec::new();
1227        let mut cur_bytes = 0;
1228
1229        for (key, bitmap) in state.into_iter() {
1230            let mut bytes = Vec::new();
1231            bitmap.serialize_into(&mut bytes).unwrap();
1232            let bitmap_size = bytes.len();
1233
1234            if cur_bytes + bitmap_size > MAX_BITMAP_ARRAY_LENGTH {
1235                let keys_array = ScalarValue::iter_to_array(cur_keys.clone().into_iter()).unwrap();
1236                let mut binary_builder = BinaryBuilder::new();
1237                for b in &cur_bitmaps {
1238                    binary_builder.append_value(b);
1239                }
1240                let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
1241
1242                let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
1243                bitmap_index_file.write_record_batch(record_batch).await?;
1244
1245                cur_keys.clear();
1246                cur_bitmaps.clear();
1247                cur_bytes = 0;
1248            }
1249
1250            cur_keys.push(key);
1251            cur_bitmaps.push(bytes);
1252            cur_bytes += bitmap_size;
1253        }
1254
1255        // Flush any remaining
1256        if !cur_keys.is_empty() {
1257            let keys_array = ScalarValue::iter_to_array(cur_keys).unwrap();
1258            let mut binary_builder = BinaryBuilder::new();
1259            for b in &cur_bitmaps {
1260                binary_builder.append_value(b);
1261            }
1262            let bitmaps_array = Arc::new(binary_builder.finish()) as Arc<dyn Array>;
1263
1264            let record_batch = Self::get_batch_from_arrays(keys_array, bitmaps_array)?;
1265            bitmap_index_file.write_record_batch(record_batch).await?;
1266        }
1267
1268        // Finish file with metadata that allows lightweight statistics reads
1269        let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps })
1270            .map_err(|e| Error::internal(format!("failed to serialize bitmap statistics: {e}")))?;
1271        metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);
1272
1273        bitmap_index_file.finish_with_metadata(metadata).await?;
1274
1275        Ok(())
1276    }
1277
1278    /// Builds bitmap index state from a `(value, row_id)` stream without writing it.
1279    pub(crate) async fn build_bitmap_index_state(
1280        mut data_source: SendableRecordBatchStream,
1281        mut state: HashMap<ScalarValue, RowAddrTreeMap>,
1282    ) -> Result<(HashMap<ScalarValue, RowAddrTreeMap>, DataType)> {
1283        let value_type = data_source.schema().field(0).data_type().clone();
1284        while let Some(batch) = data_source.try_next().await? {
1285            let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
1286            let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
1287            debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
1288
1289            let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
1290
1291            for i in 0..values.len() {
1292                let row_id = row_id_column.value(i);
1293                let key = ScalarValue::try_from_array(values.as_ref(), i)?;
1294                state.entry(key.clone()).or_default().insert(row_id);
1295            }
1296        }
1297
1298        Ok((state, value_type))
1299    }
1300
1301    pub async fn train_bitmap_index(
1302        data: SendableRecordBatchStream,
1303        index_store: &dyn IndexStore,
1304    ) -> Result<()> {
1305        Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME).await
1306    }
1307
1308    async fn train_bitmap_shard(
1309        data: SendableRecordBatchStream,
1310        index_store: &dyn IndexStore,
1311        fragment_ids: &[u32],
1312        shard_id: Option<u32>,
1313        progress: Arc<dyn crate::progress::IndexBuildProgress>,
1314    ) -> Result<()> {
1315        let partition_id = bitmap_shard_partition_id(fragment_ids, shard_id)?;
1316        let file_name = bitmap_shard_file_name(partition_id);
1317        progress
1318            .stage_start("build_bitmap_shard", None, "rows")
1319            .await?;
1320        Self::streaming_build_and_write(data, None, index_store, &file_name).await?;
1321        progress.stage_complete("build_bitmap_shard").await?;
1322        Ok(())
1323    }
1324
1325    /// Builds and writes a bitmap index in a streaming fashion from value-sorted
1326    /// input. Only one value's bitmap is in memory at a time, reducing peak memory
1327    /// from O(unique_values * avg_bitmap) to O(largest_single_bitmap).
1328    ///
1329    /// If `old_index` is provided, its existing bitmaps are merged with the new
1330    /// data via a sorted merge-join (the old index_map is a BTreeMap, already
1331    /// sorted by value).
1332    async fn streaming_build_and_write(
1333        mut data_source: SendableRecordBatchStream,
1334        old_index: Option<&BitmapIndex>,
1335        index_store: &dyn IndexStore,
1336        output_file_name: &str,
1337    ) -> Result<()> {
1338        let value_type = data_source.schema().field(0).data_type().clone();
1339
1340        let mut writer =
1341            new_bitmap_batch_writer(index_store, output_file_name, &value_type).await?;
1342
1343        // Collect old index keys (already in memory as BTreeMap keys — this is
1344        // just a Vec of references, not a copy of the bitmaps themselves).
1345        let old_keys: Vec<OrderableScalarValue> = old_index
1346            .map(|idx| idx.index_map.keys().cloned().collect())
1347            .unwrap_or_default();
1348        let mut old_pos: usize = 0;
1349
1350        // Current value being accumulated from the new data stream.
1351        let mut current_key: Option<ScalarValue> = None;
1352        let mut current_bitmap = RowAddrTreeMap::default();
1353        // Track whether we emitted a null bitmap (old index stores nulls
1354        // separately in null_map, not in index_map).
1355        let mut emitted_null = false;
1356
1357        while let Some(batch) = data_source.try_next().await? {
1358            let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
1359            let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
1360            debug_assert_eq!(row_ids.data_type(), &DataType::UInt64);
1361            let row_id_column = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();
1362
1363            for i in 0..values.len() {
1364                let row_id = row_id_column.value(i);
1365                let key = ScalarValue::try_from_array(values.as_ref(), i)?;
1366
1367                match &current_key {
1368                    Some(cur) if *cur == key => {
1369                        current_bitmap.insert(row_id);
1370                    }
1371                    _ => {
1372                        // Value changed — flush the previous run.
1373                        if let Some(prev_key) = current_key.take() {
1374                            let mut prev_bitmap = std::mem::take(&mut current_bitmap);
1375                            Self::finish_run(
1376                                prev_key,
1377                                &mut prev_bitmap,
1378                                old_index,
1379                                &old_keys,
1380                                &mut old_pos,
1381                                &mut emitted_null,
1382                                &mut writer,
1383                            )
1384                            .await?;
1385                        }
1386                        current_key = Some(key);
1387                        current_bitmap = RowAddrTreeMap::default();
1388                        current_bitmap.insert(row_id);
1389                    }
1390                }
1391            }
1392        }
1393
1394        // Flush the last accumulated run from new data.
1395        if let Some(last_key) = current_key.take() {
1396            let mut last_bitmap = std::mem::take(&mut current_bitmap);
1397            Self::finish_run(
1398                last_key,
1399                &mut last_bitmap,
1400                old_index,
1401                &old_keys,
1402                &mut old_pos,
1403                &mut emitted_null,
1404                &mut writer,
1405            )
1406            .await?;
1407        }
1408
1409        // Emit any remaining old-only entries.
1410        if let Some(idx) = old_index {
1411            while old_pos < old_keys.len() {
1412                let old_bitmap = idx.load_bitmap(&old_keys[old_pos], None).await?;
1413                writer
1414                    .emit(old_keys[old_pos].0.clone(), &old_bitmap)
1415                    .await?;
1416                old_pos += 1;
1417            }
1418        }
1419
1420        // Emit old null bitmap if we didn't already merge it with new nulls.
1421        if !emitted_null
1422            && let Some(idx) = old_index
1423            && !idx.null_map.is_empty()
1424        {
1425            let null_key = new_null_array(&value_type, 1);
1426            let null_key = ScalarValue::try_from_array(null_key.as_ref(), 0)?;
1427            writer.emit(null_key, &idx.null_map).await?;
1428        }
1429
1430        writer.finish().await?;
1431
1432        Ok(())
1433    }
1434
1435    /// Flush a completed value-run from the new data stream, emitting any
1436    /// old-only entries that sort before it and merging the old bitmap if the
1437    /// key exists in both old and new.
1438    async fn finish_run(
1439        key: ScalarValue,
1440        bitmap: &mut RowAddrTreeMap,
1441        old_index: Option<&BitmapIndex>,
1442        old_keys: &[OrderableScalarValue],
1443        old_pos: &mut usize,
1444        emitted_null: &mut bool,
1445        writer: &mut BitmapBatchWriter,
1446    ) -> Result<()> {
1447        if key.is_null() {
1448            // Null values are stored separately in the old index's null_map.
1449            if let Some(idx) = old_index
1450                && !idx.null_map.is_empty()
1451            {
1452                *bitmap |= &*idx.null_map;
1453            }
1454            *emitted_null = true;
1455            writer.emit(key, bitmap).await?;
1456        } else if let Some(idx) = old_index {
1457            let orderable = OrderableScalarValue(key.clone());
1458
1459            // Emit old-only entries that sort before this key.
1460            while *old_pos < old_keys.len() && old_keys[*old_pos] < orderable {
1461                let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
1462                writer
1463                    .emit(old_keys[*old_pos].0.clone(), &old_bitmap)
1464                    .await?;
1465                *old_pos += 1;
1466            }
1467
1468            // If the old index also has this key, merge its bitmap.
1469            if *old_pos < old_keys.len() && old_keys[*old_pos] == orderable {
1470                let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
1471                *bitmap |= &*old_bitmap;
1472                *old_pos += 1;
1473            }
1474
1475            writer.emit(key, bitmap).await?;
1476        } else {
1477            writer.emit(key, bitmap).await?;
1478        }
1479        Ok(())
1480    }
1481
1482    /// Remaps every bitmap in a materialized bitmap-index state using row-id mappings.
1483    pub(crate) fn remap_bitmap_state(
1484        state: HashMap<ScalarValue, RowAddrTreeMap>,
1485        mapping: &HashMap<u64, Option<u64>>,
1486    ) -> HashMap<ScalarValue, RowAddrTreeMap> {
1487        state
1488            .into_iter()
1489            .map(|(key, bitmap)| {
1490                let remapped_bitmap =
1491                    RowAddrTreeMap::from_iter(bitmap.row_addrs().unwrap().filter_map(|addr| {
1492                        let addr_as_u64 = u64::from(addr);
1493                        mapping
1494                            .get(&addr_as_u64)
1495                            .copied()
1496                            .unwrap_or(Some(addr_as_u64))
1497                    }));
1498                (key, remapped_bitmap)
1499            })
1500            .collect()
1501    }
1502
1503    /// Merge per-shard bitmap lookup files into a single bitmap index file.
1504    ///
1505    /// Each shard file is already sorted by key and can contain many distinct keys.
1506    /// This method does not materialize an entire shard in memory. Instead, it keeps
1507    /// one cursor per shard, where each cursor tracks the shard's current row within
1508    /// a small in-memory batch. A min-heap stores the current key for each shard.
1509    ///
1510    /// The merge then proceeds as a streaming K-way merge:
1511    /// - pop the smallest current key across all shards
1512    /// - union the bitmap for that key with any other shards currently positioned on
1513    ///   the same key
1514    /// - advance only those shards that participated in the union and push their next
1515    ///   keys back into the heap
1516    ///
1517    /// This keeps memory usage proportional to the number of shards plus the bitmaps
1518    /// currently being merged, instead of the total number of keys across all shards.
1519    async fn merge_shards(
1520        store: &dyn IndexStore,
1521        shard_files: &[String],
1522        progress: Arc<dyn IndexBuildProgress>,
1523    ) -> Result<()> {
1524        progress
1525            .stage_start("merge_bitmap_shards", None, "bitmaps")
1526            .await?;
1527
1528        let mut cursors = Vec::with_capacity(shard_files.len());
1529        let mut heap = BinaryHeap::with_capacity(shard_files.len());
1530        let mut value_type: Option<DataType> = None;
1531
1532        for file_name in shard_files {
1533            let reader = store.open_index_file(file_name).await?;
1534            let shard_value_type = reader.schema().fields[0].data_type().clone();
1535            if let Some(existing_type) = &value_type {
1536                if existing_type != &shard_value_type {
1537                    return Err(Error::invalid_input(format!(
1538                        "Bitmap shard {} has value type {:?}, expected {:?}",
1539                        file_name, shard_value_type, existing_type
1540                    )));
1541                }
1542            } else {
1543                value_type = Some(shard_value_type);
1544            }
1545            if let Some(cursor) = BitmapShardCursor::try_new(file_name.clone(), reader).await? {
1546                let key = cursor.peek_key()?;
1547                let shard_idx = cursors.len();
1548                cursors.push(cursor);
1549                heap.push(Reverse(BitmapHeapItem { key, shard_idx }));
1550            }
1551        }
1552
1553        let value_type = value_type.ok_or_else(|| {
1554            Error::invalid_input("Bitmap shard merge requires at least one shard file".to_string())
1555        })?;
1556        let mut writer = new_bitmap_batch_writer(store, BITMAP_LOOKUP_NAME, &value_type).await?;
1557        let mut merged_keys = 0u64;
1558
1559        while let Some(Reverse(item)) = heap.pop() {
1560            let (key, merged_bitmap) =
1561                drain_same_key_bitmaps(&mut cursors, &mut heap, item).await?;
1562            writer.emit(key, &merged_bitmap).await?;
1563            merged_keys += 1;
1564            progress
1565                .stage_progress("merge_bitmap_shards", merged_keys)
1566                .await?;
1567        }
1568
1569        progress.stage_complete("merge_bitmap_shards").await?;
1570        progress
1571            .stage_start("write_bitmap_index", Some(1), "files")
1572            .await?;
1573        writer.finish().await?;
1574        progress.stage_progress("write_bitmap_index", 1).await?;
1575        progress.stage_complete("write_bitmap_index").await?;
1576        Ok(())
1577    }
1578}
1579
1580pub async fn merge_index_files(
1581    object_store: &ObjectStore,
1582    index_dir: &Path,
1583    store: Arc<dyn IndexStore>,
1584    progress: Arc<dyn IndexBuildProgress>,
1585) -> Result<()> {
1586    progress
1587        .stage_start("scan_bitmap_shards", None, "files")
1588        .await?;
1589    let shard_files = list_bitmap_shard_files(object_store, index_dir, progress.as_ref()).await?;
1590    progress.stage_complete("scan_bitmap_shards").await?;
1591
1592    BitmapIndexPlugin::merge_shards(store.as_ref(), &shard_files, progress).await?;
1593    cleanup_bitmap_shard_files(store.as_ref(), &shard_files).await;
1594    Ok(())
1595}
1596
1597#[async_trait]
1598impl ScalarIndexPlugin for BitmapIndexPlugin {
1599    fn name(&self) -> &str {
1600        "Bitmap"
1601    }
1602
1603    fn new_training_request(
1604        &self,
1605        params: &str,
1606        field: &Field,
1607    ) -> Result<Box<dyn TrainingRequest>> {
1608        if field.data_type().is_nested() {
1609            return Err(Error::invalid_input_source(
1610                "A bitmap index can only be created on a non-nested field.".into(),
1611            ));
1612        }
1613        let params = if params.is_empty() {
1614            BitmapParameters::default()
1615        } else {
1616            serde_json::from_str::<BitmapParameters>(params)?
1617        };
1618        Ok(Box::new(BitmapTrainingRequest::new(params)))
1619    }
1620
1621    fn provides_exact_answer(&self) -> bool {
1622        true
1623    }
1624
1625    fn version(&self) -> u32 {
1626        BITMAP_INDEX_VERSION
1627    }
1628
1629    fn new_query_parser(
1630        &self,
1631        index_name: String,
1632        _index_details: &prost_types::Any,
1633    ) -> Option<Box<dyn ScalarQueryParser>> {
1634        Some(Box::new(SargableQueryParser::new(
1635            index_name,
1636            self.name().to_string(),
1637            false,
1638        )))
1639    }
1640
1641    async fn train_index(
1642        &self,
1643        data: SendableRecordBatchStream,
1644        index_store: &dyn IndexStore,
1645        request: Box<dyn TrainingRequest>,
1646        fragment_ids: Option<Vec<u32>>,
1647        progress: Arc<dyn crate::progress::IndexBuildProgress>,
1648    ) -> Result<CreatedIndex> {
1649        let request = request
1650            .as_any()
1651            .downcast_ref::<BitmapTrainingRequest>()
1652            .ok_or_else(|| {
1653                Error::internal(
1654                    "BitmapIndexPlugin::train_index received a non-bitmap training request"
1655                        .to_string(),
1656                )
1657            })?;
1658        if let Some(fragment_ids) = fragment_ids.as_ref() {
1659            Self::train_bitmap_shard(
1660                data,
1661                index_store,
1662                fragment_ids,
1663                request.parameters.shard_id,
1664                progress,
1665            )
1666            .await?;
1667        } else if request.parameters.shard_id.is_some() {
1668            return Err(Error::invalid_input(
1669                "Bitmap shard_id requires fragment_ids and is only supported for distributed shard builds"
1670                    .to_string(),
1671            ));
1672        } else {
1673            Self::train_bitmap_index(data, index_store).await?;
1674        }
1675        Ok(CreatedIndex {
1676            index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default())
1677                .unwrap(),
1678            index_version: BITMAP_INDEX_VERSION,
1679            files: Some(index_store.list_files_with_sizes().await?),
1680        })
1681    }
1682
1683    /// Load an index from storage
1684    async fn load_index(
1685        &self,
1686        index_store: Arc<dyn IndexStore>,
1687        _index_details: &prost_types::Any,
1688        frag_reuse_index: Option<Arc<FragReuseIndex>>,
1689        cache: &LanceCache,
1690    ) -> Result<Arc<dyn ScalarIndex>> {
1691        Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
1692    }
1693
1694    async fn get_from_cache(
1695        &self,
1696        index_store: Arc<dyn IndexStore>,
1697        frag_reuse_index: Option<Arc<FragReuseIndex>>,
1698        cache: &LanceCache,
1699    ) -> Result<Option<Arc<dyn ScalarIndex>>> {
1700        let Some(state) = cache.get_with_key(&BitmapIndexStateKey).await else {
1701            return Ok(None);
1702        };
1703        let state = (*state).clone();
1704        let index = state.into_bitmap_index(index_store, cache, frag_reuse_index)?;
1705        Ok(Some(index as Arc<dyn ScalarIndex>))
1706    }
1707
1708    async fn put_in_cache(&self, cache: &LanceCache, index: Arc<dyn ScalarIndex>) -> Result<()> {
1709        let bitmap = index
1710            .as_any()
1711            .downcast_ref::<BitmapIndex>()
1712            .ok_or_else(|| {
1713                Error::internal("BitmapIndexPlugin::put_in_cache called with a non-bitmap index")
1714            })?;
1715        let state = BitmapIndexState::from_index(bitmap)?;
1716        cache
1717            .insert_with_key(&BitmapIndexStateKey, Arc::new(state))
1718            .await;
1719        Ok(())
1720    }
1721
1722    async fn load_statistics(
1723        &self,
1724        index_store: Arc<dyn IndexStore>,
1725        _index_details: &prost_types::Any,
1726    ) -> Result<Option<serde_json::Value>> {
1727        let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
1728        if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
1729            let stats = serde_json::from_str(value).map_err(|e| {
1730                Error::internal(format!("failed to parse bitmap statistics metadata: {e}"))
1731            })?;
1732            Ok(Some(stats))
1733        } else {
1734            Ok(None)
1735        }
1736    }
1737}
1738
1739#[cfg(test)]
1740mod tests {
1741    use super::*;
1742    use crate::metrics::NoOpMetricsCollector;
1743    use crate::scalar::lance_format::LanceIndexStore;
1744    use arrow_array::{RecordBatch, StringArray, UInt64Array, record_batch};
1745    use arrow_schema::{DataType, Field, Schema};
1746
1747    /// Sort a (value, row_id) RecordBatch by the value column so that unit tests
1748    /// match the ordering the production scanner applies via TrainingOrdering::Values.
1749    fn sort_batch_by_value(batch: &RecordBatch) -> RecordBatch {
1750        use arrow::compute::SortOptions;
1751        let values = batch.column(0);
1752        let row_ids = batch.column(1);
1753        let options = SortOptions {
1754            descending: false,
1755            nulls_first: true,
1756        };
1757        let indices = arrow::compute::sort_to_indices(values, Some(options), None).unwrap();
1758        let sorted_values = arrow::compute::take(values.as_ref(), &indices, None).unwrap();
1759        let sorted_row_ids = arrow::compute::take(row_ids.as_ref(), &indices, None).unwrap();
1760        RecordBatch::try_new(batch.schema(), vec![sorted_values, sorted_row_ids]).unwrap()
1761    }
1762    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1763    use futures::stream;
1764    use lance_core::utils::mask::RowSetOps;
1765    use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
1766    use lance_io::object_store::ObjectStore;
1767    use std::collections::HashMap;
1768
1769    fn assert_state_roundtrips(state: &BitmapIndexState) {
1770        let mut buf = Vec::new();
1771        state.serialize(&mut buf).unwrap();
1772        let restored = BitmapIndexState::deserialize(&bytes::Bytes::from(buf)).unwrap();
1773        assert_eq!(restored.lookup_batch, state.lookup_batch);
1774        assert_eq!(&*restored.null_map, &*state.null_map);
1775        assert_eq!(restored.value_type, state.value_type);
1776    }
1777
/// Round-trips a populated and an empty `BitmapIndexState` through the codec.
#[test]
fn test_bitmap_index_state_codec_roundtrip() {
    // Populated case: three Int32 keys plus two entries in the null map.
    let int_key = |v: i32| OrderableScalarValue(ScalarValue::Int32(Some(v)));
    let index_map = BTreeMap::from([(int_key(1), 0), (int_key(7), 1), (int_key(42), 2)]);

    let mut null_map = RowAddrTreeMap::new();
    for offset in [3, 5] {
        null_map.insert(RowAddress::new_from_parts(0, offset).into());
    }

    let populated = BitmapIndexState {
        lookup_batch: build_lookup_batch(&index_map, &DataType::Int32).unwrap(),
        null_map: Arc::new(null_map),
        value_type: DataType::Int32,
    };
    assert_state_roundtrips(&populated);

    // Empty case: no keys and an empty null map, with a Utf8 value type.
    let empty = BitmapIndexState {
        lookup_batch: build_lookup_batch(&BTreeMap::new(), &DataType::Utf8).unwrap(),
        null_map: Arc::new(RowAddrTreeMap::new()),
        value_type: DataType::Utf8,
    };
    assert_state_roundtrips(&empty);
}
1803
1804    #[tokio::test]
1805    async fn test_bitmap_lazy_loading_and_cache() {
1806        // Create a temporary directory for the index
1807        let tmpdir = TempObjDir::default();
1808        let store = Arc::new(LanceIndexStore::new(
1809            Arc::new(ObjectStore::local()),
1810            tmpdir.clone(),
1811            Arc::new(LanceCache::no_cache()),
1812        ));
1813
1814        // Create test data with low cardinality column
1815        let colors = vec![
1816            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
1817            "red", "red", "blue", "green", "yellow",
1818        ];
1819
1820        let row_ids = (0u64..15u64).collect::<Vec<_>>();
1821
1822        let schema = Arc::new(Schema::new(vec![
1823            Field::new("value", DataType::Utf8, false),
1824            Field::new("_rowid", DataType::UInt64, false),
1825        ]));
1826
1827        let batch = RecordBatch::try_new(
1828            schema.clone(),
1829            vec![
1830                Arc::new(StringArray::from(colors.clone())),
1831                Arc::new(UInt64Array::from(row_ids.clone())),
1832            ],
1833        )
1834        .unwrap();
1835
1836        let batch = sort_batch_by_value(&batch);
1837        let stream = stream::once(async move { Ok(batch) });
1838        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1839
1840        // Train and write the bitmap index
1841        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
1842            .await
1843            .unwrap();
1844
1845        // Create a cache with limited capacity
1846        let cache = LanceCache::with_capacity(1024 * 1024); // 1MB cache
1847
1848        // Load the index (should only load metadata, not bitmaps)
1849        let index = BitmapIndex::load(store.clone(), None, &cache)
1850            .await
1851            .unwrap();
1852
1853        assert_eq!(index.index_map.len(), 4); // 4 non-null unique values (red, blue, green, yellow)
1854        assert!(index.null_map.is_empty()); // No nulls in test data
1855
1856        // Test 1: Search for "red"
1857        let query = SargableQuery::Equals(ScalarValue::Utf8(Some("red".to_string())));
1858        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1859
1860        // Verify results
1861        let expected_red_rows = vec![0u64, 3, 6, 10, 11];
1862        if let SearchResult::Exact(row_ids) = result {
1863            let mut actual: Vec<u64> = row_ids
1864                .true_rows()
1865                .row_addrs()
1866                .unwrap()
1867                .map(|id| id.into())
1868                .collect();
1869            actual.sort();
1870            assert_eq!(actual, expected_red_rows);
1871        } else {
1872            panic!("Expected exact search result");
1873        }
1874
1875        // Test 2: Search for "red" again - should hit cache
1876        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1877        if let SearchResult::Exact(row_ids) = result {
1878            let mut actual: Vec<u64> = row_ids
1879                .true_rows()
1880                .row_addrs()
1881                .unwrap()
1882                .map(|id| id.into())
1883                .collect();
1884            actual.sort();
1885            assert_eq!(actual, expected_red_rows);
1886        }
1887
1888        // Test 3: Range query
1889        let query = SargableQuery::Range(
1890            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
1891            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
1892        );
1893        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1894
1895        let expected_range_rows = vec![1u64, 2, 5, 7, 8, 12, 13];
1896        if let SearchResult::Exact(row_ids) = result {
1897            let mut actual: Vec<u64> = row_ids
1898                .true_rows()
1899                .row_addrs()
1900                .unwrap()
1901                .map(|id| id.into())
1902                .collect();
1903            actual.sort();
1904            assert_eq!(actual, expected_range_rows);
1905        }
1906
1907        // Test 3b: Inverted range query should return empty result
1908        let query = SargableQuery::Range(
1909            std::ops::Bound::Included(ScalarValue::Utf8(Some("green".to_string()))),
1910            std::ops::Bound::Included(ScalarValue::Utf8(Some("blue".to_string()))),
1911        );
1912        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1913        if let SearchResult::Exact(row_ids) = result {
1914            assert!(row_ids.true_rows().is_empty());
1915        } else {
1916            panic!("Expected exact search result");
1917        }
1918
1919        // Test 4: IsIn query
1920        let query = SargableQuery::IsIn(vec![
1921            ScalarValue::Utf8(Some("red".to_string())),
1922            ScalarValue::Utf8(Some("yellow".to_string())),
1923        ]);
1924        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1925
1926        let expected_in_rows = vec![0u64, 3, 4, 6, 9, 10, 11, 14];
1927        if let SearchResult::Exact(row_ids) = result {
1928            let mut actual: Vec<u64> = row_ids
1929                .true_rows()
1930                .row_addrs()
1931                .unwrap()
1932                .map(|id| id.into())
1933                .collect();
1934            actual.sort();
1935            assert_eq!(actual, expected_in_rows);
1936        }
1937    }
1938
1939    #[tokio::test]
1940    #[ignore]
1941    async fn test_big_bitmap_index() {
1942        // WARNING: This test allocates a huge state to force overflow over int32 on BinaryArray
1943        // You must run it only on a machine with enough resources (or skip it normally).
1944        use super::{BITMAP_LOOKUP_NAME, BitmapIndex};
1945        use crate::scalar::IndexStore;
1946        use crate::scalar::lance_format::LanceIndexStore;
1947        use arrow_schema::DataType;
1948        use datafusion_common::ScalarValue;
1949        use lance_core::cache::LanceCache;
1950        use lance_core::utils::mask::RowAddrTreeMap;
1951        use lance_io::object_store::ObjectStore;
1952        use std::collections::HashMap;
1953        use std::sync::Arc;
1954
1955        // Adjust these numbers so that:
1956        //     m * (serialized size per bitmap) > 2^31 bytes.
1957        //
1958        // For example, if we assume each bitmap serializes to ~1000 bytes,
1959        // you need m > 2.1e6.
1960        let m: u32 = 2_500_000;
1961        let per_bitmap_size = 1000; // assumed bytes per bitmap
1962
1963        let mut state = HashMap::new();
1964        for i in 0..m {
1965            // Create a bitmap that contains, say, 1000 row IDs.
1966            let bitmap = RowAddrTreeMap::from_iter(0..per_bitmap_size);
1967
1968            let key = ScalarValue::UInt32(Some(i));
1969            state.insert(key, bitmap);
1970        }
1971
1972        // Create a temporary store.
1973        let tmpdir = TempObjDir::default();
1974        let test_store = LanceIndexStore::new(
1975            Arc::new(ObjectStore::local()),
1976            tmpdir.clone(),
1977            Arc::new(LanceCache::no_cache()),
1978        );
1979
1980        // This call should never trigger a "byte array offset overflow" error since now the code supports
1981        // read by chunks
1982        let result =
1983            BitmapIndexPlugin::write_bitmap_index(state, &test_store, &DataType::UInt32).await;
1984
1985        assert!(
1986            result.is_ok(),
1987            "Failed to write bitmap index: {:?}",
1988            result.err()
1989        );
1990
1991        // Verify the index file exists
1992        let index_file = test_store.open_index_file(BITMAP_LOOKUP_NAME).await;
1993        assert!(
1994            index_file.is_ok(),
1995            "Failed to open index file: {:?}",
1996            index_file.err()
1997        );
1998        let index_file = index_file.unwrap();
1999
2000        // Print stats about the index file
2001        tracing::info!(
2002            "Index file contains {} rows in total",
2003            index_file.num_rows()
2004        );
2005
2006        // Load the index using BitmapIndex::load
2007        tracing::info!("Loading index from disk...");
2008        let loaded_index = BitmapIndex::load(Arc::new(test_store), None, &LanceCache::no_cache())
2009            .await
2010            .expect("Failed to load bitmap index");
2011
2012        // Verify the loaded index has the correct number of entries
2013        assert_eq!(
2014            loaded_index.index_map.len(),
2015            m as usize,
2016            "Loaded index has incorrect number of keys (expected {}, got {})",
2017            m,
2018            loaded_index.index_map.len()
2019        );
2020
2021        // Manually verify specific keys without using search()
2022        let test_keys = [0, m / 2, m - 1]; // Beginning, middle, and end
2023        for &key_val in &test_keys {
2024            let key = OrderableScalarValue(ScalarValue::UInt32(Some(key_val)));
2025            // Load the bitmap for this key
2026            let bitmap = loaded_index
2027                .load_bitmap(&key, None)
2028                .await
2029                .unwrap_or_else(|_| panic!("Key {} should exist", key_val));
2030
2031            // Convert RowAddrTreeMap to a vector for easier assertion
2032            let row_addrs: Vec<u64> = bitmap.row_addrs().unwrap().map(u64::from).collect();
2033
2034            // Verify length
2035            assert_eq!(
2036                row_addrs.len(),
2037                per_bitmap_size as usize,
2038                "Bitmap for key {} has wrong size",
2039                key_val
2040            );
2041
2042            // Verify first few and last few elements
2043            for i in 0..5.min(per_bitmap_size) {
2044                assert!(
2045                    row_addrs.contains(&i),
2046                    "Bitmap for key {} should contain row_id {}",
2047                    key_val,
2048                    i
2049                );
2050            }
2051
2052            for i in (per_bitmap_size - 5)..per_bitmap_size {
2053                assert!(
2054                    row_addrs.contains(&i),
2055                    "Bitmap for key {} should contain row_id {}",
2056                    key_val,
2057                    i
2058                );
2059            }
2060
2061            // Verify exact range
2062            let expected_range: Vec<u64> = (0..per_bitmap_size).collect();
2063            assert_eq!(
2064                row_addrs, expected_range,
2065                "Bitmap for key {} doesn't contain expected values",
2066                key_val
2067            );
2068
2069            tracing::info!(
2070                "✓ Verified bitmap for key {}: {} rows as expected",
2071                key_val,
2072                row_addrs.len()
2073            );
2074        }
2075
2076        tracing::info!("Test successful! Index properly contains {} keys", m);
2077    }
2078
2079    #[tokio::test]
2080    async fn test_bitmap_prewarm() {
2081        // Create a temporary directory for the index
2082        let tmpdir = TempObjDir::default();
2083        let store = Arc::new(LanceIndexStore::new(
2084            Arc::new(ObjectStore::local()),
2085            tmpdir.clone(),
2086            Arc::new(LanceCache::no_cache()),
2087        ));
2088
2089        // Create test data with low cardinality
2090        let colors = vec![
2091            "red", "blue", "green", "red", "yellow", "blue", "red", "green", "blue", "yellow",
2092            "red", "red", "blue", "green", "yellow",
2093        ];
2094
2095        let row_ids = (0u64..15u64).collect::<Vec<_>>();
2096
2097        let schema = Arc::new(Schema::new(vec![
2098            Field::new("value", DataType::Utf8, false),
2099            Field::new("_rowid", DataType::UInt64, false),
2100        ]));
2101
2102        let batch = RecordBatch::try_new(
2103            schema.clone(),
2104            vec![
2105                Arc::new(StringArray::from(colors.clone())),
2106                Arc::new(UInt64Array::from(row_ids.clone())),
2107            ],
2108        )
2109        .unwrap();
2110
2111        let batch = sort_batch_by_value(&batch);
2112        let stream = stream::once(async move { Ok(batch) });
2113        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
2114
2115        // Train and write the bitmap index
2116        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
2117            .await
2118            .unwrap();
2119
2120        // Create a cache with metrics tracking
2121        let cache = LanceCache::with_capacity(1024 * 1024); // 1MB cache
2122
2123        // Load the index (should only load metadata, not bitmaps)
2124        let index = BitmapIndex::load(store.clone(), None, &cache)
2125            .await
2126            .unwrap();
2127
2128        // Verify no bitmaps are cached yet
2129        let cache_key_red = BitmapKey {
2130            value: OrderableScalarValue(ScalarValue::Utf8(Some("red".to_string()))),
2131        };
2132        let cache_key_blue = BitmapKey {
2133            value: OrderableScalarValue(ScalarValue::Utf8(Some("blue".to_string()))),
2134        };
2135
2136        assert!(
2137            cache
2138                .get_with_key::<BitmapKey>(&cache_key_red)
2139                .await
2140                .is_none()
2141        );
2142        assert!(
2143            cache
2144                .get_with_key::<BitmapKey>(&cache_key_blue)
2145                .await
2146                .is_none()
2147        );
2148
2149        // Call prewarm
2150        index.prewarm().await.unwrap();
2151
2152        // Verify all bitmaps are now cached
2153        assert!(
2154            cache
2155                .get_with_key::<BitmapKey>(&cache_key_red)
2156                .await
2157                .is_some()
2158        );
2159        assert!(
2160            cache
2161                .get_with_key::<BitmapKey>(&cache_key_blue)
2162                .await
2163                .is_some()
2164        );
2165
2166        // Verify cached bitmaps have correct content
2167        let cached_red = cache
2168            .get_with_key::<BitmapKey>(&cache_key_red)
2169            .await
2170            .unwrap();
2171        let red_rows: Vec<u64> = cached_red.row_addrs().unwrap().map(u64::from).collect();
2172        assert_eq!(red_rows, vec![0, 3, 6, 10, 11]);
2173
2174        // Call prewarm again - should be idempotent
2175        index.prewarm().await.unwrap();
2176
2177        // Verify cache still contains the same items
2178        let cached_red_2 = cache
2179            .get_with_key::<BitmapKey>(&cache_key_red)
2180            .await
2181            .unwrap();
2182        let red_rows_2: Vec<u64> = cached_red_2.row_addrs().unwrap().map(u64::from).collect();
2183        assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]);
2184    }
2185
2186    #[tokio::test]
2187    async fn test_remap_bitmap_with_null() {
2188        use arrow_array::UInt32Array;
2189
2190        // Create a temporary store.
2191        let tmpdir = TempObjDir::default();
2192        let test_store = Arc::new(LanceIndexStore::new(
2193            Arc::new(ObjectStore::local()),
2194            tmpdir.clone(),
2195            Arc::new(LanceCache::no_cache()),
2196        ));
2197
2198        // Create test data that simulates:
2199        // frag 1 - { 0: null, 1: null, 2: 1 }
2200        // frag 2 - { 0: 1, 1: 2, 2: 2 }
2201        // We'll create this data with specific row addresses
2202        let values = vec![
2203            None,       // row 0: null (will be at address (1,0))
2204            None,       // row 1: null (will be at address (1,1))
2205            Some(1u32), // row 2: 1    (will be at address (1,2))
2206            Some(1u32), // row 3: 1    (will be at address (2,0))
2207            Some(2u32), // row 4: 2    (will be at address (2,1))
2208            Some(2u32), // row 5: 2    (will be at address (2,2))
2209        ];
2210
2211        // Create row IDs with specific fragment addresses
2212        let row_ids: Vec<u64> = vec![
2213            RowAddress::new_from_parts(1, 0).into(),
2214            RowAddress::new_from_parts(1, 1).into(),
2215            RowAddress::new_from_parts(1, 2).into(),
2216            RowAddress::new_from_parts(2, 0).into(),
2217            RowAddress::new_from_parts(2, 1).into(),
2218            RowAddress::new_from_parts(2, 2).into(),
2219        ];
2220
2221        let schema = Arc::new(Schema::new(vec![
2222            Field::new("value", DataType::UInt32, true),
2223            Field::new("_rowid", DataType::UInt64, false),
2224        ]));
2225
2226        let batch = RecordBatch::try_new(
2227            schema.clone(),
2228            vec![
2229                Arc::new(UInt32Array::from(values)),
2230                Arc::new(UInt64Array::from(row_ids)),
2231            ],
2232        )
2233        .unwrap();
2234
2235        let stream = stream::once(async move { Ok(batch) });
2236        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
2237
2238        // Create the bitmap index
2239        BitmapIndexPlugin::train_bitmap_index(stream, test_store.as_ref())
2240            .await
2241            .unwrap();
2242
2243        // Load the index
2244        let index = BitmapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2245            .await
2246            .expect("Failed to load bitmap index");
2247
2248        // Verify initial state
2249        assert_eq!(index.index_map.len(), 2); // 2 non-null values (1 and 2)
2250        assert!(!index.null_map.is_empty()); // Should have null values
2251
2252        // Create a remap that simulates compaction of frags 1 and 2 into frag 3
2253        let mut row_addr_map = HashMap::<u64, Option<u64>>::new();
2254        row_addr_map.insert(
2255            RowAddress::new_from_parts(1, 0).into(),
2256            Some(RowAddress::new_from_parts(3, 0).into()),
2257        );
2258        row_addr_map.insert(
2259            RowAddress::new_from_parts(1, 1).into(),
2260            Some(RowAddress::new_from_parts(3, 1).into()),
2261        );
2262        row_addr_map.insert(
2263            RowAddress::new_from_parts(1, 2).into(),
2264            Some(RowAddress::new_from_parts(3, 2).into()),
2265        );
2266        row_addr_map.insert(
2267            RowAddress::new_from_parts(2, 0).into(),
2268            Some(RowAddress::new_from_parts(3, 3).into()),
2269        );
2270        row_addr_map.insert(
2271            RowAddress::new_from_parts(2, 1).into(),
2272            Some(RowAddress::new_from_parts(3, 4).into()),
2273        );
2274        row_addr_map.insert(
2275            RowAddress::new_from_parts(2, 2).into(),
2276            Some(RowAddress::new_from_parts(3, 5).into()),
2277        );
2278
2279        // Perform remap
2280        index
2281            .remap(&row_addr_map, test_store.as_ref())
2282            .await
2283            .unwrap();
2284
2285        // Reload and check
2286        let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache())
2287            .await
2288            .expect("Failed to load remapped bitmap index");
2289
2290        // Verify the null bitmap was remapped correctly
2291        let expected_null_addrs: Vec<u64> = vec![
2292            RowAddress::new_from_parts(3, 0).into(),
2293            RowAddress::new_from_parts(3, 1).into(),
2294        ];
2295        let actual_null_addrs: Vec<u64> = reloaded_idx
2296            .null_map
2297            .row_addrs()
2298            .unwrap()
2299            .map(u64::from)
2300            .collect();
2301        assert_eq!(
2302            actual_null_addrs, expected_null_addrs,
2303            "Null bitmap not remapped correctly"
2304        );
2305
2306        // Search for value 1 and verify remapped addresses
2307        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(1)));
2308        let result = reloaded_idx
2309            .search(&query, &NoOpMetricsCollector)
2310            .await
2311            .unwrap();
2312        if let crate::scalar::SearchResult::Exact(row_ids) = result {
2313            let mut actual: Vec<u64> = row_ids
2314                .true_rows()
2315                .row_addrs()
2316                .unwrap()
2317                .map(u64::from)
2318                .collect();
2319            actual.sort();
2320            let expected: Vec<u64> = vec![
2321                RowAddress::new_from_parts(3, 2).into(),
2322                RowAddress::new_from_parts(3, 3).into(),
2323            ];
2324            assert_eq!(actual, expected, "Value 1 bitmap not remapped correctly");
2325        }
2326
2327        // Search for value 2 and verify remapped addresses
2328        let query = SargableQuery::Equals(ScalarValue::UInt32(Some(2)));
2329        let result = reloaded_idx
2330            .search(&query, &NoOpMetricsCollector)
2331            .await
2332            .unwrap();
2333        if let crate::scalar::SearchResult::Exact(row_ids) = result {
2334            let mut actual: Vec<u64> = row_ids
2335                .true_rows()
2336                .row_addrs()
2337                .unwrap()
2338                .map(u64::from)
2339                .collect();
2340            actual.sort();
2341            let expected: Vec<u64> = vec![
2342                RowAddress::new_from_parts(3, 4).into(),
2343                RowAddress::new_from_parts(3, 5).into(),
2344            ];
2345            assert_eq!(actual, expected, "Value 2 bitmap not remapped correctly");
2346        }
2347
2348        // Search for null values
2349        let query = SargableQuery::IsNull();
2350        let result = reloaded_idx
2351            .search(&query, &NoOpMetricsCollector)
2352            .await
2353            .unwrap();
2354        if let crate::scalar::SearchResult::Exact(row_ids) = result {
2355            let mut actual: Vec<u64> = row_ids
2356                .true_rows()
2357                .row_addrs()
2358                .unwrap()
2359                .map(u64::from)
2360                .collect();
2361            actual.sort();
2362            assert_eq!(
2363                actual, expected_null_addrs,
2364                "Null search results not correct"
2365            );
2366        }
2367    }
2368
2369    #[tokio::test]
2370    async fn test_bitmap_null_handling_in_queries() {
2371        // Test that bitmap index correctly returns null_list for queries
2372        let tmpdir = TempObjDir::default();
2373        let store = Arc::new(LanceIndexStore::new(
2374            Arc::new(ObjectStore::local()),
2375            tmpdir.clone(),
2376            Arc::new(LanceCache::no_cache()),
2377        ));
2378
2379        // Create test data: [0, 5, null]
2380        let batch = record_batch!(
2381            ("value", Int64, [Some(0), Some(5), None]),
2382            ("_rowid", UInt64, [0, 1, 2])
2383        )
2384        .unwrap();
2385        let schema = batch.schema();
2386        let stream = stream::once(async move { Ok(batch) });
2387        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
2388
2389        // Train and write the bitmap index
2390        BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
2391            .await
2392            .unwrap();
2393
2394        let cache = LanceCache::with_capacity(1024 * 1024);
2395        let index = BitmapIndex::load(store.clone(), None, &cache)
2396            .await
2397            .unwrap();
2398
2399        // Test 1: Search for value 5 - should return allow=[1], null=[2]
2400        let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
2401        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2402
2403        match result {
2404            SearchResult::Exact(row_ids) => {
2405                let actual_rows: Vec<u64> = row_ids
2406                    .true_rows()
2407                    .row_addrs()
2408                    .unwrap()
2409                    .map(u64::from)
2410                    .collect();
2411                assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 5");
2412
2413                let null_row_ids = row_ids.null_rows();
2414                // Check that null_row_ids contains row 2
2415                assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
2416                let null_rows: Vec<u64> =
2417                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
2418                assert_eq!(null_rows, vec![2], "Should report row 2 as null");
2419            }
2420            _ => panic!("Expected Exact search result"),
2421        }
2422
2423        // Test 2: Search for null values - should return allow=[2], null=None
2424        let query = SargableQuery::IsNull();
2425        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2426
2427        match result {
2428            SearchResult::Exact(row_addrs) => {
2429                let actual_rows: Vec<u64> = row_addrs
2430                    .true_rows()
2431                    .row_addrs()
2432                    .unwrap()
2433                    .map(u64::from)
2434                    .collect();
2435                assert_eq!(
2436                    actual_rows,
2437                    vec![2],
2438                    "IsNull should find row 2 where value is null"
2439                );
2440
2441                let null_row_ids = row_addrs.null_rows();
2442                // When querying FOR nulls, null_row_ids should be None (nulls are the TRUE result)
2443                assert!(
2444                    null_row_ids.is_empty(),
2445                    "null_row_ids should be None for IsNull query"
2446                );
2447            }
2448            _ => panic!("Expected Exact search result"),
2449        }
2450
2451        // Test 3: Range query - should return matching rows and null_list
2452        let query = SargableQuery::Range(
2453            std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
2454            std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
2455        );
2456        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2457
2458        match result {
2459            SearchResult::Exact(row_addrs) => {
2460                let actual_rows: Vec<u64> = row_addrs
2461                    .true_rows()
2462                    .row_addrs()
2463                    .unwrap()
2464                    .map(u64::from)
2465                    .collect();
2466                assert_eq!(actual_rows, vec![0], "Should find row 0 where value == 0");
2467
2468                // Should report row 2 as null
2469                let null_row_ids = row_addrs.null_rows();
2470                assert!(!null_row_ids.is_empty(), "null_row_ids should be Some");
2471                let null_rows: Vec<u64> =
2472                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
2473                assert_eq!(null_rows, vec![2], "Should report row 2 as null");
2474            }
2475            _ => panic!("Expected Exact search result"),
2476        }
2477    }
2478}