use parking_lot::RwLock;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use ahash::AHashMap;
use roaring::RoaringTreemap;
use crate::common::{CompactArc, I64Map};
use crate::core::{DataType, Error, IndexEntry, IndexType, Operator, Result, RowIdVec, Value};
use crate::storage::expression::Expression;
use crate::storage::traits::Index;
/// Distinct-value count above which `BitmapIndex::is_high_cardinality` reports `true`.
const HIGH_CARDINALITY_WARNING_THRESHOLD: usize = 1000;
/// Index that keeps one `RoaringTreemap` of row ids per distinct indexed value.
///
/// All mutation goes through `&self`; the two maps are guarded by `RwLock`s and the
/// counters/flags are atomics.
pub struct BitmapIndex {
// Index name; also reported in unique-constraint errors.
name: String,
// Name of the table this index belongs to.
table_name: String,
// Names of the indexed columns; joined with ", " in unique-constraint errors.
column_names: Vec<String>,
// Ids of the indexed columns; its length is the number of values expected per entry.
column_ids: Vec<i32>,
// Data types handed to the constructor and returned by `data_types()`.
data_types: Vec<DataType>,
// When true, a non-NULL key may be held by at most one row id.
is_unique: bool,
// Set by `close()`; `add`, `remove` and `find` return `Error::IndexClosed` once set.
closed: AtomicBool,
// Value key -> bitmap of the row ids currently stored under that key.
bitmaps: RwLock<AHashMap<CompactArc<Value>, RoaringTreemap>>,
// Row id -> key most recently added for that row; used to detach the row from its
// previous bitmap when it is re-added with a different value.
row_to_value: RwLock<I64Map<CompactArc<Value>>>,
// Incremented when a bitmap is created for a new key, decremented when a bitmap
// becomes empty and is removed.
distinct_count: AtomicUsize,
}
/// Debug output lists the index metadata and counters; the bitmap and row maps are
/// omitted (hence `finish_non_exhaustive`).
impl std::fmt::Debug for BitmapIndex {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let distinct = self.distinct_count.load(AtomicOrdering::Relaxed);
        let is_closed = self.closed.load(AtomicOrdering::Relaxed);

        let mut out = f.debug_struct("BitmapIndex");
        out.field("name", &self.name);
        out.field("table_name", &self.table_name);
        out.field("column_names", &self.column_names);
        out.field("column_ids", &self.column_ids);
        out.field("is_unique", &self.is_unique);
        out.field("distinct_count", &distinct);
        out.field("closed", &is_closed);
        out.finish_non_exhaustive()
    }
}
impl BitmapIndex {
/// Creates an empty, open index.
///
/// `expected_rows` pre-sizes the row-id -> value map when it is non-zero; the bitmap
/// map always starts empty and the distinct-value counter starts at zero.
pub fn new(
    name: String,
    table_name: String,
    column_names: Vec<String>,
    column_ids: Vec<i32>,
    data_types: Vec<DataType>,
    is_unique: bool,
    expected_rows: usize,
) -> Self {
    let row_map = match expected_rows {
        0 => I64Map::new(),
        capacity => I64Map::with_capacity(capacity),
    };

    Self {
        name,
        table_name,
        column_names,
        column_ids,
        data_types,
        is_unique,
        closed: AtomicBool::new(false),
        bitmaps: RwLock::new(AHashMap::default()),
        row_to_value: RwLock::new(row_map),
        distinct_count: AtomicUsize::new(0),
    }
}
/// Returns the current value of the distinct-value counter (relaxed atomic load).
pub fn cardinality(&self) -> usize {
self.distinct_count.load(AtomicOrdering::Relaxed)
}
/// Reports whether the number of distinct values strictly exceeds
/// `HIGH_CARDINALITY_WARNING_THRESHOLD`.
pub fn is_high_cardinality(&self) -> bool {
    let distinct = self.cardinality();
    distinct > HIGH_CARDINALITY_WARNING_THRESHOLD
}
/// Returns a copy of the bitmap of row ids stored under `value`, or `None` when no
/// bitmap exists for it.
///
/// The map is probed with the borrowed `value` itself, so the lookup neither clones
/// the value nor allocates a temporary key.
pub fn get_bitmap(&self, value: &Value) -> Option<RoaringTreemap> {
    let bitmaps = self.bitmaps.read();
    bitmaps.get(value).cloned()
}
/// Intersects the bitmaps of every value in `values`.
///
/// Returns an empty bitmap when `values` is empty or when any value has no bitmap.
/// Each lookup borrows the value directly instead of cloning it into a temporary
/// key, so no allocation happens while the read lock is held.
pub fn and_values(&self, values: &[Value]) -> RoaringTreemap {
    let bitmaps = self.bitmaps.read();
    let mut result: Option<RoaringTreemap> = None;
    for value in values {
        match bitmaps.get(value) {
            Some(bitmap) => {
                result = Some(match result {
                    Some(acc) => acc & bitmap,
                    None => bitmap.clone(),
                });
            }
            // A value with no rows makes the whole intersection empty.
            None => return RoaringTreemap::new(),
        }
    }
    result.unwrap_or_default()
}
/// Unions the bitmaps of every value in `values`; values without a bitmap are skipped.
///
/// Each lookup borrows the value directly instead of cloning it into a temporary
/// key, so no allocation happens while the read lock is held.
pub fn or_values(&self, values: &[Value]) -> RoaringTreemap {
    let bitmaps = self.bitmaps.read();
    let mut union = RoaringTreemap::new();
    for bitmap in values.iter().filter_map(|value| bitmaps.get(value)) {
        union |= bitmap;
    }
    union
}
/// Returns every indexed row id that is not stored under `value`.
///
/// The result is the union of all bitmaps minus the bitmap of `value` (if any). The
/// lookup borrows `value` directly rather than cloning it into a temporary key.
///
/// NOTE(review): rows stored under a NULL key are part of the union and therefore
/// appear in the result — confirm callers apply SQL NULL semantics where required.
pub fn not_value(&self, value: &Value) -> RoaringTreemap {
    let bitmaps = self.bitmaps.read();
    let mut all_rows = RoaringTreemap::new();
    for bitmap in bitmaps.values() {
        all_rows |= bitmap;
    }
    match bitmaps.get(value) {
        Some(excluded) => all_rows - excluded,
        None => all_rows,
    }
}
/// Builds the map key for a tuple of column values.
///
/// A single value is wrapped as-is. Any other arity is folded into one
/// `Value::Text` made of the `Debug` rendering of each value joined by `"||"`.
fn value_to_arc_key(&self, values: &[Value]) -> CompactArc<Value> {
    if let [only] = values {
        return CompactArc::new(only.clone());
    }

    let mut joined = String::new();
    for (position, value) in values.iter().enumerate() {
        if position > 0 {
            joined.push_str("||");
        }
        joined.push_str(&format!("{:?}", value));
    }
    CompactArc::new(Value::Text(joined.into()))
}
/// Multi-column path of `add`: indexes `row_id` under the composite key of `values`.
///
/// Takes ownership of both write guards acquired by the caller, so they are released
/// when this function returns.
///
/// Behaviour:
/// - For a unique index, a tuple without NULLs is rejected with a unique-constraint
///   error when a different row already holds the same key.
/// - If the row was previously indexed under a different key, it is detached from
///   that bitmap; an emptied bitmap is dropped and the distinct counter decremented.
/// - The key stored for the row is the map's existing key when one exists, so rows
///   sharing a value share one key allocation.
///
/// The "did the value change" test compares keys by value. The freshly built probe
/// key never shares an allocation with a stored key, so a pointer comparison here
/// would always report a change.
#[cold]
fn add_multi_column_slow(
    &self,
    values: &[Value],
    row_id: i64,
    row_id_u64: u64,
    mut bitmaps: parking_lot::RwLockWriteGuard<'_, AHashMap<CompactArc<Value>, RoaringTreemap>>,
    mut row_to_value: parking_lot::RwLockWriteGuard<'_, I64Map<CompactArc<Value>>>,
) -> Result<()> {
    let probe_key = self.value_to_arc_key(values);

    // Uniqueness is only enforced for tuples that contain no NULL.
    if self.is_unique && !values.iter().any(|v| v.is_null()) {
        if let Some(bitmap) = bitmaps.get(&probe_key) {
            // Do not count the row itself, so re-adding the same row is allowed.
            let other_rows = if bitmap.contains(row_id_u64) {
                bitmap.len() - 1
            } else {
                bitmap.len()
            };
            if other_rows > 0 {
                let values_str: Vec<String> =
                    values.iter().map(|v| format!("{:?}", v)).collect();
                return Err(Error::unique_constraint(
                    &self.name,
                    self.column_names.join(", "),
                    format!("[{}]", values_str.join(", ")),
                ));
            }
        }
    }

    // Detach the row from its previous bitmap only when the value really changed.
    if let Some(old_key) = row_to_value.get(row_id).cloned() {
        if *old_key != *probe_key {
            if let Some(old_bitmap) = bitmaps.get_mut(&old_key) {
                old_bitmap.remove(row_id_u64);
                if old_bitmap.is_empty() {
                    bitmaps.remove(&old_key);
                    self.distinct_count.fetch_sub(1, AtomicOrdering::Relaxed);
                }
            }
        }
    }

    // Reuse the map's own key when the value is already present (checked after the
    // detach step so the "new value" decision reflects the current map contents).
    let (key, is_new_value) = match bitmaps.get_key_value(&probe_key) {
        Some((existing, _)) => (CompactArc::clone(existing), false),
        None => (probe_key, true),
    };

    bitmaps
        .entry(CompactArc::clone(&key))
        .or_default()
        .insert(row_id_u64);
    row_to_value.insert(row_id, key);
    if is_new_value {
        self.distinct_count.fetch_add(1, AtomicOrdering::Relaxed);
    }
    Ok(())
}
}
impl Index for BitmapIndex {
/// Returns the index name.
fn name(&self) -> &str {
&self.name
}
/// Returns the name of the table this index belongs to.
fn table_name(&self) -> &str {
&self.table_name
}
/// No-op for this index type; always returns `Ok(())`.
fn build(&mut self) -> Result<()> {
Ok(())
}
/// Indexes `row_id` under `values`, replacing any value previously indexed for that row.
///
/// `_ref_id` is unused.
///
/// # Errors
/// - `Error::IndexClosed` when the index has been closed.
/// - An internal error when `row_id` is negative or when `values.len()` differs from
///   the number of indexed columns.
/// - A unique-constraint error when the index is unique, the value is not NULL, and
///   a different row already holds it.
fn add(&self, values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
if self.closed.load(AtomicOrdering::Acquire) {
return Err(Error::IndexClosed);
}
if row_id < 0 {
return Err(Error::internal(format!(
"bitmap index: row_id must be non-negative, got {}",
row_id
)));
}
// Safe: negativity was rejected above.
let row_id_u64 = row_id as u64;
let num_cols = self.column_ids.len();
if values.len() != num_cols {
return Err(Error::internal(format!(
"expected {} values, got {}",
num_cols,
values.len()
)));
}
// Lock order is bitmaps first, then row_to_value (same order as the other mutators here).
let mut bitmaps = self.bitmaps.write();
let mut row_to_value = self.row_to_value.write();
// Single-column fast path probes the map with the borrowed value; anything else
// hands both guards to the composite-key slow path.
let lookup_value = if values.len() == 1 {
&values[0]
} else {
return self.add_multi_column_slow(values, row_id, row_id_u64, bitmaps, row_to_value);
};
// NULL values bypass the unique check.
if self.is_unique && !lookup_value.is_null() {
if let Some(bitmap) = bitmaps.get(lookup_value) {
// Do not count the row itself, so re-adding the same row is allowed.
let existing_count = if bitmap.contains(row_id_u64) {
bitmap.len() - 1
} else {
bitmap.len()
};
if existing_count > 0 {
return Err(Error::unique_constraint(
&self.name,
self.column_names.join(", "),
format!("{:?}", lookup_value),
));
}
}
}
// Reuse the map's existing key allocation when the value is already present;
// only a genuinely new value is cloned into a fresh key.
let (arc_key, is_new_value) =
if let Some((existing_arc, _)) = bitmaps.get_key_value(lookup_value) {
(CompactArc::clone(existing_arc), false)
} else {
(CompactArc::new(lookup_value.clone()), true)
};
// If the row was indexed under a different key allocation, detach it from that
// bitmap and drop the bitmap (and one distinct count) when it becomes empty.
if let Some(old_arc_key) = row_to_value.get(row_id).cloned() {
if !CompactArc::ptr_eq(&old_arc_key, &arc_key) {
if let Some(old_bitmap) = bitmaps.get_mut(&old_arc_key) {
old_bitmap.remove(row_id_u64);
if old_bitmap.is_empty() {
bitmaps.remove(&old_arc_key);
self.distinct_count.fetch_sub(1, AtomicOrdering::Relaxed);
}
}
}
}
let bitmap = bitmaps.entry(CompactArc::clone(&arc_key)).or_default();
bitmap.insert(row_id_u64);
row_to_value.insert(row_id, arc_key);
if is_new_value {
self.distinct_count.fetch_add(1, AtomicOrdering::Relaxed);
}
Ok(())
}
/// Adds every `(row_id, values)` pair in `entries` via `add` (with `ref_id` 0).
///
/// Stops at, and returns, the first error; entries added before it stay indexed.
fn add_batch(&self, entries: &I64Map<Vec<Value>>) -> Result<()> {
    entries
        .iter()
        .try_for_each(|(row_id, values)| self.add(values, row_id, 0))
}
/// Removes `row_id` from the bitmap keyed by `values` and forgets the row's stored value.
///
/// The bitmap is chosen from the caller-supplied `values`, not from the row map. An
/// emptied bitmap is dropped and the distinct counter decremented. `_ref_id` is unused.
///
/// # Errors
/// `Error::IndexClosed` when the index is closed; an internal error when `row_id`
/// is negative.
fn remove(&self, values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }
    if row_id < 0 {
        return Err(Error::internal(format!(
            "bitmap index: row_id must be non-negative, got {}",
            row_id
        )));
    }

    let key = self.value_to_arc_key(values);
    let bit = row_id as u64;

    let mut bitmaps = self.bitmaps.write();
    let mut row_to_value = self.row_to_value.write();

    let emptied = match bitmaps.get_mut(&key) {
        Some(bitmap) => {
            bitmap.remove(bit);
            bitmap.is_empty()
        }
        None => false,
    };
    if emptied {
        bitmaps.remove(&key);
        self.distinct_count.fetch_sub(1, AtomicOrdering::Relaxed);
    }

    row_to_value.remove(row_id);
    Ok(())
}
/// Removes every `(row_id, values)` pair in `entries` via `remove` (with `ref_id` 0).
///
/// Stops at, and returns, the first error; earlier removals are not undone.
fn remove_batch(&self, entries: &I64Map<Vec<Value>>) -> Result<()> {
    entries
        .iter()
        .try_for_each(|(row_id, values)| self.remove(values, row_id, 0))
}
/// Adds a batch of `(row_id, values)` entries while holding both write locks once.
///
/// Entries with a negative row id or the wrong number of values are skipped
/// silently. For a unique index the whole batch is validated first — against the
/// index contents and against earlier entries of the same batch — and nothing is
/// written if a violation is found; tuples containing NULL are exempt.
///
/// A row is detached from its previous bitmap only when its stored key differs
/// *by value* from the new key, and the "new distinct value" decision is taken
/// after that detach step. Stored keys may be value-equal to the map key without
/// sharing its allocation; with a pointer comparison taken before the detach, such
/// a row would drop and recreate its bitmap, decrementing the distinct counter
/// without incrementing it again.
///
/// # Errors
/// `Error::IndexClosed` when the index is closed; a unique-constraint error as
/// described above.
fn add_batch_slice(&self, entries: &[(i64, &[Value])]) -> Result<()> {
    if entries.is_empty() {
        return Ok(());
    }
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }
    let num_cols = self.column_ids.len();
    let mut bitmaps = self.bitmaps.write();
    let mut row_to_value = self.row_to_value.write();
    row_to_value.reserve(entries.len());

    // Validation pass: reject the batch before mutating anything.
    if self.is_unique {
        let mut batch_keys: AHashMap<CompactArc<Value>, i64> =
            AHashMap::with_capacity(entries.len());
        for &(row_id, values) in entries {
            if row_id < 0 || values.len() != num_cols {
                continue;
            }
            if values.iter().any(|v| v.is_null()) {
                continue;
            }
            let row_id_u64 = row_id as u64;
            let arc_key = self.value_to_arc_key(values);
            // Same key claimed by two different rows inside this batch.
            if let Some(&existing_row_id) = batch_keys.get(&arc_key) {
                if existing_row_id != row_id {
                    return Err(Error::unique_constraint(
                        &self.name,
                        self.column_names.join(", "),
                        format!("{:?}", values),
                    ));
                }
            }
            // Key already held in the index by a row other than this one.
            if let Some(bitmap) = bitmaps.get(&arc_key) {
                let existing_count = if bitmap.contains(row_id_u64) {
                    bitmap.len() - 1
                } else {
                    bitmap.len()
                };
                if existing_count > 0 {
                    return Err(Error::unique_constraint(
                        &self.name,
                        self.column_names.join(", "),
                        format!("{:?}", values),
                    ));
                }
            }
            batch_keys.insert(arc_key, row_id);
        }
    }

    // Apply pass.
    for &(row_id, values) in entries {
        if row_id < 0 || values.len() != num_cols {
            continue;
        }
        let row_id_u64 = row_id as u64;
        let probe_key = self.value_to_arc_key(values);

        // Detach from the previous bitmap only when the value actually changed.
        if let Some(old_key) = row_to_value.get(row_id).cloned() {
            if *old_key != *probe_key {
                if let Some(old_bitmap) = bitmaps.get_mut(&old_key) {
                    old_bitmap.remove(row_id_u64);
                    if old_bitmap.is_empty() {
                        bitmaps.remove(&old_key);
                        self.distinct_count.fetch_sub(1, AtomicOrdering::Relaxed);
                    }
                }
            }
        }

        // Reuse the map's key allocation when the value is already present.
        let (key, is_new_value) = match bitmaps.get_key_value(&probe_key) {
            Some((existing, _)) => (CompactArc::clone(existing), false),
            None => (probe_key, true),
        };

        bitmaps
            .entry(CompactArc::clone(&key))
            .or_default()
            .insert(row_id_u64);
        row_to_value.insert(row_id, key);
        if is_new_value {
            self.distinct_count.fetch_add(1, AtomicOrdering::Relaxed);
        }
    }
    Ok(())
}
/// Removes a batch of `(row_id, values)` entries while holding both write locks once.
///
/// Entries with a negative row id are skipped. For every other entry the row is
/// removed from the bitmap keyed by the supplied values (an emptied bitmap is
/// dropped and the distinct counter decremented) and the row's stored value is
/// forgotten.
///
/// # Errors
/// `Error::IndexClosed` when the index is closed and `entries` is non-empty.
fn remove_batch_slice(&self, entries: &[(i64, &[Value])]) -> Result<()> {
    if entries.is_empty() {
        return Ok(());
    }
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }

    let mut bitmaps = self.bitmaps.write();
    let mut row_to_value = self.row_to_value.write();

    for &(row_id, values) in entries.iter().filter(|entry| entry.0 >= 0) {
        let bit = row_id as u64;
        let key = self.value_to_arc_key(values);

        let emptied = match bitmaps.get_mut(&key) {
            Some(bitmap) => {
                bitmap.remove(bit);
                bitmap.is_empty()
            }
            None => false,
        };
        if emptied {
            bitmaps.remove(&key);
            self.distinct_count.fetch_sub(1, AtomicOrdering::Relaxed);
        }

        row_to_value.remove(row_id);
    }
    Ok(())
}
/// Returns the ids of the indexed columns.
fn column_ids(&self) -> &[i32] {
&self.column_ids
}
/// Returns the names of the indexed columns.
fn column_names(&self) -> &[String] {
&self.column_names
}
/// Returns the data types supplied when the index was constructed.
fn data_types(&self) -> &[DataType] {
&self.data_types
}
/// Always `IndexType::Bitmap`.
fn index_type(&self) -> IndexType {
IndexType::Bitmap
}
/// Returns whether this index enforces uniqueness of non-NULL keys.
fn is_unique(&self) -> bool {
self.is_unique
}
/// Returns one `IndexEntry` (with `ref_id` 0) per row stored under exactly `values`.
///
/// An unknown key yields an empty vector.
///
/// # Errors
/// `Error::IndexClosed` when the index is closed; an internal error when the number
/// of values differs from the number of indexed columns.
fn find(&self, values: &[Value]) -> Result<Vec<IndexEntry>> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }
    if values.len() != self.column_ids.len() {
        return Err(Error::internal(
            "bitmap index requires exact match on all columns",
        ));
    }

    let key = self.value_to_arc_key(values);
    let guard = self.bitmaps.read();
    let entries = match guard.get(&key) {
        Some(bitmap) => bitmap
            .iter()
            .map(|row_id| IndexEntry {
                row_id: row_id as i64,
                ref_id: 0,
            })
            .collect(),
        None => Vec::new(),
    };
    Ok(entries)
}
/// Range lookups are not supported by this index; every call returns an internal
/// error and all arguments are ignored.
fn find_range(
&self,
_min: &[Value],
_max: &[Value],
_min_inclusive: bool,
_max_inclusive: bool,
) -> Result<Vec<IndexEntry>> {
Err(Error::internal(
"bitmap index does not efficiently support range queries; use btree index instead",
))
}
fn find_with_operator(&self, op: Operator, values: &[Value]) -> Result<Vec<IndexEntry>> {
match op {
Operator::Eq => self.find(values),
Operator::Ne => {
if values.len() != self.column_ids.len() {
return Err(Error::internal(
"bitmap index requires exact match on all columns",
));
}
let result = if values.len() == 1 {
self.not_value(&values[0])
} else {
let composite = Value::Text(
values
.iter()
.map(|v| format!("{:?}", v))
.collect::<Vec<_>>()
.join("||")
.into(),
);
self.not_value(&composite)
};
Ok(result
.iter()
.map(|row_id| IndexEntry {
row_id: row_id as i64,
ref_id: 0,
})
.collect())
}
_ => Err(Error::internal(format!(
"bitmap index only supports = and != operators, not {:?}",
op
))),
}
}
/// Appends to `buffer` the row ids stored under exactly `values`.
///
/// Leaves `buffer` untouched when the index is closed, when the number of values
/// differs from the number of indexed columns, or when the key is unknown.
fn get_row_ids_equal_into(&self, values: &[Value], buffer: &mut Vec<i64>) {
    let closed = self.closed.load(AtomicOrdering::Acquire);
    if closed || values.len() != self.column_ids.len() {
        return;
    }

    let key = self.value_to_arc_key(values);
    let guard = self.bitmaps.read();
    match guard.get(&key) {
        Some(bitmap) => buffer.extend(bitmap.iter().map(|row_id| row_id as i64)),
        None => {}
    }
}
/// Intentionally empty: all arguments are ignored and `_buffer` is left untouched
/// (`find_range` on this index likewise rejects range queries).
fn get_row_ids_in_range_into(
&self,
_min_value: &[Value],
_max_value: &[Value],
_include_min: bool,
_include_max: bool,
_buffer: &mut Vec<i64>,
) {
}
/// Returns the row ids of every indexed row.
///
/// `expr` is deliberately ignored: the result is the union of all bitmaps, not a
/// filtered subset.
/// NOTE(review): presumably the caller applies `expr` afterwards — confirm.
fn get_filtered_row_ids(&self, expr: &dyn Expression) -> RowIdVec {
    let _ = expr;

    let guard = self.bitmaps.read();
    let union = guard
        .values()
        .fold(RoaringTreemap::new(), |mut acc, bitmap| {
            acc |= bitmap;
            acc
        });

    let row_ids: Vec<i64> = union.iter().map(|id| id as i64).collect();
    RowIdVec::from_vec(row_ids)
}
/// Returns a clone of every key currently present in the bitmap map, in the map's
/// iteration order.
fn get_all_values(&self) -> Vec<Value> {
    let guard = self.bitmaps.read();
    let mut values = Vec::with_capacity(guard.len());
    values.extend(guard.keys().map(|key| (**key).clone()));
    values
}
/// Removes every entry and resets the distinct-value counter to zero.
///
/// Both write locks are held for the whole operation (acquired in the same order
/// as the other mutators: bitmaps, then row map), so a concurrent writer cannot
/// slip in between clearing the two maps and resetting the counter.
fn clear(&self) -> Result<()> {
    let mut bitmaps = self.bitmaps.write();
    let mut row_to_value = self.row_to_value.write();
    bitmaps.clear();
    row_to_value.clear();
    self.distinct_count.store(0, AtomicOrdering::Relaxed);
    Ok(())
}
/// Exposes `self` as `&dyn Any` so callers can downcast to `BitmapIndex`.
fn as_any(&self) -> &dyn std::any::Any {
self
}
/// Marks the index closed, then clears its contents.
///
/// The flag is stored before clearing; once set, `add`, `remove` and `find` return
/// `Error::IndexClosed`.
fn close(&mut self) -> Result<()> {
self.closed.store(true, AtomicOrdering::Release);
self.clear()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column index on column id 1 with no pre-sized row map.
    fn single_column_index(
        name: &str,
        table: &str,
        column: &str,
        data_type: DataType,
        unique: bool,
    ) -> BitmapIndex {
        BitmapIndex::new(
            name.to_string(),
            table.to_string(),
            vec![column.to_string()],
            vec![1],
            vec![data_type],
            unique,
            0,
        )
    }

    /// Non-unique text index on `orders.status`, the fixture most tests share.
    fn status_index() -> BitmapIndex {
        single_column_index("idx_status", "orders", "status", DataType::Text, false)
    }

    /// Shorthand for a text value.
    fn text(s: &'static str) -> Value {
        Value::Text(s.into())
    }

    /// Adds each `(row_id, status)` pair, panicking on the first failure.
    fn add_statuses(index: &BitmapIndex, rows: &[(i64, &'static str)]) {
        for &(row_id, status) in rows {
            index.add(&[text(status)], row_id, 0).unwrap();
        }
    }

    #[test]
    fn test_bitmap_index_basic() {
        let index = status_index();
        add_statuses(
            &index,
            &[
                (1, "pending"),
                (2, "pending"),
                (3, "shipped"),
                (4, "shipped"),
                (5, "delivered"),
            ],
        );

        assert_eq!(index.cardinality(), 3);

        let results = index.find(&[text("pending")]).unwrap();
        assert_eq!(results.len(), 2);
        let results = index.find(&[text("shipped")]).unwrap();
        assert_eq!(results.len(), 2);
        let results = index.find(&[text("delivered")]).unwrap();
        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_bitmap_index_boolean() {
        let index = single_column_index("idx_active", "users", "active", DataType::Boolean, false);
        for row_id in 1..=3 {
            index.add(&[Value::Boolean(true)], row_id, 0).unwrap();
        }
        for row_id in 4..=5 {
            index.add(&[Value::Boolean(false)], row_id, 0).unwrap();
        }

        assert_eq!(index.cardinality(), 2);

        let active = index.find(&[Value::Boolean(true)]).unwrap();
        assert_eq!(active.len(), 3);
        let inactive = index.find(&[Value::Boolean(false)]).unwrap();
        assert_eq!(inactive.len(), 2);
    }

    #[test]
    fn test_bitmap_index_and_operation() {
        let index = status_index();
        add_statuses(&index, &[(1, "pending"), (2, "shipped")]);

        // No row can hold both values, so the intersection is empty.
        let result = index.and_values(&[text("pending"), text("shipped")]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_bitmap_index_or_operation() {
        let index = status_index();
        add_statuses(
            &index,
            &[(1, "pending"), (2, "pending"), (3, "shipped"), (4, "delivered")],
        );

        // Rows 1, 2 and 3.
        let result = index.or_values(&[text("pending"), text("shipped")]);
        assert_eq!(result.len(), 3);
    }

    #[test]
    fn test_bitmap_index_not_operation() {
        let index = status_index();
        add_statuses(
            &index,
            &[(1, "pending"), (2, "pending"), (3, "shipped"), (4, "delivered")],
        );

        // Rows 3 and 4.
        let result = index.not_value(&text("pending"));
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_bitmap_index_remove() {
        let index = status_index();
        add_statuses(&index, &[(1, "pending"), (2, "pending")]);
        assert_eq!(index.cardinality(), 1);

        index.remove(&[text("pending")], 1, 0).unwrap();
        let results = index.find(&[text("pending")]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].row_id, 2);

        // Removing the last row drops the value entirely.
        index.remove(&[text("pending")], 2, 0).unwrap();
        assert_eq!(index.cardinality(), 0);
    }

    #[test]
    fn test_bitmap_index_update() {
        let index = status_index();
        index.add(&[text("pending")], 1, 0).unwrap();
        assert_eq!(index.cardinality(), 1);

        // Re-adding the same row with a new value moves it between bitmaps.
        index.add(&[text("shipped")], 1, 0).unwrap();
        assert_eq!(index.cardinality(), 1);

        let pending = index.find(&[text("pending")]).unwrap();
        assert!(pending.is_empty());
        let shipped = index.find(&[text("shipped")]).unwrap();
        assert_eq!(shipped.len(), 1);
        assert_eq!(shipped[0].row_id, 1);
    }

    #[test]
    fn test_bitmap_index_not_equal() {
        let index = status_index();
        add_statuses(&index, &[(1, "pending"), (2, "shipped"), (3, "delivered")]);

        let results = index
            .find_with_operator(Operator::Ne, &[text("pending")])
            .unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_bitmap_index_high_cardinality_check() {
        let index = single_column_index("idx_id", "items", "id", DataType::Integer, false);
        let total = HIGH_CARDINALITY_WARNING_THRESHOLD + 100;
        for i in 0..total {
            index.add(&[Value::Integer(i as i64)], i as i64, 0).unwrap();
        }
        assert!(index.is_high_cardinality());
    }

    #[test]
    fn test_bitmap_index_null_handling() {
        let index = status_index();
        index.add(&[Value::Null(DataType::Text)], 1, 0).unwrap();
        index.add(&[Value::Null(DataType::Text)], 2, 0).unwrap();
        index.add(&[text("active")], 3, 0).unwrap();

        let nulls = index.find(&[Value::Null(DataType::Text)]).unwrap();
        assert_eq!(nulls.len(), 2);
    }

    #[test]
    fn test_bitmap_index_unique_constraint() {
        let index =
            single_column_index("idx_status_unique", "orders", "status", DataType::Text, true);

        index.add(&[text("pending")], 1, 0).unwrap();
        let result = index.add(&[text("pending")], 2, 0);
        assert!(result.is_err());

        // NULLs are exempt from the unique check.
        index.add(&[Value::Null(DataType::Text)], 3, 0).unwrap();
        index.add(&[Value::Null(DataType::Text)], 4, 0).unwrap();
    }

    #[test]
    fn test_bitmap_index_large_row_ids() {
        let index = status_index();
        let large_row_id_1: i64 = (u32::MAX as i64) + 1;
        let large_row_id_2: i64 = (u32::MAX as i64) + 1000;
        let large_row_id_3: i64 = i64::MAX / 2;

        index.add(&[text("active")], large_row_id_1, 0).unwrap();
        index.add(&[text("active")], large_row_id_2, 0).unwrap();
        index.add(&[text("inactive")], large_row_id_3, 0).unwrap();

        let results = index.find(&[text("active")]).unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().any(|e| e.row_id == large_row_id_1));
        assert!(results.iter().any(|e| e.row_id == large_row_id_2));

        let results = index.find(&[text("inactive")]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].row_id, large_row_id_3);

        index.remove(&[text("active")], large_row_id_1, 0).unwrap();
        let results = index.find(&[text("active")]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].row_id, large_row_id_2);
    }

    #[test]
    fn test_bitmap_index_rejects_negative_row_ids() {
        let index = status_index();

        let result = index.add(&[text("pending")], -1, 0);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("non-negative"));

        let result = index.add(&[text("pending")], i64::MIN, 0);
        assert!(result.is_err());

        let result = index.remove(&[text("pending")], -1, 0);
        assert!(result.is_err());
    }
}