use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::ops::Bound;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use crate::common::{CompactArc, CompactVec, I64Map};
use crate::core::{DataType, Error, IndexEntry, IndexType, Operator, Result, RowIdVec, Value};
use crate::storage::expression::Expression;
use crate::storage::traits::Index;
/// Number of indexed rows at or above which the fallback scan in
/// `get_filtered_row_ids` evaluates the expression through rayon's parallel
/// iterator. Only compiled when the `parallel` feature is enabled.
#[cfg(feature = "parallel")]
const PARALLEL_FILTER_THRESHOLD: usize = 10_000;
/// Row ids stored under one indexed value. The insert paths in this file place
/// new ids at the position reported by `binary_search`, so the ids stay sorted.
type RowIdSet = CompactVec<i64>;
/// Single-column ordered index backed by a `BTreeMap`.
///
/// Two maps are maintained together: `sorted_values` (value -> row ids) serves
/// equality and range lookups, and `row_to_value` (row id -> value) lets a row
/// be moved or removed without the caller supplying its old value.
pub struct BTreeIndex {
// Index name as returned by `Index::name`.
name: String,
// Name of the table this index belongs to.
table_name: String,
// Id of the single indexed column.
column_id: i32,
// Name of the single indexed column; compared against expression column names.
column_name: String,
// Declared data type of the indexed column.
data_type: DataType,
// When true, two different rows may not share the same non-NULL value.
unique: bool,
// Set by `close`; mutating and most query methods refuse to run once it is true.
closed: AtomicBool,
// Value -> sorted row ids holding that value.
sorted_values: RwLock<BTreeMap<CompactArc<Value>, RowIdSet>>,
// Row id -> the value currently indexed for that row (shares the key's Arc).
row_to_value: RwLock<I64Map<CompactArc<Value>>>,
// Smallest non-NULL key, valid only while `cache_valid` is true.
cached_min: RwLock<Option<CompactArc<Value>>>,
// Largest non-NULL key, valid only while `cache_valid` is true.
cached_max: RwLock<Option<CompactArc<Value>>>,
// False after a mutation; `update_cache_if_needed` recomputes min/max and resets it.
cache_valid: AtomicBool,
// Counter behind `next_ref`, used by `add_batch` to produce ref ids.
next_ref_id: RwLock<i64>,
}
impl BTreeIndex {
/// Creates an empty, open index over a single column.
///
/// `expected_rows` is only a sizing hint: when it is non-zero the
/// row-id -> value map is pre-allocated for that many rows.
pub fn new(
    name: String,
    table_name: String,
    column_id: i32,
    column_name: String,
    data_type: DataType,
    unique: bool,
    expected_rows: usize,
) -> Self {
    // Only pre-size the reverse map when the caller gave an estimate.
    let reverse_map = match expected_rows {
        0 => I64Map::new(),
        rows => I64Map::with_capacity(rows),
    };

    Self {
        name,
        table_name,
        column_id,
        column_name,
        data_type,
        unique,
        closed: AtomicBool::new(false),
        sorted_values: RwLock::new(BTreeMap::new()),
        row_to_value: RwLock::new(reverse_map),
        // An empty index has no min/max, so the (empty) cache starts out valid.
        cached_min: RwLock::new(None),
        cached_max: RwLock::new(None),
        cache_valid: AtomicBool::new(true),
        next_ref_id: RwLock::new(0),
    }
}
/// Creates an index, deriving its name when none is supplied.
///
/// With `custom_name == None` the name is `idx_<table>_<column>_btree`.
/// All other arguments are forwarded unchanged to [`BTreeIndex::new`].
pub fn with_custom_name(
    table_name: String,
    column_id: i32,
    column_name: String,
    data_type: DataType,
    unique: bool,
    custom_name: Option<&str>,
    expected_rows: usize,
) -> Self {
    let name = match custom_name {
        Some(custom) => custom.to_string(),
        None => format!("idx_{}_{}_btree", table_name, column_name),
    };

    Self::new(
        name,
        table_name,
        column_id,
        column_name,
        data_type,
        unique,
        expected_rows,
    )
}
fn check_closed(&self) -> Result<()> {
if self.closed.load(AtomicOrdering::Acquire) {
return Err(Error::IndexClosed);
}
Ok(())
}
/// Increments the internal reference counter and returns the new value
/// (the first call on a fresh index returns 1).
fn next_ref(&self) -> i64 {
    let mut counter = self.next_ref_id.write();
    let issued = *counter + 1;
    *counter = issued;
    issued
}
/// Number of distinct keys currently stored (a NULL key, if present, counts as one).
pub fn unique_value_count(&self) -> usize {
    self.sorted_values.read().len()
}
/// Number of rows currently indexed (one entry per row id).
pub fn entry_count(&self) -> usize {
    self.row_to_value.read().len()
}
/// Returns a clone of every distinct key, in the map's key order.
pub fn get_all_values(&self) -> Vec<Value> {
    let map = self.sorted_values.read();
    let mut values = Vec::with_capacity(map.len());
    for key in map.keys() {
        values.push((**key).clone());
    }
    values
}
/// Returns the row ids stored under `value`, or an empty vector when the
/// value is not present in the index.
pub fn get_row_ids_for_value(&self, value: &Value) -> Vec<i64> {
    let map = self.sorted_values.read();
    match map.get(value) {
        Some(rows) => rows.as_slice().to_vec(),
        None => Vec::new(),
    }
}
/// Returns true when at least one row is indexed under `value`.
pub fn contains_value(&self, value: &Value) -> bool {
    self.sorted_values.read().contains_key(value)
}
/// Returns true when `row_id` currently has an entry in the index.
pub fn contains_row(&self, row_id: i64) -> bool {
    self.row_to_value.read().contains_key(row_id)
}
/// Marks the cached min/max as stale; the next `update_cache_if_needed`
/// call recomputes them from `sorted_values`.
#[inline]
fn invalidate_cache(&self) {
self.cache_valid.store(false, AtomicOrdering::Release);
}
/// Recomputes the cached minimum and maximum non-NULL keys when the cache
/// has been invalidated; does nothing while the cache is still valid.
fn update_cache_if_needed(&self) {
    let stale = !self.cache_valid.load(AtomicOrdering::Acquire);
    if stale {
        // The read guard is held until the validity flag has been stored.
        let map = self.sorted_values.read();

        // First non-NULL key from the front is the minimum ...
        let lowest = map
            .keys()
            .find(|key| !key.is_null())
            .map(|key| CompactArc::clone(key));
        // ... and the first non-NULL key from the back is the maximum.
        let highest = map
            .keys()
            .rev()
            .find(|key| !key.is_null())
            .map(|key| CompactArc::clone(key));

        {
            // Take both cache locks before writing either slot.
            let mut min_slot = self.cached_min.write();
            let mut max_slot = self.cached_max.write();
            *min_slot = lowest;
            *max_slot = highest;
        }

        self.cache_valid.store(true, AtomicOrdering::Release);
    }
}
fn find_with_op(&self, op: Operator, value: &Value) -> Vec<i64> {
let sorted_values = self.sorted_values.read();
match op {
Operator::Eq | Operator::In => sorted_values
.get(value)
.map(|rows| rows.iter().copied().collect())
.unwrap_or_default(),
Operator::Lt => {
let lookup_key = CompactArc::new(value.clone());
let capacity = sorted_values.len() / 4; let mut results = Vec::with_capacity(capacity);
for (_, rows) in sorted_values.range(..lookup_key) {
results.extend_from_slice(rows.as_slice());
}
results
}
Operator::Lte => {
let lookup_key = CompactArc::new(value.clone());
let capacity = sorted_values.len() / 4;
let mut results = Vec::with_capacity(capacity);
for (_, rows) in sorted_values.range(..=lookup_key) {
results.extend_from_slice(rows.as_slice());
}
results
}
Operator::Gt => {
let lookup_key = CompactArc::new(value.clone());
let capacity = sorted_values.len() / 4;
let mut results = Vec::with_capacity(capacity);
for (_, rows) in
sorted_values.range((Bound::Excluded(lookup_key), Bound::Unbounded))
{
results.extend_from_slice(rows.as_slice());
}
results
}
Operator::Gte => {
let lookup_key = CompactArc::new(value.clone());
let capacity = sorted_values.len() / 4;
let mut results = Vec::with_capacity(capacity);
for (_, rows) in sorted_values.range(lookup_key..) {
results.extend_from_slice(rows.as_slice());
}
results
}
Operator::Ne | Operator::NotIn => {
let capacity = sorted_values.len();
let mut results = Vec::with_capacity(capacity);
for (v, rows) in sorted_values.iter() {
if v.as_ref() != value {
results.extend_from_slice(rows.as_slice());
}
}
results
}
Operator::Like => Vec::new(),
Operator::IsNull => {
let mut results = Vec::new();
for (v, rows) in sorted_values.iter() {
if v.is_null() {
results.extend_from_slice(rows.as_slice());
}
}
results
}
Operator::IsNotNull => {
let capacity = sorted_values.len();
let mut results = Vec::with_capacity(capacity);
for (v, rows) in sorted_values.iter() {
if !v.is_null() {
results.extend_from_slice(rows.as_slice());
}
}
results
}
}
}
}
impl Index for BTreeIndex {
/// Name of this index.
fn name(&self) -> &str {
    self.name.as_str()
}
/// Name of the table this index belongs to.
fn table_name(&self) -> &str {
    self.table_name.as_str()
}
/// No build step is performed here; the call only fails when the index
/// has already been closed.
fn build(&mut self) -> Result<()> {
    self.check_closed()
}
/// Indexes `values[0]` for `row_id`, replacing any value previously indexed
/// for that row.
///
/// * An empty `values` slice is a no-op.
/// * Re-adding the value the row already holds is a no-op.
/// * `_ref_id` is unused.
///
/// # Errors
/// * `Error::IndexClosed` when the index has been closed.
/// * A unique-constraint error when the index is unique, the value is not
///   NULL, and a different row already holds it. The index is left exactly
///   as it was in that case.
fn add(&self, values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
    self.check_closed()?;

    let value = match values.first() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Lock order (sorted_values, then row_to_value) matches the other mutators.
    let mut sorted_values = self.sorted_values.write();
    let mut row_to_value = self.row_to_value.write();

    // Look up what the row currently holds; an unchanged value needs no work.
    let previous = match row_to_value.get(row_id) {
        Some(old_arc) if old_arc.as_ref() == value => return Ok(()),
        Some(old_arc) => Some(CompactArc::clone(old_arc)),
        None => None,
    };

    // Validate uniqueness BEFORE touching either map. Checking after the old
    // entry was unlinked would leave the row missing from `sorted_values`
    // (but still present in `row_to_value`) whenever the constraint fails.
    if self.unique && !value.is_null() {
        if let Some(rows) = sorted_values.get(value) {
            if rows.iter().any(|&existing_row_id| existing_row_id != row_id) {
                return Err(Error::unique_constraint(
                    &self.name,
                    &self.column_name,
                    format!("{:?}", value),
                ));
            }
        }
    }

    // Unlink the row from its old value, dropping the key once it has no rows left.
    if let Some(old_arc) = previous {
        if let Some(rows) = sorted_values.get_mut(&old_arc) {
            if let Ok(pos) = rows.binary_search(&row_id) {
                rows.remove(pos);
            }
            if rows.is_empty() {
                sorted_values.remove(&old_arc);
            }
        }
    }

    // Reuse the Arc of an existing key so both maps share one allocation per value.
    let arc_value = if let Some((existing_arc, _)) = sorted_values.get_key_value(value) {
        CompactArc::clone(existing_arc)
    } else {
        CompactArc::new(value.clone())
    };

    // Insert at the binary-search position to keep the row-id set sorted.
    let btree_rows = sorted_values
        .entry(CompactArc::clone(&arc_value))
        .or_default();
    if let Err(pos) = btree_rows.binary_search(&row_id) {
        btree_rows.insert(pos, row_id);
    }
    row_to_value.insert(row_id, arc_value);

    // Release the map locks before flagging the min/max cache as stale.
    drop(sorted_values);
    drop(row_to_value);
    self.invalidate_cache();
    Ok(())
}
/// Adds every `(row_id, values)` pair through [`Index::add`], stopping at
/// the first error. Entries applied before a failure stay in the index.
fn add_batch(&self, entries: &I64Map<Vec<Value>>) -> Result<()> {
    self.check_closed()?;
    entries
        .iter()
        .try_for_each(|(row_id, values)| self.add(values, row_id, self.next_ref()))
}
/// Removes the entry for `row_id`, if any.
///
/// The value to unlink is taken from the row-id map, so `_values` and
/// `_ref_id` are ignored. Removing an unknown row is a successful no-op.
///
/// # Errors
/// `Error::IndexClosed` when the index has been closed.
fn remove(&self, _values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
    self.check_closed()?;

    let mut by_value = self.sorted_values.write();
    let mut by_row = self.row_to_value.write();

    let stored = match by_row.remove(row_id) {
        Some(arc) => arc,
        // Unknown row: nothing changed, so the cache stays as it is.
        None => return Ok(()),
    };

    if let Some(rows) = by_value.get_mut(&stored) {
        if let Ok(pos) = rows.binary_search(&row_id) {
            rows.remove(pos);
        }
        // Drop the key entirely once its last row is gone.
        if rows.is_empty() {
            by_value.remove(&stored);
        }
    }

    drop(by_value);
    drop(by_row);
    self.invalidate_cache();
    Ok(())
}
/// Removes every row id in `entries` through [`Index::remove`], stopping at
/// the first error.
fn remove_batch(&self, entries: &I64Map<Vec<Value>>) -> Result<()> {
    self.check_closed()?;
    entries
        .iter()
        .try_for_each(|(row_id, values)| self.remove(values, row_id, 0))
}
fn add_batch_slice(&self, entries: &[(i64, &[Value])]) -> Result<()> {
if entries.is_empty() {
return Ok(());
}
self.check_closed()?;
let mut sorted_values = self.sorted_values.write();
let mut row_to_value = self.row_to_value.write();
row_to_value.reserve(entries.len());
if self.unique {
for &(row_id, values) in entries {
if values.is_empty() {
continue;
}
let value = &values[0];
if value.is_null() {
continue;
}
if let Some(rows) = sorted_values.get(value) {
for &existing_row_id in rows.iter() {
if existing_row_id != row_id {
return Err(Error::unique_constraint(
&self.name,
&self.column_name,
format!("{:?}", value),
));
}
}
}
}
let mut seen: ahash::AHashMap<&Value, i64> =
ahash::AHashMap::with_capacity(entries.len());
for &(row_id, values) in entries {
if values.is_empty() {
continue;
}
let value = &values[0];
if value.is_null() {
continue;
}
if let Some(&existing_row_id) = seen.get(value) {
if existing_row_id != row_id {
return Err(Error::unique_constraint(
&self.name,
&self.column_name,
format!("{:?}", value),
));
}
}
seen.insert(value, row_id);
}
}
for &(row_id, values) in entries {
if values.is_empty() {
continue;
}
let value = &values[0];
if let Some(old_arc) = row_to_value.get(row_id) {
if old_arc.as_ref() == value {
continue; }
let old_arc = old_arc.clone();
if let Some(rows) = sorted_values.get_mut(&old_arc) {
if let Ok(pos) = rows.binary_search(&row_id) {
rows.remove(pos);
}
if rows.is_empty() {
sorted_values.remove(&old_arc);
}
}
}
let arc_value = if let Some((existing_arc, _)) = sorted_values.get_key_value(value) {
CompactArc::clone(existing_arc)
} else {
CompactArc::new(value.clone())
};
let btree_rows = sorted_values
.entry(CompactArc::clone(&arc_value))
.or_default();
if let Err(pos) = btree_rows.binary_search(&row_id) {
btree_rows.insert(pos, row_id);
}
row_to_value.insert(row_id, arc_value);
}
drop(sorted_values);
drop(row_to_value);
self.invalidate_cache();
Ok(())
}
/// Removes a batch of rows under a single lock acquisition.
///
/// Only the row id of each pair is used; unknown rows are skipped. The
/// min/max cache is invalidated only when at least one row id was actually
/// taken out of a value's row set.
///
/// # Errors
/// `Error::IndexClosed` when the index has been closed.
fn remove_batch_slice(&self, entries: &[(i64, &[Value])]) -> Result<()> {
    if entries.is_empty() {
        return Ok(());
    }
    self.check_closed()?;

    let mut by_value = self.sorted_values.write();
    let mut by_row = self.row_to_value.write();
    let mut removed_any = false;

    for &(row_id, _) in entries {
        let key = match by_row.remove(row_id) {
            Some(arc) => arc,
            None => continue,
        };
        let rows = match by_value.get_mut(&key) {
            Some(rows) => rows,
            None => continue,
        };
        if let Ok(pos) = rows.binary_search(&row_id) {
            rows.remove(pos);
            removed_any = true;
        }
        if rows.is_empty() {
            by_value.remove(&key);
        }
    }

    drop(by_value);
    drop(by_row);

    if removed_any {
        self.invalidate_cache();
    }
    Ok(())
}
/// Ids of the indexed columns: always a one-element slice over `column_id`.
fn column_ids(&self) -> &[i32] {
std::slice::from_ref(&self.column_id)
}
/// Names of the indexed columns: always a one-element slice over `column_name`.
fn column_names(&self) -> &[String] {
std::slice::from_ref(&self.column_name)
}
/// Data types of the indexed columns: always a one-element slice over `data_type`.
fn data_types(&self) -> &[DataType] {
std::slice::from_ref(&self.data_type)
}
/// This implementation always reports `IndexType::BTree`.
fn index_type(&self) -> IndexType {
IndexType::BTree
}
/// Whether the index enforces uniqueness of non-NULL values.
fn is_unique(&self) -> bool {
self.unique
}
/// Returns one entry per row whose indexed value equals `values[0]`.
///
/// An empty `values` slice or an unknown value yields an empty vector.
/// Each entry's `ref_id` is set to its `row_id`.
///
/// # Errors
/// `Error::IndexClosed` when the index has been closed.
fn find(&self, values: &[Value]) -> Result<Vec<IndexEntry>> {
    self.check_closed()?;

    let value = match values.first() {
        Some(v) => v,
        None => return Ok(Vec::new()),
    };

    let map = self.sorted_values.read();
    let ids: &[i64] = match map.get(value) {
        Some(rows) => rows.as_slice(),
        None => &[],
    };

    let mut entries = Vec::with_capacity(ids.len());
    for &row_id in ids {
        entries.push(IndexEntry {
            row_id,
            ref_id: row_id,
        });
    }
    Ok(entries)
}
/// Returns one entry per row whose indexed value lies between `min[0]` and
/// `max[0]`, with each end included or excluded per the two flags.
///
/// An empty `min` or `max` slice yields an empty result, as does a range
/// that cannot match anything (`min > max`, or `min == max` without both
/// ends inclusive). Each entry's `ref_id` is set to its `row_id`.
///
/// # Errors
/// `Error::IndexClosed` when the index has been closed.
fn find_range(
    &self,
    min: &[Value],
    max: &[Value],
    min_inclusive: bool,
    max_inclusive: bool,
) -> Result<Vec<IndexEntry>> {
    self.check_closed()?;

    let (min_val, max_val) = match (min.first(), max.first()) {
        (Some(lo), Some(hi)) => (lo, hi),
        _ => return Ok(Vec::new()),
    };

    // `BTreeMap::range` panics when start > end, or when start == end with
    // both bounds excluded. Such ranges match nothing, so answer them here.
    match min_val.cmp(max_val) {
        std::cmp::Ordering::Greater => return Ok(Vec::new()),
        std::cmp::Ordering::Equal if !(min_inclusive && max_inclusive) => {
            return Ok(Vec::new());
        }
        _ => {}
    }

    let sorted_values = self.sorted_values.read();

    let min_arc = CompactArc::new(min_val.clone());
    let max_arc = CompactArc::new(max_val.clone());
    let min_bound = if min_inclusive {
        Bound::Included(min_arc)
    } else {
        Bound::Excluded(min_arc)
    };
    let max_bound = if max_inclusive {
        Bound::Included(max_arc)
    } else {
        Bound::Excluded(max_arc)
    };

    // Rough guess: a range query touches about a quarter of the keys.
    let mut entries = Vec::with_capacity(sorted_values.len() / 4);
    for (_, rows) in sorted_values.range((min_bound, max_bound)) {
        entries.extend(rows.iter().map(|&row_id| IndexEntry {
            row_id,
            ref_id: row_id,
        }));
    }
    Ok(entries)
}
/// Returns one entry per row whose indexed value satisfies `op` against
/// `values[0]`; an empty `values` slice yields an empty vector.
/// Each entry's `ref_id` is set to its `row_id`.
///
/// # Errors
/// `Error::IndexClosed` when the index has been closed.
fn find_with_operator(&self, op: Operator, values: &[Value]) -> Result<Vec<IndexEntry>> {
    self.check_closed()?;

    let value = match values.first() {
        Some(v) => v,
        None => return Ok(Vec::new()),
    };

    let ids = self.find_with_op(op, value);
    let mut entries = Vec::with_capacity(ids.len());
    for row_id in ids {
        entries.push(IndexEntry {
            row_id,
            ref_id: row_id,
        });
    }
    Ok(entries)
}
/// Appends to `buffer` the row ids stored under `values[0]`.
/// Nothing is appended for an empty slice or an unknown value.
fn get_row_ids_equal_into(&self, values: &[Value], buffer: &mut Vec<i64>) {
    if let Some(value) = values.first() {
        if let Some(rows) = self.sorted_values.read().get(value) {
            buffer.extend_from_slice(rows.as_slice());
        }
    }
}
/// Appends to `buffer` the row ids whose indexed value lies in the range.
///
/// * `min_value[0]` / `max_value[0]` are the bounds; `include_min` /
///   `include_max` choose inclusive or exclusive ends.
/// * An empty slice on one side means that side is unbounded. This is how
///   `get_filtered_row_ids` expresses one-sided comparisons.
/// * Nothing is appended when both slices are empty, when the index is
///   closed, or when the range cannot match anything (`min > max`, or
///   `min == max` without both ends inclusive).
fn get_row_ids_in_range_into(
    &self,
    min_value: &[Value],
    max_value: &[Value],
    include_min: bool,
    include_max: bool,
    buffer: &mut Vec<i64>,
) {
    if self.closed.load(AtomicOrdering::Acquire) {
        return;
    }

    let lower = min_value.first();
    let upper = max_value.first();
    if lower.is_none() && upper.is_none() {
        return;
    }

    // `BTreeMap::range` panics on an inverted range, or on equal bounds that
    // are both excluded; those ranges are empty, so return early instead.
    if let (Some(lo), Some(hi)) = (lower, upper) {
        match lo.cmp(hi) {
            std::cmp::Ordering::Greater => return,
            std::cmp::Ordering::Equal if !(include_min && include_max) => return,
            _ => {}
        }
    }

    // A missing side becomes an unbounded end of the range.
    let to_bound = |side: Option<&Value>, inclusive: bool| match side {
        None => Bound::Unbounded,
        Some(v) if inclusive => Bound::Included(CompactArc::new(v.clone())),
        Some(v) => Bound::Excluded(CompactArc::new(v.clone())),
    };
    let min_bound = to_bound(lower, include_min);
    let max_bound = to_bound(upper, include_max);

    let sorted_values = self.sorted_values.read();
    for (_, rows) in sorted_values.range((min_bound, max_bound)) {
        buffer.extend_from_slice(rows.as_slice());
    }
}
/// Returns the row ids selected by `expr`, using the index where possible.
///
/// Strategy, in order:
/// 1. `expr` is a single comparison on the indexed column: answer it with
///    an equality or one-sided range lookup.
/// 2. `expr` yields a list of comparisons: use the ones on the indexed
///    column. An equality wins; otherwise the last lower and upper bounds
///    seen form a range.
/// 3. Otherwise evaluate `expr` against every indexed value, in parallel
///    above `PARALLEL_FILTER_THRESHOLD` rows when the `parallel` feature is on.
///
/// A closed index yields an empty result. Evaluation errors in step 3 are
/// treated as "row does not match".
///
/// NOTE(review): step 2 assumes the comparisons returned by
/// `collect_comparisons` are all required to hold together — confirm
/// against the `Expression` implementation.
fn get_filtered_row_ids(&self, expr: &dyn Expression) -> RowIdVec {
    if self.closed.load(AtomicOrdering::Acquire) {
        return RowIdVec::new();
    }

    // Step 1: single comparison on this column.
    if let Some((col_name, operator, value)) = expr.get_comparison_info() {
        if col_name == self.column_name {
            match operator {
                Operator::Eq => {
                    return self.get_row_ids_equal(std::slice::from_ref(value));
                }
                // One-sided comparisons go straight to the ordered map. Routing
                // them through a range call with an empty slice for the open
                // side is answered with no rows by the range code in this file.
                Operator::Gt => {
                    return RowIdVec::from_vec(self.find_with_op(Operator::Gt, value));
                }
                Operator::Gte => {
                    return RowIdVec::from_vec(self.find_with_op(Operator::Gte, value));
                }
                Operator::Lt => {
                    return RowIdVec::from_vec(self.find_with_op(Operator::Lt, value));
                }
                Operator::Lte => {
                    return RowIdVec::from_vec(self.find_with_op(Operator::Lte, value));
                }
                _ => {}
            }
        }
    }

    // Step 2: several comparisons; keep only those on this column.
    let comparisons = expr.collect_comparisons();
    if !comparisons.is_empty() {
        let mut min_val: Option<&Value> = None;
        let mut max_val: Option<&Value> = None;
        let mut include_min = false;
        let mut include_max = false;
        let mut eq_val: Option<&Value> = None;

        for (col_name, op, val) in &comparisons {
            if *col_name != self.column_name {
                continue;
            }
            match op {
                Operator::Eq => eq_val = Some(*val),
                Operator::Gt => {
                    min_val = Some(*val);
                    include_min = false;
                }
                Operator::Gte => {
                    min_val = Some(*val);
                    include_min = true;
                }
                Operator::Lt => {
                    max_val = Some(*val);
                    include_max = false;
                }
                Operator::Lte => {
                    max_val = Some(*val);
                    include_max = true;
                }
                _ => {}
            }
        }

        if let Some(val) = eq_val {
            return self.get_row_ids_equal(std::slice::from_ref(val));
        }

        match (min_val, max_val) {
            (Some(lo), Some(hi)) => {
                // A contradictory range (e.g. `x > 10 AND x < 5`) matches
                // nothing; answering it here also keeps it away from
                // `BTreeMap::range`, which panics on inverted bounds.
                match lo.cmp(hi) {
                    std::cmp::Ordering::Greater => return RowIdVec::new(),
                    std::cmp::Ordering::Equal if !(include_min && include_max) => {
                        return RowIdVec::new();
                    }
                    _ => {}
                }
                return self.get_row_ids_in_range(
                    std::slice::from_ref(lo),
                    std::slice::from_ref(hi),
                    include_min,
                    include_max,
                );
            }
            (Some(lo), None) => {
                let op = if include_min {
                    Operator::Gte
                } else {
                    Operator::Gt
                };
                return RowIdVec::from_vec(self.find_with_op(op, lo));
            }
            (None, Some(hi)) => {
                let op = if include_max {
                    Operator::Lte
                } else {
                    Operator::Lt
                };
                return RowIdVec::from_vec(self.find_with_op(op, hi));
            }
            (None, None) => {}
        }
    }

    // Step 3: evaluate the expression against every indexed value.
    let row_to_value = self.row_to_value.read();

    #[cfg(feature = "parallel")]
    if row_to_value.len() >= PARALLEL_FILTER_THRESHOLD {
        let pairs: Vec<_> = row_to_value.iter().collect();
        let collected: Vec<i64> = pairs
            .par_iter()
            .filter_map(|&(row_id, arc_value)| {
                let row = crate::core::Row::from_values(vec![(**arc_value).clone()]);
                if expr.evaluate(&row).unwrap_or(false) {
                    Some(row_id)
                } else {
                    None
                }
            })
            .collect();
        return RowIdVec::from_vec(collected);
    }

    {
        let mut results = RowIdVec::with_capacity(row_to_value.len() / 4);
        for (row_id, arc_value) in row_to_value.iter() {
            // The row handed to the expression holds just the indexed value.
            let row = crate::core::Row::from_values(vec![(**arc_value).clone()]);
            if expr.evaluate(&row).unwrap_or(false) {
                results.push(row_id);
            }
        }
        results
    }
}
/// Returns the smallest non-NULL indexed value, refreshing the cache first
/// if needed. `None` when the index is closed or holds no non-NULL value.
fn get_min_value(&self) -> Option<Value> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return None;
    }
    self.update_cache_if_needed();
    let slot = self.cached_min.read();
    slot.as_deref().cloned()
}
/// Returns the largest non-NULL indexed value, refreshing the cache first
/// if needed. `None` when the index is closed or holds no non-NULL value.
fn get_max_value(&self) -> Option<Value> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return None;
    }
    self.update_cache_if_needed();
    let slot = self.cached_max.read();
    slot.as_deref().cloned()
}
/// Returns a clone of every distinct key in key order, or an empty vector
/// when the index is closed.
fn get_all_values(&self) -> Vec<Value> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return Vec::new();
    }
    let map = self.sorted_values.read();
    let mut values = Vec::with_capacity(map.len());
    for key in map.keys() {
        values.push((**key).clone());
    }
    values
}
/// Counts the distinct non-NULL keys; `None` when the index is closed.
fn get_distinct_count_excluding_null(&self) -> Option<usize> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return None;
    }
    let map = self.sorted_values.read();
    let mut non_null = 0usize;
    for key in map.keys() {
        if !key.is_null() {
            non_null += 1;
        }
    }
    Some(non_null)
}
fn get_row_ids_ordered(
&self,
ascending: bool,
limit: usize,
offset: usize,
) -> Option<Vec<i64>> {
if self.closed.load(AtomicOrdering::Acquire) {
return None;
}
let sorted_values = self.sorted_values.read();
let mut result = Vec::with_capacity(limit.min(128));
let mut skipped = 0;
if ascending {
'outer: for row_ids in sorted_values.values() {
for &row_id in row_ids {
if skipped < offset {
skipped += 1;
continue;
}
result.push(row_id);
if result.len() >= limit {
break 'outer;
}
}
}
} else {
'outer: for row_ids in sorted_values.values().rev() {
for &row_id in row_ids {
if skipped < offset {
skipped += 1;
continue;
}
result.push(row_id);
if result.len() >= limit {
break 'outer;
}
}
}
}
Some(result)
}
/// Returns every `(value, row ids)` group in key order, cloning both the
/// value and the ids. `None` when the index is closed.
fn get_grouped_row_ids(&self) -> Option<Vec<(Value, Vec<i64>)>> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return None;
    }
    let map = self.sorted_values.read();
    let mut groups: Vec<(Value, Vec<i64>)> = Vec::with_capacity(map.len());
    for (key, rows) in map.iter() {
        groups.push(((**key).clone(), rows.to_vec()));
    }
    Some(groups)
}
/// Invokes `callback` once per `(value, row ids)` group in key order,
/// without cloning.
///
/// The callback returns `Ok(true)` to continue, `Ok(false)` to stop early,
/// or `Err` to abort. Returns `None` when the index is closed,
/// `Some(Err(e))` when the callback failed, and `Some(Ok(()))` otherwise.
/// The read lock on the value map is held while the callback runs.
fn for_each_group(
    &self,
    callback: &mut dyn FnMut(&Value, &[i64]) -> Result<bool>,
) -> Option<Result<()>> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return None;
    }
    let map = self.sorted_values.read();
    for (key, rows) in map.iter() {
        let keep_going = match callback(key.as_ref(), rows.as_slice()) {
            Ok(flag) => flag,
            Err(e) => return Some(Err(e)),
        };
        if !keep_going {
            break;
        }
    }
    Some(Ok(()))
}
/// Removes every entry from the index and resets the cached min/max.
///
/// Both maps are cleared while holding both write locks, taken in the same
/// order as `add` / `remove`. Clearing them in separate critical sections
/// would let a concurrent writer slip in between, leaving an entry in one
/// map with no counterpart in the other. Always returns `Ok(())`.
fn clear(&self) -> Result<()> {
    {
        let mut sorted_values = self.sorted_values.write();
        let mut row_to_value = self.row_to_value.write();
        sorted_values.clear();
        row_to_value.clear();
    }

    // An empty index has no extremes.
    *self.cached_min.write() = None;
    *self.cached_max.write() = None;
    Ok(())
}
/// Exposes the index as `&dyn Any` so callers can downcast to `BTreeIndex`.
fn as_any(&self) -> &dyn std::any::Any {
self
}
/// Marks the index as closed and drops all of its entries.
/// After this call, methods guarded by `check_closed` return `Error::IndexClosed`.
fn close(&mut self) -> Result<()> {
// Set the flag first so the index is already marked closed when the data is released.
self.closed.store(true, AtomicOrdering::Release);
self.clear()
}
}
/// Intersects two ascending-sorted id slices.
///
/// Every element of the shorter slice (or of `a` on a tie) is looked up in
/// the other slice by binary search, so the output follows the order of the
/// probed slice. Empty inputs and inputs whose value ranges do not overlap
/// yield an empty result without any searching.
pub fn intersect_sorted_ids(a: &[i64], b: &[i64]) -> RowIdVec {
    // An empty side or disjoint [first, last] ranges cannot share any id.
    let overlaps = match (a.first(), a.last(), b.first(), b.last()) {
        (Some(a_lo), Some(a_hi), Some(b_lo), Some(b_hi)) => a_hi >= b_lo && b_hi >= a_lo,
        _ => false,
    };
    if !overlaps {
        return RowIdVec::new();
    }

    // Probe with the shorter slice; `a` wins ties.
    let (probe, haystack) = if b.len() < a.len() { (b, a) } else { (a, b) };

    let mut common = RowIdVec::with_capacity(probe.len());
    for id in probe
        .iter()
        .copied()
        .filter(|id| haystack.binary_search(id).is_ok())
    {
        common.push(id);
    }
    common
}
/// Intersects any number of ascending-sorted id slices.
///
/// No slices yields an empty result and a single slice is copied as-is.
/// Otherwise the slices are intersected left to right, stopping as soon as
/// the running result becomes empty.
pub fn intersect_multiple_sorted_ids(slices: &[&[i64]]) -> RowIdVec {
    match slices {
        [] => RowIdVec::new(),
        [only] => {
            let mut copy = RowIdVec::with_capacity(only.len());
            copy.extend_from_slice(only);
            copy
        }
        [first, second, rest @ ..] => {
            let mut acc = intersect_sorted_ids(first, second);
            for next in rest {
                // Nothing can be added back once the intersection is empty.
                if acc.is_empty() {
                    break;
                }
                acc = intersect_sorted_ids(&acc, next);
            }
            acc
        }
    }
}
/// Merges two ascending-sorted id slices into one ascending result.
///
/// An id present in both inputs is emitted once. When one input is empty
/// the other is copied unchanged.
pub fn union_sorted_ids(a: &[i64], b: &[i64]) -> RowIdVec {
    let mut merged = RowIdVec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);

    // Classic two-pointer merge, expressed by peeling the slice heads.
    while let (Some((&x, left_rest)), Some((&y, right_rest))) =
        (left.split_first(), right.split_first())
    {
        if x <= y {
            merged.push(x);
            left = left_rest;
            // Equal heads: consume both sides so the id appears once.
            if x == y {
                right = right_rest;
            }
        } else {
            merged.push(y);
            right = right_rest;
        }
    }

    // At most one of these tails is non-empty.
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
/// Merges any number of ascending-sorted id slices into one ascending result.
///
/// No slices yields an empty result and a single slice is copied as-is.
/// Otherwise the slices are merged left to right with `union_sorted_ids`.
pub fn union_multiple_sorted_ids(slices: &[&[i64]]) -> RowIdVec {
    match slices {
        [] => RowIdVec::new(),
        [only] => {
            let mut copy = RowIdVec::with_capacity(only.len());
            copy.extend_from_slice(only);
            copy
        }
        [first, second, rest @ ..] => rest
            .iter()
            .fold(union_sorted_ids(first, second), |acc, next| {
                union_sorted_ids(&acc, next)
            }),
    }
}
// Unit tests for `BTreeIndex`, exercised through its inherent helpers and
// the `Index` trait methods.
#[cfg(test)]
mod tests {
use super::*;
// Builds a non-unique integer index on column "id" with no capacity hint.
fn create_test_index() -> BTreeIndex {
BTreeIndex::new(
"idx_test".to_string(),
"test_table".to_string(),
0,
"id".to_string(),
DataType::Integer,
false,
0, )
}
// Constructor arguments are reported back through the trait accessors.
#[test]
fn test_btree_index_creation() {
let index = create_test_index();
assert_eq!(index.name(), "idx_test");
assert_eq!(index.table_name(), "test_table");
assert_eq!(index.column_names()[0], "id");
assert_eq!(index.index_type(), IndexType::BTree);
assert!(!index.is_unique());
}
// A supplied custom name replaces the generated `idx_<table>_<column>_btree` name.
#[test]
fn test_btree_index_with_custom_name() {
let index = BTreeIndex::with_custom_name(
"users".to_string(),
1,
"email".to_string(),
DataType::Text,
true,
Some("custom_email_idx"),
0,
);
assert_eq!(index.name(), "custom_email_idx");
assert!(index.is_unique());
}
// Rows sharing a value are grouped under one key; unknown values find nothing.
#[test]
fn test_btree_index_add_and_find() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
index.add(&[Value::Integer(200)], 2, 2).unwrap();
index.add(&[Value::Integer(100)], 3, 3).unwrap();
assert_eq!(index.unique_value_count(), 2);
assert_eq!(index.entry_count(), 3);
let entries = index.find(&[Value::Integer(100)]).unwrap();
assert_eq!(entries.len(), 2);
let entries = index.find(&[Value::Integer(999)]).unwrap();
assert!(entries.is_empty());
}
// A unique index rejects a second row with the same value but allows repeated NULLs.
#[test]
fn test_btree_index_unique_constraint() {
let index = BTreeIndex::new(
"idx_unique".to_string(),
"test".to_string(),
0,
"id".to_string(),
DataType::Integer,
true, 0,
);
assert!(index.add(&[Value::Integer(1)], 1, 1).is_ok());
let result = index.add(&[Value::Integer(1)], 2, 2);
assert!(result.is_err());
assert!(index.add(&[Value::null_unknown()], 3, 3).is_ok());
assert!(index.add(&[Value::null_unknown()], 4, 4).is_ok());
}
// Removing the last row of a value also removes the value's key.
#[test]
fn test_btree_index_remove() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
index.add(&[Value::Integer(100)], 2, 2).unwrap();
index.add(&[Value::Integer(200)], 3, 3).unwrap();
assert_eq!(index.entry_count(), 3);
index.remove(&[Value::Integer(100)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 2);
let entries = index.find(&[Value::Integer(100)]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].row_id, 2);
index.remove(&[Value::Integer(100)], 2, 2).unwrap();
assert!(!index.contains_value(&Value::Integer(100)));
}
// Values 10..=100 in steps of 10: [30, 70] holds 5 keys, (30, 70) holds 3.
#[test]
fn test_btree_index_range_query() {
let index = create_test_index();
for i in 1..=10 {
index.add(&[Value::Integer(i * 10)], i, i).unwrap();
}
let entries = index
.find_range(&[Value::Integer(30)], &[Value::Integer(70)], true, true)
.unwrap();
assert_eq!(entries.len(), 5);
let entries = index
.find_range(&[Value::Integer(30)], &[Value::Integer(70)], false, false)
.unwrap();
assert_eq!(entries.len(), 3); }
// Values 10, 20, 30, 40, 50: checks Lt, Gte and Ne.
#[test]
fn test_btree_index_operators() {
let index = create_test_index();
for i in 1..=5 {
index.add(&[Value::Integer(i * 10)], i, i).unwrap();
}
let entries = index
.find_with_operator(Operator::Lt, &[Value::Integer(30)])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index
.find_with_operator(Operator::Gte, &[Value::Integer(40)])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index
.find_with_operator(Operator::Ne, &[Value::Integer(30)])
.unwrap();
assert_eq!(entries.len(), 4); }
// `add_batch` / `remove_batch` operate on a row-id -> values map.
#[test]
fn test_btree_index_batch_operations() {
let index = create_test_index();
let mut entries = I64Map::new();
entries.insert(1, vec![Value::Integer(100)]);
entries.insert(2, vec![Value::Integer(200)]);
entries.insert(3, vec![Value::Integer(100)]);
index.add_batch(&entries).unwrap();
assert_eq!(index.entry_count(), 3);
let mut to_remove = I64Map::new();
to_remove.insert(1, vec![Value::Integer(100)]);
to_remove.insert(2, vec![Value::Integer(200)]);
index.remove_batch(&to_remove).unwrap();
assert_eq!(index.entry_count(), 1);
}
// NULLs are indexed and reachable through IsNull / IsNotNull.
#[test]
fn test_btree_index_null_handling() {
let index = create_test_index();
index.add(&[Value::null_unknown()], 1, 1).unwrap();
index.add(&[Value::Integer(100)], 2, 2).unwrap();
index.add(&[Value::null_unknown()], 3, 3).unwrap();
let entries = index
.find_with_operator(Operator::IsNull, &[Value::null_unknown()])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index
.find_with_operator(Operator::IsNotNull, &[Value::null_unknown()])
.unwrap();
assert_eq!(entries.len(), 1);
}
// Text keys: the inclusive range "Alice".."Bob" matches two of the three names.
#[test]
fn test_btree_index_text_values() {
let index = BTreeIndex::new(
"idx_name".to_string(),
"test".to_string(),
0,
"name".to_string(),
DataType::Text,
false,
0,
);
index.add(&[Value::text("Alice")], 1, 1).unwrap();
index.add(&[Value::text("Bob")], 2, 2).unwrap();
index.add(&[Value::text("Charlie")], 3, 3).unwrap();
let entries = index
.find_range(&[Value::text("Alice")], &[Value::text("Bob")], true, true)
.unwrap();
assert_eq!(entries.len(), 2); }
// After `close`, both writes and lookups fail.
#[test]
fn test_btree_index_close() {
let mut index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 1);
index.close().unwrap();
assert!(index.add(&[Value::Integer(200)], 2, 2).is_err());
assert!(index.find(&[Value::Integer(100)]).is_err());
}
// `get_all_values` returns every distinct value regardless of insert order.
#[test]
fn test_btree_index_get_all_values() {
let index = create_test_index();
index.add(&[Value::Integer(30)], 1, 1).unwrap();
index.add(&[Value::Integer(10)], 2, 2).unwrap();
index.add(&[Value::Integer(20)], 3, 3).unwrap();
let values = index.get_all_values();
assert_eq!(values.len(), 3);
assert!(values.contains(&Value::Integer(10)));
assert!(values.contains(&Value::Integer(20)));
assert!(values.contains(&Value::Integer(30)));
}
// The row-id helper methods return ids for equality and two-sided range lookups.
#[test]
fn test_btree_index_row_id_helpers() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
index.add(&[Value::Integer(100)], 2, 2).unwrap();
index.add(&[Value::Integer(200)], 3, 3).unwrap();
let row_ids = index.get_row_ids_equal(&[Value::Integer(100)]);
assert_eq!(row_ids.len(), 2);
let row_ids =
index.get_row_ids_in_range(&[Value::Integer(100)], &[Value::Integer(200)], true, true);
assert_eq!(row_ids.len(), 3);
}
// Values 10, 20, 30, 40, 50: checks Lte and Gt.
#[test]
fn test_btree_index_lte_gte_operators() {
let index = create_test_index();
for i in 1..=5 {
index.add(&[Value::Integer(i * 10)], i, i).unwrap();
}
let entries = index
.find_with_operator(Operator::Lte, &[Value::Integer(30)])
.unwrap();
assert_eq!(entries.len(), 3);
let entries = index
.find_with_operator(Operator::Gt, &[Value::Integer(20)])
.unwrap();
assert_eq!(entries.len(), 3); }
// Integer and Float values share one ordering, so range lookups see both kinds.
#[test]
fn test_mixed_integer_float_ordering() {
let index = BTreeIndex::new(
"idx_mixed".to_string(),
"test".to_string(),
0,
"value".to_string(),
DataType::Float, false,
0,
);
index.add(&[Value::Integer(1)], 1, 1).unwrap();
index.add(&[Value::Float(2.5)], 2, 2).unwrap();
index.add(&[Value::Integer(3)], 3, 3).unwrap();
index.add(&[Value::Float(1.5)], 4, 4).unwrap();
index.add(&[Value::Float(3.0)], 5, 5).unwrap();
assert_eq!(index.entry_count(), 5);
let entries = index
.find_with_operator(Operator::Gte, &[Value::Integer(2)])
.unwrap();
assert_eq!(entries.len(), 3);
let entries = index
.find_with_operator(Operator::Lt, &[Value::Float(2.0)])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index.find(&[Value::Integer(3)]).unwrap();
assert!(!entries.is_empty()); }
// The expected counts imply NaN orders above every other float, +infinity included.
#[test]
fn test_nan_handling_in_range_queries() {
let index = BTreeIndex::new(
"idx_nan".to_string(),
"test".to_string(),
0,
"value".to_string(),
DataType::Float,
false,
0,
);
index.add(&[Value::Float(1.0)], 1, 1).unwrap();
index.add(&[Value::Float(f64::NAN)], 2, 2).unwrap();
index.add(&[Value::Float(2.0)], 3, 3).unwrap();
index.add(&[Value::Float(f64::INFINITY)], 4, 4).unwrap();
index.add(&[Value::Float(f64::NEG_INFINITY)], 5, 5).unwrap();
assert_eq!(index.entry_count(), 5);
let entries = index
.find_with_operator(Operator::Gt, &[Value::Float(100.0)])
.unwrap();
assert_eq!(entries.len(), 2);
let entries = index
.find_with_operator(Operator::Lt, &[Value::Float(f64::NAN)])
.unwrap();
assert_eq!(entries.len(), 4);
}
// 100 rows spread over 3 distinct values (many duplicates per value);
// re-adding row 0 with a new value moves it out of the "active" group.
#[test]
fn test_high_cardinality_duplicates() {
let index = BTreeIndex::new(
"idx_highcard".to_string(),
"test".to_string(),
0,
"status".to_string(),
DataType::Text,
false,
0,
);
for i in 0..100 {
let status = match i % 3 {
0 => "active",
1 => "pending",
_ => "completed",
};
index
.add(&[Value::text(status)], i as i64, i as i64)
.unwrap();
}
assert_eq!(index.unique_value_count(), 3);
assert_eq!(index.entry_count(), 100);
let entries = index.find(&[Value::text("active")]).unwrap();
assert_eq!(entries.len(), 34);
let entries = index.find(&[Value::text("pending")]).unwrap();
assert_eq!(entries.len(), 33);
index.add(&[Value::text("archived")], 0, 0).unwrap();
assert_eq!(index.unique_value_count(), 4);
let entries = index.find(&[Value::text("active")]).unwrap();
assert_eq!(entries.len(), 33);
let entries = index.find(&[Value::text("archived")]).unwrap();
assert_eq!(entries.len(), 1);
}
// Re-adding the same value for the same row leaves the index unchanged.
#[test]
fn test_update_same_value_is_noop() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 1);
index.add(&[Value::Integer(100)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 1);
let entries = index.find(&[Value::Integer(100)]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].row_id, 1);
}
// Adding a different value for an existing row moves the row to the new key.
#[test]
fn test_update_different_value_moves_row() {
let index = create_test_index();
index.add(&[Value::Integer(100)], 1, 1).unwrap();
index.add(&[Value::Integer(100)], 2, 2).unwrap();
assert_eq!(index.entry_count(), 2);
index.add(&[Value::Integer(200)], 1, 1).unwrap();
assert_eq!(index.entry_count(), 2);
assert_eq!(index.unique_value_count(), 2);
let entries = index.find(&[Value::Integer(100)]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].row_id, 2);
let entries = index.find(&[Value::Integer(200)]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].row_id, 1);
}
}