use std::collections::{BTreeMap, HashMap};
use std::ops::Bound;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::RwLock;
use ahash::AHashMap;
use rayon::prelude::*;
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use crate::core::{DataType, Error, IndexEntry, IndexType, Operator, Result, Value};
use crate::storage::expression::Expression;
use crate::storage::traits::Index;
const PARALLEL_FILTER_THRESHOLD: usize = 10_000;
type RowIdSet = SmallVec<[i64; 4]>;
pub struct BTreeIndex {
name: String,
table_name: String,
column_id: i32,
column_name: String,
data_type: DataType,
unique: bool,
closed: AtomicBool,
sorted_values: RwLock<BTreeMap<Value, RowIdSet>>,
value_to_rows: RwLock<AHashMap<Value, RowIdSet>>,
row_to_value: RwLock<FxHashMap<i64, Value>>,
cached_min: RwLock<Option<Value>>,
cached_max: RwLock<Option<Value>>,
cache_valid: AtomicBool,
next_ref_id: RwLock<i64>,
}
impl BTreeIndex {
pub fn new(
name: String,
table_name: String,
column_id: i32,
column_name: String,
data_type: DataType,
unique: bool,
) -> Self {
Self {
name,
table_name,
column_id,
column_name,
data_type,
unique,
closed: AtomicBool::new(false),
sorted_values: RwLock::new(BTreeMap::new()),
value_to_rows: RwLock::new(AHashMap::default()),
row_to_value: RwLock::new(FxHashMap::default()),
cached_min: RwLock::new(None),
cached_max: RwLock::new(None),
cache_valid: AtomicBool::new(true),
next_ref_id: RwLock::new(0),
}
}
pub fn with_custom_name(
table_name: String,
column_id: i32,
column_name: String,
data_type: DataType,
unique: bool,
custom_name: Option<&str>,
) -> Self {
let name = custom_name
.map(|s| s.to_string())
.unwrap_or_else(|| format!("idx_{}_{}_btree", table_name, column_name));
Self::new(name, table_name, column_id, column_name, data_type, unique)
}
fn check_closed(&self) -> Result<()> {
if self.closed.load(AtomicOrdering::Acquire) {
return Err(Error::IndexClosed);
}
Ok(())
}
fn next_ref(&self) -> i64 {
let mut ref_id = self.next_ref_id.write().unwrap();
*ref_id += 1;
*ref_id
}
pub fn unique_value_count(&self) -> usize {
let sorted_values = self.sorted_values.read().unwrap();
sorted_values.len()
}
pub fn entry_count(&self) -> usize {
let row_to_value = self.row_to_value.read().unwrap();
row_to_value.len()
}
pub fn get_all_values(&self) -> Vec<Value> {
let sorted_values = self.sorted_values.read().unwrap();
sorted_values.keys().cloned().collect()
}
pub fn get_row_ids_for_value(&self, value: &Value) -> Vec<i64> {
let value_to_rows = self.value_to_rows.read().unwrap();
value_to_rows
.get(value)
.map(|set| set.iter().copied().collect())
.unwrap_or_default()
}
pub fn contains_value(&self, value: &Value) -> bool {
let value_to_rows = self.value_to_rows.read().unwrap();
value_to_rows.contains_key(value)
}
pub fn contains_row(&self, row_id: i64) -> bool {
let row_to_value = self.row_to_value.read().unwrap();
row_to_value.contains_key(&row_id)
}
#[inline]
fn invalidate_cache(&self) {
self.cache_valid.store(false, AtomicOrdering::Release);
}
fn remove_row_from_indexes(&self, row_id: i64, value: &Value) {
{
let mut value_to_rows = self.value_to_rows.write().unwrap();
if let Some(rows) = value_to_rows.get_mut(value) {
if let Some(pos) = rows.iter().position(|&id| id == row_id) {
rows.remove(pos);
}
if rows.is_empty() {
value_to_rows.remove(value);
}
}
}
{
let mut sorted_values = self.sorted_values.write().unwrap();
if let Some(rows) = sorted_values.get_mut(value) {
if let Some(pos) = rows.iter().position(|&id| id == row_id) {
rows.remove(pos);
}
if rows.is_empty() {
sorted_values.remove(value);
}
}
}
}
fn update_cache_if_needed(&self) {
if self.cache_valid.load(AtomicOrdering::Acquire) {
return;
}
let sorted_values = self.sorted_values.read().unwrap();
let min = sorted_values
.iter()
.find(|(v, _)| !v.is_null())
.map(|(v, _)| v.clone());
let max = sorted_values
.iter()
.rev()
.find(|(v, _)| !v.is_null())
.map(|(v, _)| v.clone());
drop(sorted_values);
{
let mut cached_min = self.cached_min.write().unwrap();
let mut cached_max = self.cached_max.write().unwrap();
*cached_min = min;
*cached_max = max;
}
self.cache_valid.store(true, AtomicOrdering::Release);
}
fn find_with_op(&self, op: Operator, value: &Value) -> Vec<i64> {
match op {
Operator::Eq | Operator::In => {
let value_to_rows = self.value_to_rows.read().unwrap();
value_to_rows
.get(value)
.map(|rows| rows.iter().copied().collect())
.unwrap_or_default()
}
Operator::Lt => {
let sorted_values = self.sorted_values.read().unwrap();
let capacity = sorted_values.len() / 4; let mut results = Vec::with_capacity(capacity);
for (_, rows) in sorted_values.range(..value.clone()) {
results.extend(rows.iter().copied());
}
results
}
Operator::Lte => {
let sorted_values = self.sorted_values.read().unwrap();
let capacity = sorted_values.len() / 4;
let mut results = Vec::with_capacity(capacity);
for (_, rows) in sorted_values.range(..=value.clone()) {
results.extend(rows.iter().copied());
}
results
}
Operator::Gt => {
let sorted_values = self.sorted_values.read().unwrap();
let capacity = sorted_values.len() / 4;
let mut results = Vec::with_capacity(capacity);
for (_, rows) in
sorted_values.range((Bound::Excluded(value.clone()), Bound::Unbounded))
{
results.extend(rows.iter().copied());
}
results
}
Operator::Gte => {
let sorted_values = self.sorted_values.read().unwrap();
let capacity = sorted_values.len() / 4;
let mut results = Vec::with_capacity(capacity);
for (_, rows) in sorted_values.range(value.clone()..) {
results.extend(rows.iter().copied());
}
results
}
Operator::Ne | Operator::NotIn => {
let value_to_rows = self.value_to_rows.read().unwrap();
let capacity = value_to_rows.len();
let mut results = Vec::with_capacity(capacity);
for (v, rows) in value_to_rows.iter() {
if v != value {
results.extend(rows.iter().copied());
}
}
results
}
Operator::Like => Vec::new(),
Operator::IsNull => {
let value_to_rows = self.value_to_rows.read().unwrap();
let mut results = Vec::new();
for (v, rows) in value_to_rows.iter() {
if v.is_null() {
results.extend(rows.iter().copied());
}
}
results
}
Operator::IsNotNull => {
let value_to_rows = self.value_to_rows.read().unwrap();
let capacity = value_to_rows.len();
let mut results = Vec::with_capacity(capacity);
for (v, rows) in value_to_rows.iter() {
if !v.is_null() {
results.extend(rows.iter().copied());
}
}
results
}
}
}
}
impl Index for BTreeIndex {
fn name(&self) -> &str {
&self.name
}
fn table_name(&self) -> &str {
&self.table_name
}
fn build(&mut self) -> Result<()> {
self.check_closed()?;
Ok(())
}
fn add(&self, values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
self.check_closed()?;
if values.is_empty() {
return Ok(());
}
let value = values[0].clone();
let existing_value = {
let row_to_value = self.row_to_value.read().unwrap();
row_to_value.get(&row_id).cloned()
};
if let Some(ref old_value) = existing_value {
if old_value == &value {
return Ok(());
}
self.remove_row_from_indexes(row_id, old_value);
}
if self.unique && !value.is_null() {
let value_to_rows = self.value_to_rows.read().unwrap();
if let Some(rows) = value_to_rows.get(&value) {
for existing_row_id in rows.iter() {
if *existing_row_id != row_id {
return Err(Error::unique_constraint(
&self.name,
&self.column_name,
format!("{:?}", value),
));
}
}
}
}
{
let mut value_to_rows = self.value_to_rows.write().unwrap();
let mut sorted_values = self.sorted_values.write().unwrap();
let mut row_to_value = self.row_to_value.write().unwrap();
let value_for_hash = value.clone();
let value_for_btree = value.clone();
value_to_rows
.entry(value_for_hash)
.or_default()
.push(row_id);
sorted_values
.entry(value_for_btree)
.or_default()
.push(row_id);
row_to_value.insert(row_id, value);
}
self.invalidate_cache();
Ok(())
}
fn add_batch(&self, entries: &HashMap<i64, Vec<Value>>) -> Result<()> {
self.check_closed()?;
for (row_id, values) in entries {
self.add(values, *row_id, self.next_ref())?;
}
Ok(())
}
fn remove(&self, _values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
self.check_closed()?;
let stored_value = {
let row_to_value = self.row_to_value.read().unwrap();
row_to_value.get(&row_id).cloned()
};
if let Some(value) = stored_value {
{
let mut value_to_rows = self.value_to_rows.write().unwrap();
if let Some(rows) = value_to_rows.get_mut(&value) {
if let Some(pos) = rows.iter().position(|&id| id == row_id) {
rows.remove(pos);
}
if rows.is_empty() {
value_to_rows.remove(&value);
}
}
}
{
let mut sorted_values = self.sorted_values.write().unwrap();
if let Some(rows) = sorted_values.get_mut(&value) {
if let Some(pos) = rows.iter().position(|&id| id == row_id) {
rows.remove(pos);
}
if rows.is_empty() {
sorted_values.remove(&value);
}
}
}
{
let mut row_to_value = self.row_to_value.write().unwrap();
row_to_value.remove(&row_id);
}
self.invalidate_cache();
}
Ok(())
}
fn remove_batch(&self, entries: &HashMap<i64, Vec<Value>>) -> Result<()> {
self.check_closed()?;
for (row_id, values) in entries {
self.remove(values, *row_id, 0)?;
}
Ok(())
}
fn column_ids(&self) -> &[i32] {
std::slice::from_ref(&self.column_id)
}
fn column_names(&self) -> &[String] {
std::slice::from_ref(&self.column_name)
}
fn data_types(&self) -> &[DataType] {
std::slice::from_ref(&self.data_type)
}
fn index_type(&self) -> IndexType {
IndexType::BTree
}
fn is_unique(&self) -> bool {
self.unique
}
fn find(&self, values: &[Value]) -> Result<Vec<IndexEntry>> {
self.check_closed()?;
if values.is_empty() {
return Ok(Vec::new());
}
let value = &values[0];
let value_to_rows = self.value_to_rows.read().unwrap();
let entries = value_to_rows
.get(value)
.map(|rows| {
rows.iter()
.map(|&row_id| IndexEntry {
row_id,
ref_id: row_id, })
.collect()
})
.unwrap_or_default();
Ok(entries)
}
fn find_range(
&self,
min: &[Value],
max: &[Value],
min_inclusive: bool,
max_inclusive: bool,
) -> Result<Vec<IndexEntry>> {
self.check_closed()?;
if min.is_empty() || max.is_empty() {
return Ok(Vec::new());
}
let min_val = &min[0];
let max_val = &max[0];
let sorted_values = self.sorted_values.read().unwrap();
let min_bound = if min_inclusive {
Bound::Included(min_val.clone())
} else {
Bound::Excluded(min_val.clone())
};
let max_bound = if max_inclusive {
Bound::Included(max_val.clone())
} else {
Bound::Excluded(max_val.clone())
};
let capacity = sorted_values.len() / 4;
let mut entries = Vec::with_capacity(capacity);
for (_, rows) in sorted_values.range((min_bound, max_bound)) {
for &row_id in rows.iter() {
entries.push(IndexEntry {
row_id,
ref_id: row_id,
});
}
}
Ok(entries)
}
fn find_with_operator(&self, op: Operator, values: &[Value]) -> Result<Vec<IndexEntry>> {
self.check_closed()?;
if values.is_empty() {
return Ok(Vec::new());
}
let value = &values[0];
let row_ids = self.find_with_op(op, value);
Ok(row_ids
.into_iter()
.map(|row_id| IndexEntry {
row_id,
ref_id: row_id,
})
.collect())
}
fn get_row_ids_equal(&self, values: &[Value]) -> Vec<i64> {
if values.is_empty() {
return Vec::new();
}
let value = &values[0];
let value_to_rows = self.value_to_rows.read().unwrap();
value_to_rows
.get(value)
.map(|rows| rows.iter().copied().collect())
.unwrap_or_default()
}
fn get_row_ids_in_range(
&self,
min_value: &[Value],
max_value: &[Value],
include_min: bool,
include_max: bool,
) -> Vec<i64> {
if min_value.is_empty() || max_value.is_empty() {
return Vec::new();
}
self.find_range(min_value, max_value, include_min, include_max)
.map(|entries| entries.into_iter().map(|e| e.row_id).collect())
.unwrap_or_default()
}
fn get_filtered_row_ids(&self, expr: &dyn Expression) -> Vec<i64> {
if self.closed.load(AtomicOrdering::Acquire) {
return Vec::new();
}
if let Some((col_name, operator, value)) = expr.get_comparison_info() {
if col_name == self.column_name {
match operator {
Operator::Eq => {
return self.get_row_ids_equal(std::slice::from_ref(value));
}
Operator::Gt | Operator::Gte | Operator::Lt | Operator::Lte => {
let value_slice = std::slice::from_ref(value);
let empty_slice: &[Value] = &[];
let (min_vals, max_vals, include_min, include_max) = match operator {
Operator::Gt => (value_slice, empty_slice, false, false),
Operator::Gte => (value_slice, empty_slice, true, false),
Operator::Lt => (empty_slice, value_slice, false, false),
Operator::Lte => (empty_slice, value_slice, false, true),
_ => return Vec::new(),
};
return self.get_row_ids_in_range(
min_vals,
max_vals,
include_min,
include_max,
);
}
_ => {}
}
}
}
let comparisons = expr.collect_comparisons();
if !comparisons.is_empty() {
let mut min_val: Option<&Value> = None;
let mut max_val: Option<&Value> = None;
let mut include_min = false;
let mut include_max = false;
let mut eq_val: Option<&Value> = None;
for (col_name, op, val) in &comparisons {
if *col_name != self.column_name {
continue;
}
match op {
Operator::Eq => eq_val = Some(*val),
Operator::Gt => {
min_val = Some(*val);
include_min = false;
}
Operator::Gte => {
min_val = Some(*val);
include_min = true;
}
Operator::Lt => {
max_val = Some(*val);
include_max = false;
}
Operator::Lte => {
max_val = Some(*val);
include_max = true;
}
_ => {}
}
}
if let Some(val) = eq_val {
return self.get_row_ids_equal(std::slice::from_ref(val));
}
let empty_slice: &[Value] = &[];
if min_val.is_some() || max_val.is_some() {
let min_vals = min_val.map(std::slice::from_ref).unwrap_or(empty_slice);
let max_vals = max_val.map(std::slice::from_ref).unwrap_or(empty_slice);
return self.get_row_ids_in_range(min_vals, max_vals, include_min, include_max);
}
}
let row_to_value = self.row_to_value.read().unwrap();
if row_to_value.len() >= PARALLEL_FILTER_THRESHOLD {
let pairs: Vec<_> = row_to_value.iter().collect();
pairs
.par_iter()
.filter_map(|(&row_id, value)| {
let row = crate::core::Row::from_values(vec![(*value).clone()]);
if expr.evaluate(&row).unwrap_or(false) {
Some(row_id)
} else {
None
}
})
.collect()
} else {
let mut results = Vec::with_capacity(row_to_value.len() / 4);
for (&row_id, value) in row_to_value.iter() {
let row = crate::core::Row::from_values(vec![(*value).clone()]);
if expr.evaluate(&row).unwrap_or(false) {
results.push(row_id);
}
}
results
}
}
fn get_min_value(&self) -> Option<Value> {
if self.closed.load(AtomicOrdering::Acquire) {
return None;
}
self.update_cache_if_needed();
let cached_min = self.cached_min.read().unwrap();
cached_min.clone()
}
fn get_max_value(&self) -> Option<Value> {
if self.closed.load(AtomicOrdering::Acquire) {
return None;
}
self.update_cache_if_needed();
let cached_max = self.cached_max.read().unwrap();
cached_max.clone()
}
fn get_all_values(&self) -> Vec<Value> {
if self.closed.load(AtomicOrdering::Acquire) {
return Vec::new();
}
let sorted_values = self.sorted_values.read().unwrap();
sorted_values.keys().cloned().collect()
}
fn get_row_ids_ordered(
&self,
ascending: bool,
limit: usize,
offset: usize,
) -> Option<Vec<i64>> {
if self.closed.load(AtomicOrdering::Acquire) {
return None;
}
let sorted_values = self.sorted_values.read().unwrap();
let mut result = Vec::with_capacity(limit.min(128));
let mut skipped = 0;
if ascending {
'outer: for row_ids in sorted_values.values() {
for &row_id in row_ids {
if skipped < offset {
skipped += 1;
continue;
}
result.push(row_id);
if result.len() >= limit {
break 'outer;
}
}
}
} else {
'outer: for row_ids in sorted_values.values().rev() {
for &row_id in row_ids {
if skipped < offset {
skipped += 1;
continue;
}
result.push(row_id);
if result.len() >= limit {
break 'outer;
}
}
}
}
Some(result)
}
fn close(&mut self) -> Result<()> {
self.closed.store(true, AtomicOrdering::Release);
{
let mut sorted_values = self.sorted_values.write().unwrap();
sorted_values.clear();
}
{
let mut value_to_rows = self.value_to_rows.write().unwrap();
value_to_rows.clear();
}
{
let mut row_to_value = self.row_to_value.write().unwrap();
row_to_value.clear();
}
{
let mut cached_min = self.cached_min.write().unwrap();
*cached_min = None;
}
{
let mut cached_max = self.cached_max.write().unwrap();
*cached_max = None;
}
Ok(())
}
}
pub fn intersect_sorted_ids(a: &[i64], b: &[i64]) -> Vec<i64> {
if a.is_empty() || b.is_empty() {
return Vec::new();
}
let (a_min, a_max) = (a[0], a[a.len() - 1]);
let (b_min, b_max) = (b[0], b[b.len() - 1]);
if a_max < b_min || b_max < a_min {
return Vec::new();
}
let (smaller, larger) = if a.len() <= b.len() { (a, b) } else { (b, a) };
let mut result = Vec::with_capacity(smaller.len());
for &val in smaller {
if larger.binary_search(&val).is_ok() {
result.push(val);
}
}
result
}
pub fn intersect_multiple_sorted_ids(slices: &[Vec<i64>]) -> Vec<i64> {
if slices.is_empty() {
return Vec::new();
}
if slices.len() == 1 {
return slices[0].clone();
}
let mut result = slices[0].clone();
for slice in &slices[1..] {
result = intersect_sorted_ids(&result, slice);
if result.is_empty() {
break;
}
}
result
}
pub fn union_sorted_ids(a: &[i64], b: &[i64]) -> Vec<i64> {
if a.is_empty() {
return b.to_vec();
}
if b.is_empty() {
return a.to_vec();
}
let mut result = Vec::with_capacity(a.len() + b.len());
let mut i = 0;
let mut j = 0;
while i < a.len() && j < b.len() {
match a[i].cmp(&b[j]) {
std::cmp::Ordering::Less => {
result.push(a[i]);
i += 1;
}
std::cmp::Ordering::Greater => {
result.push(b[j]);
j += 1;
}
std::cmp::Ordering::Equal => {
result.push(a[i]);
i += 1;
j += 1;
}
}
}
result.extend_from_slice(&a[i..]);
result.extend_from_slice(&b[j..]);
result
}
pub fn union_multiple_sorted_ids(slices: &[Vec<i64>]) -> Vec<i64> {
if slices.is_empty() {
return Vec::new();
}
if slices.len() == 1 {
return slices[0].clone();
}
let mut result = slices[0].clone();
for slice in &slices[1..] {
result = union_sorted_ids(&result, slice);
}
result
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_index() -> BTreeIndex {
BTreeIndex::new(
"idx_test".to_string(),
"test_table".to_string(),
0,
"id".to_string(),
DataType::Integer,
false,
)
}
#[test]
fn test_btree_index_creation() {
let index = create_test_index();
assert_eq!(index.name(), "idx_test");
assert_eq!(index.table_name(), "test_table");
assert_eq!(index.column_names()[0], "id");
assert_eq!(index.index_type(), IndexType::BTree);
assert!(!index.is_unique());
}
#[test]
fn test_btree_index_with_custom_name() {
let index = BTreeIndex::with_custom_name(
"users".to_string(),
1,
"email".to_string(),
DataType::Text,
true,
Some("custom_email_idx"),
);
assert_eq!(index.name(), "custom_email_idx");
assert!(index.is_unique());
}
#[test]
fn test_btree_index_add_and_find() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
index.add(&[Value::Integer(200)], 2, 2).unwrap();
index.add(&[Value::Integer(100)], 3, 3).unwrap();
assert_eq!(index.unique_value_count(), 2);
assert_eq!(index.entry_count(), 3);
let entries = index.find(&[Value::Integer(100)]).unwrap();
assert_eq!(entries.len(), 2);
let entries = index.find(&[Value::Integer(999)]).unwrap();
assert!(entries.is_empty());
}
#[test]
fn test_btree_index_unique_constraint() {
let index = BTreeIndex::new(
"idx_unique".to_string(),
"test".to_string(),
0,
"id".to_string(),
DataType::Integer,
true, );
assert!(index.add(&[Value::Integer(1)], 1, 1).is_ok());
let result = index.add(&[Value::Integer(1)], 2, 2);
assert!(result.is_err());
assert!(index.add(&[Value::null_unknown()], 3, 3).is_ok());
assert!(index.add(&[Value::null_unknown()], 4, 4).is_ok());
}
#[test]
fn test_btree_index_remove() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
index.add(&[Value::Integer(100)], 2, 2).unwrap();
index.add(&[Value::Integer(200)], 3, 3).unwrap();
assert_eq!(index.entry_count(), 3);
index.remove(&[Value::Integer(100)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 2);
let entries = index.find(&[Value::Integer(100)]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].row_id, 2);
index.remove(&[Value::Integer(100)], 2, 2).unwrap();
assert!(!index.contains_value(&Value::Integer(100)));
}
#[test]
fn test_btree_index_range_query() {
let index = create_test_index();
for i in 1..=10 {
index.add(&[Value::Integer(i * 10)], i, i).unwrap();
}
let entries = index
.find_range(&[Value::Integer(30)], &[Value::Integer(70)], true, true)
.unwrap();
assert_eq!(entries.len(), 5);
let entries = index
.find_range(&[Value::Integer(30)], &[Value::Integer(70)], false, false)
.unwrap();
assert_eq!(entries.len(), 3); }
#[test]
fn test_btree_index_operators() {
let index = create_test_index();
for i in 1..=5 {
index.add(&[Value::Integer(i * 10)], i, i).unwrap();
}
let entries = index
.find_with_operator(Operator::Lt, &[Value::Integer(30)])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index
.find_with_operator(Operator::Gte, &[Value::Integer(40)])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index
.find_with_operator(Operator::Ne, &[Value::Integer(30)])
.unwrap();
assert_eq!(entries.len(), 4); }
#[test]
fn test_btree_index_batch_operations() {
let index = create_test_index();
let mut entries = HashMap::new();
entries.insert(1, vec![Value::Integer(100)]);
entries.insert(2, vec![Value::Integer(200)]);
entries.insert(3, vec![Value::Integer(100)]);
index.add_batch(&entries).unwrap();
assert_eq!(index.entry_count(), 3);
let mut to_remove = HashMap::new();
to_remove.insert(1, vec![Value::Integer(100)]);
to_remove.insert(2, vec![Value::Integer(200)]);
index.remove_batch(&to_remove).unwrap();
assert_eq!(index.entry_count(), 1);
}
#[test]
fn test_btree_index_null_handling() {
let index = create_test_index();
index.add(&[Value::null_unknown()], 1, 1).unwrap();
index.add(&[Value::Integer(100)], 2, 2).unwrap();
index.add(&[Value::null_unknown()], 3, 3).unwrap();
let entries = index
.find_with_operator(Operator::IsNull, &[Value::null_unknown()])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index
.find_with_operator(Operator::IsNotNull, &[Value::null_unknown()])
.unwrap();
assert_eq!(entries.len(), 1);
}
#[test]
fn test_btree_index_text_values() {
let index = BTreeIndex::new(
"idx_name".to_string(),
"test".to_string(),
0,
"name".to_string(),
DataType::Text,
false,
);
index.add(&[Value::text("Alice")], 1, 1).unwrap();
index.add(&[Value::text("Bob")], 2, 2).unwrap();
index.add(&[Value::text("Charlie")], 3, 3).unwrap();
let entries = index
.find_range(&[Value::text("Alice")], &[Value::text("Bob")], true, true)
.unwrap();
assert_eq!(entries.len(), 2); }
#[test]
fn test_btree_index_close() {
let mut index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 1);
index.close().unwrap();
assert!(index.add(&[Value::Integer(200)], 2, 2).is_err());
assert!(index.find(&[Value::Integer(100)]).is_err());
}
#[test]
fn test_btree_index_get_all_values() {
let index = create_test_index();
index.add(&[Value::Integer(30)], 1, 1).unwrap();
index.add(&[Value::Integer(10)], 2, 2).unwrap();
index.add(&[Value::Integer(20)], 3, 3).unwrap();
let values = index.get_all_values();
assert_eq!(values.len(), 3);
assert!(values.contains(&Value::Integer(10)));
assert!(values.contains(&Value::Integer(20)));
assert!(values.contains(&Value::Integer(30)));
}
#[test]
fn test_btree_index_row_id_helpers() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
index.add(&[Value::Integer(100)], 2, 2).unwrap();
index.add(&[Value::Integer(200)], 3, 3).unwrap();
let row_ids = index.get_row_ids_equal(&[Value::Integer(100)]);
assert_eq!(row_ids.len(), 2);
let row_ids =
index.get_row_ids_in_range(&[Value::Integer(100)], &[Value::Integer(200)], true, true);
assert_eq!(row_ids.len(), 3);
}
#[test]
fn test_btree_index_lte_gte_operators() {
let index = create_test_index();
for i in 1..=5 {
index.add(&[Value::Integer(i * 10)], i, i).unwrap();
}
let entries = index
.find_with_operator(Operator::Lte, &[Value::Integer(30)])
.unwrap();
assert_eq!(entries.len(), 3);
let entries = index
.find_with_operator(Operator::Gt, &[Value::Integer(20)])
.unwrap();
assert_eq!(entries.len(), 3); }
#[test]
fn test_mixed_integer_float_ordering() {
let index = BTreeIndex::new(
"idx_mixed".to_string(),
"test".to_string(),
0,
"value".to_string(),
DataType::Float, false,
);
index.add(&[Value::Integer(1)], 1, 1).unwrap();
index.add(&[Value::Float(2.5)], 2, 2).unwrap();
index.add(&[Value::Integer(3)], 3, 3).unwrap();
index.add(&[Value::Float(1.5)], 4, 4).unwrap();
index.add(&[Value::Float(3.0)], 5, 5).unwrap();
assert_eq!(index.entry_count(), 5);
let entries = index
.find_with_operator(Operator::Gte, &[Value::Integer(2)])
.unwrap();
assert_eq!(entries.len(), 3);
let entries = index
.find_with_operator(Operator::Lt, &[Value::Float(2.0)])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index.find(&[Value::Integer(3)]).unwrap();
assert!(!entries.is_empty()); }
#[test]
fn test_nan_handling_in_range_queries() {
let index = BTreeIndex::new(
"idx_nan".to_string(),
"test".to_string(),
0,
"value".to_string(),
DataType::Float,
false,
);
index.add(&[Value::Float(1.0)], 1, 1).unwrap();
index.add(&[Value::Float(f64::NAN)], 2, 2).unwrap();
index.add(&[Value::Float(2.0)], 3, 3).unwrap();
index.add(&[Value::Float(f64::INFINITY)], 4, 4).unwrap();
index.add(&[Value::Float(f64::NEG_INFINITY)], 5, 5).unwrap();
assert_eq!(index.entry_count(), 5);
let entries = index
.find_with_operator(Operator::Gt, &[Value::Float(100.0)])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index
.find_with_operator(Operator::Lt, &[Value::Float(f64::NAN)])
.unwrap();
assert_eq!(entries.len(), 4);
}
#[test]
fn test_high_cardinality_duplicates() {
let index = BTreeIndex::new(
"idx_highcard".to_string(),
"test".to_string(),
0,
"status".to_string(),
DataType::Text,
false,
);
for i in 0..100 {
let status = match i % 3 {
0 => "active",
1 => "pending",
_ => "completed",
};
index
.add(&[Value::text(status)], i as i64, i as i64)
.unwrap();
}
assert_eq!(index.unique_value_count(), 3);
assert_eq!(index.entry_count(), 100);
let entries = index.find(&[Value::text("active")]).unwrap();
assert_eq!(entries.len(), 34);
let entries = index.find(&[Value::text("pending")]).unwrap();
assert_eq!(entries.len(), 33);
index.add(&[Value::text("archived")], 0, 0).unwrap();
assert_eq!(index.unique_value_count(), 4);
let entries = index.find(&[Value::text("active")]).unwrap();
assert_eq!(entries.len(), 33);
let entries = index.find(&[Value::text("archived")]).unwrap();
assert_eq!(entries.len(), 1);
}
#[test]
fn test_update_same_value_is_noop() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 1);
index.add(&[Value::Integer(100)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 1);
let entries = index.find(&[Value::Integer(100)]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].row_id, 1);
}
#[test]
fn test_update_different_value_moves_row() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
index.add(&[Value::Integer(100)], 2, 2).unwrap();
assert_eq!(index.entry_count(), 2);
index.add(&[Value::Integer(200)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 2);
assert_eq!(index.unique_value_count(), 2);
let entries = index.find(&[Value::Integer(100)]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].row_id, 2);
let entries = index.find(&[Value::Integer(200)]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].row_id, 1);
}
}