use parking_lot::RwLock;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::common::{I64Map, I64Set};
use crate::core::{DataType, IndexEntry, IndexType, Operator, Result, RowIdVec, Value};
use crate::storage::expression::Expression;
use crate::storage::Index;
/// Exclusive upper bound on the row ids stored in the dense bitset: `to_word_bit` maps ids in
/// `0..BITSET_MAX_BITS` to a bit and returns `None` for every other id.
const BITSET_MAX_BITS: usize = 10_000_000;
/// Number of bits held by each `u64` word of the bitset.
const BITS: usize = 64;
/// Set of row ids kept behind the lock of `PkIndex`, split across three stores depending on
/// the id's value (see `insert`/`contains`).
struct PkIndexInner {
/// Dense bitset for ids that `to_word_bit` maps to a `(word, mask)` pair; grown on demand.
words: Vec<u64>,
/// Ids that do not fit the bitset, except `i64::MIN` which is tracked by `has_i64_min`.
overflow: I64Set,
/// Whether `i64::MIN` is present; it is never stored in `overflow`.
has_i64_min: bool,
/// Total number of ids currently stored across all three stores.
count: usize,
}
impl PkIndexInner {
    /// Builds an empty set: no bitset words, no overflow ids, and a count of zero.
    fn new() -> Self {
        Self {
            count: 0,
            has_i64_min: false,
            overflow: I64Set::new(),
            words: Vec::new(),
        }
    }

    /// Reports whether `id` is currently stored.
    #[inline]
    fn contains(&self, id: i64) -> bool {
        match to_word_bit(id) {
            // A slot past the end of `words` simply means the bit was never set.
            Some((slot, bit)) => self
                .words
                .get(slot)
                .map_or(false, |word| (word & bit) != 0),
            None if id == i64::MIN => self.has_i64_min,
            None => self.overflow.contains(id),
        }
    }

    /// Stores `id`, returning `true` when it was not present before.
    ///
    /// `count` is bumped only for ids that were newly added.
    #[inline]
    fn insert(&mut self, id: i64) -> bool {
        let newly_added = match to_word_bit(id) {
            Some((slot, bit)) => {
                ensure_capacity(&mut self.words, slot);
                let word = &mut self.words[slot];
                let was_clear = (*word & bit) == 0;
                *word |= bit;
                was_clear
            }
            // `replace` hands back the previous flag: newly added iff it was `false`.
            None if id == i64::MIN => !std::mem::replace(&mut self.has_i64_min, true),
            None => self.overflow.insert(id),
        };
        if newly_added {
            self.count += 1;
        }
        newly_added
    }

    /// Drops `id`, returning `true` when it was present.
    ///
    /// `count` is decremented only for ids that were actually removed.
    #[inline]
    fn remove(&mut self, id: i64) -> bool {
        let was_present = match to_word_bit(id) {
            Some((slot, bit)) => match self.words.get_mut(slot) {
                Some(word) if (*word & bit) != 0 => {
                    *word &= !bit;
                    true
                }
                _ => false,
            },
            // `replace` hands back the previous flag: removed iff it was `true`.
            None if id == i64::MIN => std::mem::replace(&mut self.has_i64_min, false),
            None => self.overflow.remove(id),
        };
        if was_present {
            self.count -= 1;
        }
        was_present
    }

    /// Returns `true` when every stored id lives in the bitset, i.e. neither the overflow
    /// set nor the `i64::MIN` flag holds anything.
    #[inline]
    fn overflow_empty(&self) -> bool {
        !self.has_i64_min && self.overflow.is_empty()
    }

    /// Forgets every stored id and resets the count to zero.
    fn clear(&mut self) {
        self.count = 0;
        self.has_i64_min = false;
        self.overflow = I64Set::new();
        self.words.clear();
    }
}
/// Maps a row id onto its position in the dense bitset.
///
/// Returns `Some((word_index, bit_mask))` for ids in `0..BITSET_MAX_BITS`, and `None` for
/// negative ids and for ids at or above `BITSET_MAX_BITS`.
#[inline]
fn to_word_bit(row_id: i64) -> Option<(usize, u64)> {
    // Range-check in the `i64` domain *before* narrowing. Casting first (`row_id as usize`)
    // truncates on targets where `usize` is narrower than 64 bits, which would alias very
    // large ids onto small bitset slots.
    if row_id < 0 || row_id >= BITSET_MAX_BITS as i64 {
        return None;
    }
    // Lossless: 0 <= row_id < BITSET_MAX_BITS, and BITSET_MAX_BITS is itself a `usize`.
    let idx = row_id as usize;
    Some((idx / BITS, 1u64 << (idx % BITS)))
}
/// Grows `words` with zeroed words, if needed, so that `words[word_idx]` is addressable.
///
/// Never shrinks the vector and never alters existing words.
#[inline]
fn ensure_capacity(words: &mut Vec<u64>, word_idx: usize) {
    let required_len = word_idx + 1;
    if words.len() < required_len {
        words.resize(required_len, 0);
    }
}
/// Number of bit positions addressable in `words` (`BITS` per word).
#[inline]
fn bit_capacity(words: &[u64]) -> usize {
    let word_count = words.len();
    word_count * BITS
}
/// Calls `f` with the id of every set bit, in ascending id order.
///
/// The id of bit `b` in word `w` is `w * BITS + b`. Iteration stops as soon as `f` returns
/// `false`.
#[inline]
fn for_each_set_bit(words: &[u64], mut f: impl FnMut(i64) -> bool) {
    let occupied = words
        .iter()
        .copied()
        .enumerate()
        .filter(|&(_, word)| word != 0);
    for (slot, mut pending) in occupied {
        let first_id = (slot * BITS) as i64;
        // `pending` is non-zero on entry, so visit first and test for exhaustion afterwards.
        loop {
            let lowest = i64::from(pending.trailing_zeros());
            if !f(first_id + lowest) {
                return;
            }
            // Clear the lowest set bit.
            pending &= pending - 1;
            if pending == 0 {
                break;
            }
        }
    }
}
/// Calls `f` with the id of every set bit, in descending id order.
///
/// The id of bit `b` in word `w` is `w * BITS + b`. Iteration stops as soon as `f` returns
/// `false`.
#[inline]
fn for_each_set_bit_rev(words: &[u64], mut f: impl FnMut(i64) -> bool) {
    for (slot, &word) in words.iter().enumerate().rev() {
        if word == 0 {
            continue;
        }
        let first_id = (slot * BITS) as i64;
        let mut pending = word;
        while pending != 0 {
            // Index of the highest bit still set.
            let highest = BITS - 1 - pending.leading_zeros() as usize;
            if !f(first_id + highest as i64) {
                return;
            }
            // The bit is known to be set, so XOR clears it.
            pending ^= 1u64 << highest;
        }
    }
}
/// Calls `f` with the id of every set bit whose id lies in the inclusive range `lo..=hi`,
/// in ascending order.
///
/// Bounds beyond the end of `words` are allowed: the range is simply cut off at the last
/// word. Nothing is visited when `words` is empty or `lo > hi`. Iteration stops as soon as
/// `f` returns `false`.
fn for_each_set_bit_in_range(words: &[u64], lo: usize, hi: usize, mut f: impl FnMut(i64) -> bool) {
    let Some(last_word) = words.len().checked_sub(1) else {
        return;
    };
    let lo_word = lo / BITS;
    if lo_word > last_word {
        return;
    }
    // Keeps bits at or above `lo`'s position within the first word.
    let lo_mask = u64::MAX << (lo % BITS);
    // Only mask the final word when `hi` really falls inside it. If `hi` lies past the end
    // of the bitset, the last word must be kept whole; applying `hi % BITS` there would
    // drop bits that are inside the requested range.
    let (end_word, hi_mask) = if hi / BITS > last_word {
        (last_word, u64::MAX)
    } else {
        (hi / BITS, u64::MAX >> (BITS - 1 - hi % BITS))
    };
    if lo_word > end_word {
        return;
    }
    for (offset, &raw_word) in words[lo_word..=end_word].iter().enumerate() {
        let word_idx = lo_word + offset;
        let mut word = raw_word;
        if word_idx == lo_word {
            word &= lo_mask;
        }
        if word_idx == end_word {
            word &= hi_mask;
        }
        let base = (word_idx * BITS) as i64;
        while word != 0 {
            let bit = word.trailing_zeros() as i64;
            if !f(base + bit) {
                return;
            }
            // Clear the lowest set bit.
            word &= word - 1;
        }
    }
}
/// Gathers an `IndexEntry` for every set bit with id in the inclusive range `lo..=hi`,
/// in ascending id order. Both `row_id` and `ref_id` are set to the bit's id.
fn collect_range_entries(words: &[u64], lo: usize, hi: usize) -> Vec<IndexEntry> {
    let mut entries = Vec::new();
    let visit = |id: i64| {
        entries.push(IndexEntry {
            row_id: id,
            ref_id: id,
        });
        // Never stop early: every bit in the range is wanted.
        true
    };
    for_each_set_bit_in_range(words, lo, hi, visit);
    entries
}
/// Returns the smallest id whose bit is set, or `None` when no bit is set.
fn bitset_min(words: &[u64]) -> Option<i64> {
    let slot = words.iter().position(|&word| word != 0)?;
    let lowest = words[slot].trailing_zeros() as i64;
    Some((slot * BITS) as i64 + lowest)
}
/// Returns the largest id whose bit is set, or `None` when no bit is set.
fn bitset_max(words: &[u64]) -> Option<i64> {
    let slot = words.iter().rposition(|&word| word != 0)?;
    let highest = (BITS - 1 - words[slot].leading_zeros() as usize) as i64;
    Some((slot * BITS) as i64 + highest)
}
/// Index over a single integer column that records which row ids exist.
///
/// Lookups answer with entries whose `row_id` and `ref_id` both equal the looked-up integer.
pub struct PkIndex {
/// Name of this index, as returned by `name()`.
name: String,
/// Name of the table the index belongs to, as returned by `table_name()`.
table_name: String,
/// Id of the single indexed column.
column_id: i32,
/// Name of the single indexed column; used to match comparison expressions.
column_name: String,
/// The stored row ids, guarded by a read/write lock.
data: RwLock<PkIndexInner>,
/// Set to `true` by `close()`; not read anywhere in this file.
closed: AtomicBool,
}
impl PkIndex {
    /// Creates an empty index with the given identity; the `closed` flag starts as `false`.
    pub fn new(name: String, table_name: String, column_id: i32, column_name: String) -> Self {
        let data = RwLock::new(PkIndexInner::new());
        let closed = AtomicBool::new(false);
        Self {
            name,
            table_name,
            column_id,
            column_name,
            data,
            closed,
        }
    }

    /// Returns the integer held by the first value, or `None` when the slice is empty or
    /// its first value is not `Value::Integer`.
    #[inline]
    fn extract_i64(values: &[Value]) -> Option<i64> {
        match values.first() {
            Some(Value::Integer(n)) => Some(*n),
            _ => None,
        }
    }

    /// Smallest id stored outside the bitset (overflow set or the `i64::MIN` flag), if any.
    #[inline]
    fn overflow_min(inner: &PkIndexInner) -> Option<i64> {
        if inner.has_i64_min {
            // Nothing can be smaller than `i64::MIN`.
            Some(i64::MIN)
        } else if inner.overflow.is_empty() {
            None
        } else {
            inner.overflow.iter().min()
        }
    }

    /// Largest id stored outside the bitset (overflow set or the `i64::MIN` flag), if any.
    #[inline]
    fn overflow_max(inner: &PkIndexInner) -> Option<i64> {
        let set_max = if inner.overflow.is_empty() {
            None
        } else {
            inner.overflow.iter().max()
        };
        match set_max {
            // `i64::MIN` is the maximum only when the overflow set contributes nothing.
            None if inner.has_i64_min => Some(i64::MIN),
            other => other,
        }
    }
}
impl Index for PkIndex {
/// Returns the name this index was created with.
fn name(&self) -> &str {
&self.name
}
/// Returns the name of the table this index was created for.
fn table_name(&self) -> &str {
&self.table_name
}
/// No-op: returns `Ok(())` without touching the stored ids.
fn build(&mut self) -> Result<()> {
Ok(())
}
/// Records `row_id` in the index; adding an id that is already present changes nothing.
///
/// `values` and `_ref_id` are ignored: only `row_id` is stored. Always returns `Ok(())`.
fn add(&self, values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
    let _ = values;
    self.data.write().insert(row_id);
    Ok(())
}
/// Records every key of `entries` as a row id under a single write lock.
///
/// The mapped values are ignored. Always returns `Ok(())`.
fn add_batch(&self, entries: &I64Map<Vec<Value>>) -> Result<()> {
    let mut guard = self.data.write();
    entries.iter().for_each(|(row_id, _)| {
        guard.insert(row_id);
    });
    Ok(())
}
/// Records the row id of every `(row_id, values)` pair under a single write lock.
///
/// The paired values are ignored. Always returns `Ok(())`.
fn add_batch_slice(&self, entries: &[(i64, &[Value])]) -> Result<()> {
    let mut guard = self.data.write();
    for entry in entries {
        guard.insert(entry.0);
    }
    Ok(())
}
/// Removes `row_id` from the index; removing an absent id changes nothing.
///
/// `_values` and `_ref_id` are ignored. Always returns `Ok(())`.
fn remove(&self, _values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
    self.data.write().remove(row_id);
    Ok(())
}
/// Removes every key of `entries` from the index under a single write lock.
///
/// The mapped values are ignored. Always returns `Ok(())`.
fn remove_batch(&self, entries: &I64Map<Vec<Value>>) -> Result<()> {
    let mut guard = self.data.write();
    entries.iter().for_each(|(row_id, _)| {
        guard.remove(row_id);
    });
    Ok(())
}
/// Removes the row id of every `(row_id, values)` pair under a single write lock.
///
/// The paired values are ignored. Always returns `Ok(())`.
fn remove_batch_slice(&self, entries: &[(i64, &[Value])]) -> Result<()> {
    let mut guard = self.data.write();
    for entry in entries {
        guard.remove(entry.0);
    }
    Ok(())
}
/// Returns a one-element slice holding the id of the indexed column.
fn column_ids(&self) -> &[i32] {
std::slice::from_ref(&self.column_id)
}
/// Returns a one-element slice holding the name of the indexed column.
fn column_names(&self) -> &[String] {
std::slice::from_ref(&self.column_name)
}
/// Returns the type of the single indexed column, which is always `DataType::Integer`.
fn data_types(&self) -> &[DataType] {
&[DataType::Integer]
}
/// Always reports `IndexType::PrimaryKey`.
fn index_type(&self) -> IndexType {
IndexType::PrimaryKey
}
/// Always `true`: each id is stored at most once.
fn is_unique(&self) -> bool {
true
}
/// Looks up the integer in `values[0]`.
///
/// Returns a single entry (`row_id` and `ref_id` both equal to the integer) when that id
/// is stored, and an empty vector when it is not, when `values` is empty, or when the
/// first value is not `Value::Integer`.
fn find(&self, values: &[Value]) -> Result<Vec<IndexEntry>> {
    let hits = match Self::extract_i64(values) {
        Some(row_id) if self.data.read().contains(row_id) => vec![IndexEntry {
            row_id,
            ref_id: row_id,
        }],
        _ => Vec::new(),
    };
    Ok(hits)
}
/// Returns an entry for every stored id between `min` and `max`.
///
/// An empty `min` / `max` slice leaves that side unbounded. The `*_inclusive` flags choose
/// between closed and open bounds and are ignored on an unbounded side. Results from the
/// bitset come out in ascending order; when ids are stored outside the bitset the combined
/// result is sorted by `row_id`.
///
/// NOTE(review): a non-empty bound whose first value is not `Value::Integer` is also
/// treated as unbounded, because `extract_i64` yields `None` for it. Confirm callers
/// re-check rows for such predicates.
fn find_range(
    &self,
    min: &[Value],
    max: &[Value],
    min_inclusive: bool,
    max_inclusive: bool,
) -> Result<Vec<IndexEntry>> {
    // Normalise both bounds to a closed interval `[lo, hi]`. An exclusive bound that cannot
    // be stepped inwards (it sits at the edge of the `i64` range) matches nothing.
    let lo = match Self::extract_i64(min) {
        None => i64::MIN,
        Some(bound) if min_inclusive => bound,
        Some(bound) => {
            let Some(next) = bound.checked_add(1) else {
                return Ok(Vec::new());
            };
            next
        }
    };
    let hi = match Self::extract_i64(max) {
        None => i64::MAX,
        Some(bound) if max_inclusive => bound,
        Some(bound) => {
            let Some(prev) = bound.checked_sub(1) else {
                return Ok(Vec::new());
            };
            prev
        }
    };
    if hi < lo {
        return Ok(Vec::new());
    }

    let inner = self.data.read();

    // Part of the interval that can overlap the bitset: ids `0..capacity`.
    let capacity = bit_capacity(&inner.words) as i64;
    let dense_lo = lo.max(0);
    let dense_hi = hi.min(capacity - 1);
    let mut entries = if capacity > 0 && dense_hi >= 0 && dense_lo <= dense_hi {
        collect_range_entries(&inner.words, dense_lo as usize, dense_hi as usize)
    } else {
        Vec::new()
    };

    if inner.overflow_empty() {
        return Ok(entries);
    }

    // `hi >= lo` already holds, so `i64::MIN` is in range exactly when `lo` is `i64::MIN`.
    if lo == i64::MIN && inner.has_i64_min {
        entries.push(IndexEntry {
            row_id: i64::MIN,
            ref_id: i64::MIN,
        });
    }
    entries.extend(
        inner
            .overflow
            .iter()
            .filter(|&id| id >= lo && id <= hi)
            .map(|id| IndexEntry {
                row_id: id,
                ref_id: id,
            }),
    );
    // Overflow ids were appended after the bitset ids, so restore ascending order.
    entries.sort_unstable_by_key(|entry| entry.row_id);
    Ok(entries)
}
/// Evaluates `column <op> values[0]` against the stored ids.
///
/// `Eq` delegates to `find`; `Gt`/`Gte`/`Lt`/`Lte` delegate to `find_range` with the other
/// side unbounded. `Ne` returns every stored id except the given one (bitset ids in
/// ascending order, then `i64::MIN`, then overflow ids in the set's iteration order), or
/// nothing when the first value is not an integer. Any other operator yields an empty result.
fn find_with_operator(&self, op: Operator, values: &[Value]) -> Result<Vec<IndexEntry>> {
    let unbounded: &[Value] = &[];
    match op {
        Operator::Eq => self.find(values),
        Operator::Gt => self.find_range(values, unbounded, false, false),
        Operator::Gte => self.find_range(values, unbounded, true, false),
        Operator::Lt => self.find_range(unbounded, values, false, false),
        Operator::Lte => self.find_range(unbounded, values, false, true),
        Operator::Ne => {
            let Some(excluded) = Self::extract_i64(values) else {
                return Ok(Vec::new());
            };
            let entry_for = |id: i64| IndexEntry {
                row_id: id,
                ref_id: id,
            };
            let inner = self.data.read();
            let mut entries = Vec::new();
            for_each_set_bit(&inner.words, |id| {
                if id != excluded {
                    entries.push(entry_for(id));
                }
                true
            });
            if inner.has_i64_min && excluded != i64::MIN {
                entries.push(entry_for(i64::MIN));
            }
            entries.extend(
                inner
                    .overflow
                    .iter()
                    .filter(|&id| id != excluded)
                    .map(entry_for),
            );
            Ok(entries)
        }
        _ => Ok(Vec::new()),
    }
}
/// Appends the integer in `values[0]` to `buffer` when that id is stored.
///
/// Leaves `buffer` untouched when the id is absent, `values` is empty, or the first value
/// is not `Value::Integer`.
fn get_row_ids_equal_into(&self, values: &[Value], buffer: &mut Vec<i64>) {
    if let Some(row_id) = Self::extract_i64(values) {
        if self.data.read().contains(row_id) {
            buffer.push(row_id);
        }
    }
}
/// Appends to `buffer` the row ids that `find_range` returns for the same bounds.
///
/// If `find_range` reports an error, nothing is appended.
fn get_row_ids_in_range_into(
    &self,
    min_value: &[Value],
    max_value: &[Value],
    include_min: bool,
    include_max: bool,
    buffer: &mut Vec<i64>,
) {
    let Ok(entries) = self.find_range(min_value, max_value, include_min, include_max) else {
        return;
    };
    buffer.extend(entries.iter().map(|entry| entry.row_id));
}
/// Appends to `buffer`, in list order, every `Value::Integer` of `value_list` whose id is
/// stored. Non-integer values are skipped; repeated list entries are appended repeatedly.
fn get_row_ids_in_into(&self, value_list: &[Value], buffer: &mut Vec<i64>) {
    let inner = self.data.read();
    buffer.extend(value_list.iter().filter_map(|value| match value {
        Value::Integer(row_id) if inner.contains(*row_id) => Some(*row_id),
        _ => None,
    }));
}
/// Resolves the row ids selected by `expr` when it constrains the indexed column.
///
/// Two shapes are handled:
/// 1. `expr` is a single comparison on this column with `=`, `>`, `>=`, `<` or `<=`.
/// 2. `expr` exposes several comparisons: an equality on this column wins outright;
///    otherwise the last lower bound and last upper bound seen form a range query.
///
/// Anything else yields an empty `RowIdVec`.
fn get_filtered_row_ids(&self, expr: &dyn Expression) -> RowIdVec {
    let unbounded: &[Value] = &[];

    // Shape 1: a lone comparison against the indexed column.
    if let Some((col_name, operator, value)) = expr.get_comparison_info() {
        if col_name == self.column_name {
            let bound = std::slice::from_ref(value);
            match operator {
                Operator::Eq => return self.get_row_ids_equal(bound),
                Operator::Gt => {
                    return self.get_row_ids_in_range(bound, unbounded, false, false)
                }
                Operator::Gte => {
                    return self.get_row_ids_in_range(bound, unbounded, true, false)
                }
                Operator::Lt => {
                    return self.get_row_ids_in_range(unbounded, bound, false, false)
                }
                Operator::Lte => {
                    return self.get_row_ids_in_range(unbounded, bound, false, true)
                }
                // Other operators fall through to the multi-comparison path.
                _ => {}
            }
        }
    }

    // Shape 2: pick this column's constraints out of a list of comparisons.
    let comparisons = expr.collect_comparisons();
    let mut eq_val: Option<&Value> = None;
    let mut min_val: Option<&Value> = None;
    let mut max_val: Option<&Value> = None;
    let mut include_min = false;
    let mut include_max = false;
    for (col_name, op, val) in &comparisons {
        if *col_name != self.column_name {
            continue;
        }
        // A later comparison of the same kind overrides an earlier one.
        match op {
            Operator::Eq => eq_val = Some(val),
            Operator::Gt => {
                min_val = Some(val);
                include_min = false;
            }
            Operator::Gte => {
                min_val = Some(val);
                include_min = true;
            }
            Operator::Lt => {
                max_val = Some(val);
                include_max = false;
            }
            Operator::Lte => {
                max_val = Some(val);
                include_max = true;
            }
            _ => {}
        }
    }

    if let Some(eq) = eq_val {
        return self.get_row_ids_equal(std::slice::from_ref(eq));
    }
    if min_val.is_none() && max_val.is_none() {
        return RowIdVec::new();
    }
    let min_slice: &[Value] = match min_val {
        Some(v) => std::slice::from_ref(v),
        None => &[],
    };
    let max_slice: &[Value] = match max_val {
        Some(v) => std::slice::from_ref(v),
        None => &[],
    };
    self.get_row_ids_in_range(min_slice, max_slice, include_min, include_max)
}
/// Returns the smallest stored id as `Value::Integer`, or `None` when the index is empty.
fn get_min_value(&self) -> Option<Value> {
    let inner = self.data.read();
    let dense = bitset_min(&inner.words);
    let sparse = Self::overflow_min(&inner);
    // Take the smaller of whichever candidates exist.
    dense.into_iter().chain(sparse).min().map(Value::Integer)
}
/// Returns the largest stored id as `Value::Integer`, or `None` when the index is empty.
fn get_max_value(&self) -> Option<Value> {
    let inner = self.data.read();
    let dense = bitset_max(&inner.words);
    let sparse = Self::overflow_max(&inner);
    // Take the larger of whichever candidates exist.
    dense.into_iter().chain(sparse).max().map(Value::Integer)
}
/// Returns every stored id as a `Value::Integer`.
///
/// Order: bitset ids ascending, then `i64::MIN` if present, then the overflow ids in
/// ascending order.
fn get_all_values(&self) -> Vec<Value> {
    let inner = self.data.read();
    let mut values = Vec::with_capacity(inner.count);
    for_each_set_bit(&inner.words, |id| {
        values.push(Value::Integer(id));
        true
    });
    if inner.overflow_empty() {
        return values;
    }
    if inner.has_i64_min {
        values.push(Value::Integer(i64::MIN));
    }
    let mut extra: Vec<i64> = inner.overflow.iter().collect();
    extra.sort_unstable();
    values.extend(extra.into_iter().map(Value::Integer));
    values
}
/// Returns the number of stored ids; always `Some`.
fn get_distinct_count_excluding_null(&self) -> Option<usize> {
    let stored = self.data.read().count;
    Some(stored)
}
/// Returns up to `limit` stored ids in sorted order, after skipping the first `offset`.
///
/// `ascending` selects the sort direction. Ids from the bitset and ids stored outside it
/// (overflow set and `i64::MIN`) are merged into one ordered sequence. Always returns
/// `Some`; a `limit` of zero yields an empty vector.
fn get_row_ids_ordered(
    &self,
    ascending: bool,
    limit: usize,
    offset: usize,
) -> Option<Vec<i64>> {
    // Without this guard the first id past `offset` would be pushed before the limit is
    // ever checked, so `limit == 0` would return one row.
    if limit == 0 {
        return Some(Vec::new());
    }

    let inner = self.data.read();

    // Sorted copy of the ids stored outside the bitset; left empty (and unallocated) in the
    // common case where everything lives in the bitset.
    let mut overflow_ids: Vec<i64> = Vec::new();
    if !inner.overflow_empty() {
        overflow_ids.reserve(inner.overflow.len() + usize::from(inner.has_i64_min));
        if inner.has_i64_min {
            overflow_ids.push(i64::MIN);
        }
        overflow_ids.extend(inner.overflow.iter());
        overflow_ids.sort_unstable();
    }

    let mut result = Vec::with_capacity(limit.min(inner.count));
    let mut skipped = 0usize;
    // Feeds the next id of the merged sequence through offset/limit handling.
    // Returns `false` once `limit` ids have been collected.
    let mut emit = |id: i64| -> bool {
        if skipped < offset {
            skipped += 1;
            true
        } else {
            result.push(id);
            result.len() < limit
        }
    };

    // Set when `emit` reported that the limit was reached during the bitset walk.
    let mut full = false;
    if ascending {
        // Index of the next overflow id not yet emitted.
        let mut next = 0usize;
        for_each_set_bit(&inner.words, |rid| {
            // Overflow ids smaller than this bitset id come first.
            while next < overflow_ids.len() && overflow_ids[next] < rid {
                let id = overflow_ids[next];
                next += 1;
                if !emit(id) {
                    full = true;
                    return false;
                }
            }
            if !emit(rid) {
                full = true;
                return false;
            }
            true
        });
        if !full {
            // Overflow ids larger than every bitset id.
            for &id in &overflow_ids[next..] {
                if !emit(id) {
                    break;
                }
            }
        }
    } else {
        // Overflow ids at indices `..remaining` have not been emitted yet.
        let mut remaining = overflow_ids.len();
        for_each_set_bit_rev(&inner.words, |rid| {
            // Overflow ids larger than this bitset id come first.
            while remaining > 0 && overflow_ids[remaining - 1] > rid {
                remaining -= 1;
                if !emit(overflow_ids[remaining]) {
                    full = true;
                    return false;
                }
            }
            if !emit(rid) {
                full = true;
                return false;
            }
            true
        });
        if !full {
            // Overflow ids smaller than every bitset id, largest first.
            for &id in overflow_ids[..remaining].iter().rev() {
                if !emit(id) {
                    break;
                }
            }
        }
    }
    Some(result)
}
/// Returns one `(value, [row_id])` group per stored id; always `Some`.
///
/// Order: bitset ids ascending, then `i64::MIN` if present, then the overflow ids in the
/// set's own iteration order.
fn get_grouped_row_ids(&self) -> Option<Vec<(Value, Vec<i64>)>> {
    // Every id forms its own single-row group.
    let singleton = |id: i64| (Value::Integer(id), vec![id]);
    let inner = self.data.read();
    let mut groups = Vec::with_capacity(inner.count);
    for_each_set_bit(&inner.words, |id| {
        groups.push(singleton(id));
        true
    });
    if inner.has_i64_min {
        groups.push(singleton(i64::MIN));
    }
    groups.extend(inner.overflow.iter().map(singleton));
    Some(groups)
}
/// Invokes `callback(value, [row_id])` once per stored id, in ascending id order.
///
/// Iteration ends early when the callback returns `Ok(false)` (reported as `Ok(())`) or
/// an error (reported as that error). Always returns `Some`.
fn for_each_group(
    &self,
    callback: &mut dyn FnMut(&Value, &[i64]) -> Result<bool>,
) -> Option<Result<()>> {
    let inner = self.data.read();

    // Sorted copy of the ids stored outside the bitset; stays empty when there are none.
    let mut overflow_sorted: Vec<i64> = Vec::new();
    if !inner.overflow_empty() {
        overflow_sorted.reserve(inner.overflow.len() + usize::from(inner.has_i64_min));
        if inner.has_i64_min {
            overflow_sorted.push(i64::MIN);
        }
        overflow_sorted.extend(inner.overflow.iter());
        overflow_sorted.sort_unstable();
    }
    // Negative ids are visited before the bitset, the remaining overflow ids after it.
    let first_non_negative = overflow_sorted.partition_point(|&id| id < 0);
    let (negative, large) = overflow_sorted.split_at(first_non_negative);

    // Outcome of the most recent callback invocation; `Ok(true)` means "keep going".
    let mut verdict: Result<bool> = Ok(true);
    {
        let mut step = |id: i64| -> bool {
            verdict = callback(&Value::Integer(id), &[id]);
            matches!(verdict, Ok(true))
        };
        if negative.iter().all(|&id| step(id)) {
            let mut keep_going = true;
            for_each_set_bit(&inner.words, |id| {
                keep_going = step(id);
                keep_going
            });
            if keep_going {
                for &id in large {
                    if !step(id) {
                        break;
                    }
                }
            }
        }
    }
    // Both "ran to completion" and "callback asked to stop" are successes.
    Some(verdict.map(|_| ()))
}
/// Removes every stored id under the write lock. Always returns `Ok(())`.
fn clear(&self) -> Result<()> {
    self.data.write().clear();
    Ok(())
}
/// Exposes `self` as `&dyn Any`.
fn as_any(&self) -> &dyn std::any::Any {
self
}
/// Sets the `closed` flag (with `Release` ordering) and then clears all stored ids,
/// returning the result of `clear`.
fn close(&mut self) -> Result<()> {
self.closed.store(true, Ordering::Release);
self.clear()
}
}
// Unit tests for `PkIndex`, covering ids stored in the bitset, ids stored in the overflow
// set, and the dedicated `i64::MIN` flag.
#[cfg(test)]
mod tests {
use super::*;
// Builds an empty index on column `id` of table `test`.
fn make_idx() -> PkIndex {
PkIndex::new(
"__pk_test_id".to_string(),
"test".to_string(),
0,
"id".to_string(),
)
}
// Reads the internal id counter.
fn count(idx: &PkIndex) -> usize {
idx.data.read().count
}
// Add, find and remove of small ids, plus the static index properties.
#[test]
fn test_pk_index_basic() {
let idx = make_idx();
assert!(idx.is_unique());
assert_eq!(idx.index_type(), IndexType::PrimaryKey);
assert!(idx.find(&[Value::Integer(1)]).unwrap().is_empty());
idx.add(&[Value::Integer(1)], 1, 1).unwrap();
idx.add(&[Value::Integer(5)], 5, 5).unwrap();
idx.add(&[Value::Integer(3)], 3, 3).unwrap();
assert_eq!(count(&idx), 3);
let results = idx.find(&[Value::Integer(5)]).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].row_id, 5);
assert!(idx.find(&[Value::Integer(2)]).unwrap().is_empty());
idx.remove(&[Value::Integer(5)], 5, 5).unwrap();
assert!(idx.find(&[Value::Integer(5)]).unwrap().is_empty());
assert_eq!(count(&idx), 2);
}
// Inclusive and exclusive range bounds over bitset ids.
#[test]
fn test_pk_index_range() {
let idx = make_idx();
for i in 1..=10 {
idx.add(&[Value::Integer(i)], i, i).unwrap();
}
let results = idx
.find_range(&[Value::Integer(3)], &[Value::Integer(7)], true, true)
.unwrap();
assert_eq!(results.len(), 5);
assert_eq!(results[0].row_id, 3);
assert_eq!(results[4].row_id, 7);
let results = idx
.find_range(&[Value::Integer(3)], &[Value::Integer(7)], false, false)
.unwrap();
assert_eq!(results.len(), 3); }
// Min/max on an empty index and after inserts.
#[test]
fn test_pk_index_min_max() {
let idx = make_idx();
assert!(idx.get_min_value().is_none());
assert!(idx.get_max_value().is_none());
idx.add(&[Value::Integer(5)], 5, 5).unwrap();
idx.add(&[Value::Integer(10)], 10, 10).unwrap();
idx.add(&[Value::Integer(2)], 2, 2).unwrap();
assert_eq!(idx.get_min_value(), Some(Value::Integer(2)));
assert_eq!(idx.get_max_value(), Some(Value::Integer(10)));
}
// Adding the same id twice must not change the count.
#[test]
fn test_pk_index_noop_on_duplicate_add() {
let idx = make_idx();
idx.add(&[Value::Integer(1)], 1, 1).unwrap();
idx.add(&[Value::Integer(1)], 1, 1).unwrap(); assert_eq!(count(&idx), 1);
}
// Batch insertion from a slice of `(row_id, values)` pairs.
#[test]
fn test_pk_index_batch() {
let idx = make_idx();
let entries: Vec<(i64, &[Value])> = vec![
(1, &[Value::Integer(1)]),
(2, &[Value::Integer(2)]),
(3, &[Value::Integer(3)]),
];
idx.add_batch_slice(&entries).unwrap();
assert_eq!(count(&idx), 3);
let results = idx.find(&[Value::Integer(2)]).unwrap();
assert_eq!(results.len(), 1);
}
// Ids on and around 64-bit word boundaries.
#[test]
fn test_pk_index_large_ids() {
let idx = make_idx();
for id in [0, 63, 64, 127, 1000] {
idx.add(&[Value::Integer(id)], id, id).unwrap();
}
assert_eq!(count(&idx), 5);
for id in [0, 63, 64, 127, 1000] {
assert_eq!(idx.find(&[Value::Integer(id)]).unwrap().len(), 1);
}
assert!(idx.find(&[Value::Integer(1)]).unwrap().is_empty());
assert!(idx.find(&[Value::Integer(65)]).unwrap().is_empty());
assert_eq!(idx.get_min_value(), Some(Value::Integer(0)));
assert_eq!(idx.get_max_value(), Some(Value::Integer(1000)));
}
// Ordered retrieval in both directions, with limit and offset.
#[test]
fn test_pk_index_ordered() {
let idx = make_idx();
for i in [5, 2, 8, 1, 9] {
idx.add(&[Value::Integer(i)], i, i).unwrap();
}
assert_eq!(
idx.get_row_ids_ordered(true, 100, 0).unwrap(),
vec![1, 2, 5, 8, 9]
);
assert_eq!(
idx.get_row_ids_ordered(false, 100, 0).unwrap(),
vec![9, 8, 5, 2, 1]
);
assert_eq!(idx.get_row_ids_ordered(true, 2, 1).unwrap(), vec![2, 5]);
}
// 1000 consecutive ids must fit in at most 16 bitset words.
#[test]
fn test_pk_index_bitset_memory() {
let idx = make_idx();
for i in 0..1000 {
idx.add(&[Value::Integer(i)], i, i).unwrap();
}
let inner = idx.data.read();
assert!(inner.words.len() <= 16);
assert_eq!(inner.count, 1000);
}
// Ids above the bitset limit: find, min/max, remove and the `*_into` helpers.
#[test]
fn test_pk_index_large_row_ids_overflow() {
let idx = make_idx();
let large_ids = [50_000_000i64, 2_000_000_000, i64::MAX - 1];
for &id in &large_ids {
idx.add(&[Value::Integer(id)], id, id).unwrap();
}
assert_eq!(count(&idx), 3);
for &id in &large_ids {
let results = idx.find(&[Value::Integer(id)]).unwrap();
assert_eq!(results.len(), 1, "find failed for id={id}");
assert_eq!(results[0].row_id, id);
}
assert!(idx.find(&[Value::Integer(99_999_999)]).unwrap().is_empty());
assert_eq!(idx.get_min_value(), Some(Value::Integer(50_000_000)));
assert_eq!(idx.get_max_value(), Some(Value::Integer(i64::MAX - 1)));
idx.remove(
&[Value::Integer(2_000_000_000)],
2_000_000_000,
2_000_000_000,
)
.unwrap();
assert_eq!(count(&idx), 2);
assert!(idx
.find(&[Value::Integer(2_000_000_000)])
.unwrap()
.is_empty());
let mut buf = Vec::new();
idx.get_row_ids_equal_into(&[Value::Integer(50_000_000)], &mut buf);
assert_eq!(buf, vec![50_000_000]);
buf.clear();
idx.get_row_ids_in_into(
&[
Value::Integer(50_000_000),
Value::Integer(i64::MAX - 1),
Value::Integer(999),
],
&mut buf,
);
assert_eq!(buf.len(), 2);
assert!(buf.contains(&50_000_000));
assert!(buf.contains(&(i64::MAX - 1)));
}
// Range, min/max and ordered queries spanning both bitset and overflow ids.
#[test]
fn test_pk_index_hybrid_range() {
let idx = make_idx();
idx.add(&[Value::Integer(5)], 5, 5).unwrap();
idx.add(&[Value::Integer(100)], 100, 100).unwrap();
idx.add(&[Value::Integer(20_000_000)], 20_000_000, 20_000_000)
.unwrap();
idx.add(&[Value::Integer(50_000_000)], 50_000_000, 50_000_000)
.unwrap();
let results = idx
.find_range(
&[Value::Integer(0)],
&[Value::Integer(100_000_000)],
true,
true,
)
.unwrap();
let row_ids: Vec<i64> = results.iter().map(|e| e.row_id).collect();
assert_eq!(row_ids, vec![5, 100, 20_000_000, 50_000_000]);
let results = idx
.find_range(&[Value::Integer(0)], &[Value::Integer(200)], true, true)
.unwrap();
assert_eq!(results.len(), 2);
let results = idx
.find_range(
&[Value::Integer(15_000_000)],
&[Value::Integer(60_000_000)],
true,
true,
)
.unwrap();
let row_ids: Vec<i64> = results.iter().map(|e| e.row_id).collect();
assert_eq!(row_ids, vec![20_000_000, 50_000_000]);
assert_eq!(idx.get_min_value(), Some(Value::Integer(5)));
assert_eq!(idx.get_max_value(), Some(Value::Integer(50_000_000)));
assert_eq!(
idx.get_row_ids_ordered(true, 100, 0).unwrap(),
vec![5, 100, 20_000_000, 50_000_000]
);
assert_eq!(
idx.get_row_ids_ordered(false, 100, 0).unwrap(),
vec![50_000_000, 20_000_000, 100, 5]
);
assert_eq!(
idx.get_row_ids_ordered(true, 2, 1).unwrap(),
vec![100, 20_000_000]
);
}
// Negative ids are stored and found alongside a bitset id.
#[test]
fn test_pk_index_negative_ids_overflow() {
let idx = make_idx();
idx.add(&[Value::Integer(-1)], -1, -1).unwrap();
idx.add(&[Value::Integer(-100)], -100, -100).unwrap();
idx.add(&[Value::Integer(5)], 5, 5).unwrap();
assert_eq!(count(&idx), 3);
assert_eq!(idx.find(&[Value::Integer(-1)]).unwrap().len(), 1);
assert_eq!(idx.find(&[Value::Integer(-100)]).unwrap().len(), 1);
assert!(idx.find(&[Value::Integer(-50)]).unwrap().is_empty());
assert_eq!(idx.get_min_value(), Some(Value::Integer(-100)));
assert_eq!(idx.get_max_value(), Some(Value::Integer(5)));
idx.remove(&[Value::Integer(-1)], -1, -1).unwrap();
assert_eq!(count(&idx), 2);
assert!(idx.find(&[Value::Integer(-1)]).unwrap().is_empty());
}
// `i64::MIN` round-trips through add, find, range, remove and duplicate operations.
#[test]
fn test_pk_index_i64_min() {
let idx = make_idx();
idx.add(&[Value::Integer(i64::MIN)], i64::MIN, i64::MIN)
.unwrap();
idx.add(&[Value::Integer(0)], 0, 0).unwrap();
idx.add(&[Value::Integer(100)], 100, 100).unwrap();
assert_eq!(count(&idx), 3);
assert_eq!(idx.find(&[Value::Integer(i64::MIN)]).unwrap().len(), 1);
assert_eq!(
idx.find(&[Value::Integer(i64::MIN)]).unwrap()[0].row_id,
i64::MIN
);
idx.add(&[Value::Integer(i64::MIN)], i64::MIN, i64::MIN)
.unwrap();
assert_eq!(count(&idx), 3);
assert_eq!(idx.get_min_value(), Some(Value::Integer(i64::MIN)));
assert_eq!(idx.get_max_value(), Some(Value::Integer(100)));
let mut buf = Vec::new();
idx.get_row_ids_in_into(
&[
Value::Integer(i64::MIN),
Value::Integer(0),
Value::Integer(999),
],
&mut buf,
);
assert_eq!(buf.len(), 2);
assert!(buf.contains(&i64::MIN));
assert!(buf.contains(&0));
let results = idx
.find_range(&[], &[Value::Integer(0)], false, true)
.unwrap();
let row_ids: Vec<i64> = results.iter().map(|e| e.row_id).collect();
assert!(row_ids.contains(&i64::MIN));
assert!(row_ids.contains(&0));
idx.remove(&[Value::Integer(i64::MIN)], i64::MIN, i64::MIN)
.unwrap();
assert_eq!(count(&idx), 2);
assert!(idx.find(&[Value::Integer(i64::MIN)]).unwrap().is_empty());
idx.remove(&[Value::Integer(i64::MIN)], i64::MIN, i64::MIN)
.unwrap();
assert_eq!(count(&idx), 2);
}
// Batch add/remove mixing bitset ids with an overflow id.
#[test]
fn test_pk_index_overflow_batch_ops() {
let idx = make_idx();
let entries: Vec<(i64, &[Value])> = vec![
(1, &[Value::Integer(1)]),
(100_000_000, &[Value::Integer(100_000_000)]),
(5, &[Value::Integer(5)]),
];
idx.add_batch_slice(&entries).unwrap();
assert_eq!(count(&idx), 3);
assert_eq!(idx.find(&[Value::Integer(1)]).unwrap().len(), 1);
assert_eq!(idx.find(&[Value::Integer(100_000_000)]).unwrap().len(), 1);
assert_eq!(idx.find(&[Value::Integer(5)]).unwrap().len(), 1);
let remove_entries: Vec<(i64, &[Value])> = vec![
(100_000_000, &[Value::Integer(100_000_000)]),
(5, &[Value::Integer(5)]),
];
idx.remove_batch_slice(&remove_entries).unwrap();
assert_eq!(count(&idx), 1);
assert!(idx.find(&[Value::Integer(100_000_000)]).unwrap().is_empty());
assert!(idx.find(&[Value::Integer(5)]).unwrap().is_empty());
assert_eq!(idx.find(&[Value::Integer(1)]).unwrap().len(), 1);
}
// `clear` empties the bitset, the overflow set and the `i64::MIN` flag.
#[test]
fn test_pk_index_clear_with_overflow() {
let idx = make_idx();
idx.add(&[Value::Integer(1)], 1, 1).unwrap();
idx.add(&[Value::Integer(50_000_000)], 50_000_000, 50_000_000)
.unwrap();
idx.add(&[Value::Integer(i64::MIN)], i64::MIN, i64::MIN)
.unwrap();
assert_eq!(count(&idx), 3);
idx.clear().unwrap();
assert_eq!(count(&idx), 0);
assert!(idx.find(&[Value::Integer(1)]).unwrap().is_empty());
assert!(idx.find(&[Value::Integer(50_000_000)]).unwrap().is_empty());
assert!(idx.find(&[Value::Integer(i64::MIN)]).unwrap().is_empty());
assert!(idx.get_min_value().is_none());
assert!(idx.get_max_value().is_none());
}
// The counter ignores duplicate adds and removes of absent ids.
#[test]
fn test_pk_index_count_always_accurate() {
let idx = make_idx();
idx.add(&[Value::Integer(1)], 1, 1).unwrap();
assert_eq!(count(&idx), 1);
idx.add(&[Value::Integer(1)], 1, 1).unwrap(); assert_eq!(count(&idx), 1);
idx.add(&[Value::Integer(50_000_000)], 50_000_000, 50_000_000)
.unwrap();
assert_eq!(count(&idx), 2);
idx.add(&[Value::Integer(50_000_000)], 50_000_000, 50_000_000)
.unwrap(); assert_eq!(count(&idx), 2);
idx.remove(&[Value::Integer(1)], 1, 1).unwrap();
assert_eq!(count(&idx), 1);
idx.remove(&[Value::Integer(1)], 1, 1).unwrap(); assert_eq!(count(&idx), 1);
idx.remove(&[Value::Integer(50_000_000)], 50_000_000, 50_000_000)
.unwrap();
assert_eq!(count(&idx), 0);
idx.remove(&[Value::Integer(50_000_000)], 50_000_000, 50_000_000)
.unwrap(); assert_eq!(count(&idx), 0);
}
// `for_each_group` visits negatives, bitset ids and large ids in one ascending sequence.
#[test]
fn test_pk_index_for_each_group_sorted_order() {
let idx = make_idx();
idx.add(&[Value::Integer(i64::MIN)], i64::MIN, i64::MIN)
.unwrap();
idx.add(&[Value::Integer(-50)], -50, -50).unwrap();
idx.add(&[Value::Integer(-1)], -1, -1).unwrap();
idx.add(&[Value::Integer(0)], 0, 0).unwrap();
idx.add(&[Value::Integer(100)], 100, 100).unwrap();
idx.add(&[Value::Integer(9_999_999)], 9_999_999, 9_999_999)
.unwrap();
idx.add(&[Value::Integer(20_000_000)], 20_000_000, 20_000_000)
.unwrap();
idx.add(&[Value::Integer(100_000_000)], 100_000_000, 100_000_000)
.unwrap();
let mut collected = Vec::new();
idx.for_each_group(&mut |val, row_ids| {
if let Value::Integer(id) = val {
collected.push(*id);
assert_eq!(row_ids.len(), 1);
assert_eq!(row_ids[0], *id);
}
Ok(true)
})
.unwrap()
.unwrap();
assert_eq!(
collected,
vec![
i64::MIN,
-50,
-1,
0,
100,
9_999_999,
20_000_000,
100_000_000
]
);
}
}