use crate::database::mem_buffer::IndexMemBuffer;
use crate::index::btree_generic::{GenericBTree, GenericBTreeConfig, BTreeKey};
use crate::index::cached_index::CachedIndex;
use crate::types::{RowId, Value};
use crate::{Result, StorageError};
use parking_lot::{Mutex, RwLock};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Tunable settings accepted by `ColumnValueIndex::create` and `ColumnValueIndex::open`.
#[derive(Debug, Clone)]
pub struct ColumnValueIndexConfig {
    /// Upper bound on the page size (presumably bytes — confirm). Not read by
    /// any code visible in this file.
    pub max_page_size: usize,
    /// Cache size forwarded to the underlying B-tree configuration.
    pub cache_size: usize,
}

impl Default for ColumnValueIndexConfig {
    /// Stock configuration: `max_page_size` of 4096 and `cache_size` of 16.
    fn default() -> Self {
        ColumnValueIndexConfig {
            cache_size: 16,
            max_page_size: 4096,
        }
    }
}
/// Number of leading value bytes kept in a serialized key; longer values are
/// stored as this prefix only (shorter ones are zero-padded up to it).
const VALUE_DATA_SIZE: usize = 12;
/// Width in bytes of the big-endian row id that follows the value prefix.
const ROW_ID_SIZE: usize = 8;
/// Width in bytes of the big-endian `u16` value-length field that ends a serialized key.
const VALUE_LEN_SIZE: usize = 2;
/// Composite index key: an encoded column value paired with the row that holds it.
///
/// The derived ordering compares `value_bytes` first and `row_id` second, so
/// all keys for one value are contiguous and ordered by row id.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct IndexKey {
/// Encoded column value. A key rebuilt by `deserialize` carries at most
/// `VALUE_DATA_SIZE` bytes here, even if the original value was longer.
value_bytes: Vec<u8>,
/// Identifier of the row holding the value.
row_id: RowId,
}
impl std::hash::Hash for IndexKey {
    /// Feeds `value_bytes` and then `row_id` into `state`.
    ///
    /// Both fields take part in the derived `Eq`, so equal keys hash equally.
    /// The calls are fully qualified because the `Hash` trait is not part of
    /// the standard prelude and this file does not import it; a plain
    /// `.hash(state)` method call would not resolve.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        std::hash::Hash::hash(&self.value_bytes, state);
        std::hash::Hash::hash(&self.row_id, state);
    }
}
/// Builds the canonical form of `key` used for tombstone bookkeeping.
///
/// The value is cut to at most `VALUE_DATA_SIZE` bytes — the same prefix that
/// survives serialization — so a full-length key and the key read back from
/// its serialized form map to the same tombstone. The row id is copied as-is.
fn tombstone_key(key: &IndexKey) -> IndexKey {
    let prefix_len = key.value_bytes.len().min(VALUE_DATA_SIZE);
    IndexKey {
        row_id: key.row_id,
        value_bytes: key.value_bytes[..prefix_len].to_vec(),
    }
}
impl BTreeKey for IndexKey {
    /// Encodes the key into a fixed `key_size()`-byte record:
    ///
    /// 1. the first `VALUE_DATA_SIZE` bytes of the value, zero-padded,
    /// 2. the row id, big-endian,
    /// 3. the original value length as a big-endian `u16`.
    ///
    /// Values longer than `VALUE_DATA_SIZE` are stored as a prefix only.
    fn serialize(&self) -> Vec<u8> {
        let row_id_start = VALUE_DATA_SIZE;
        let len_start = row_id_start + ROW_ID_SIZE;
        let mut out = vec![0u8; Self::key_size()];

        let prefix_len = self.value_bytes.len().min(VALUE_DATA_SIZE);
        out[..prefix_len].copy_from_slice(&self.value_bytes[..prefix_len]);
        out[row_id_start..len_start].copy_from_slice(&self.row_id.to_be_bytes());

        // Saturate instead of wrapping: a plain `as u16` cast would turn a
        // 65_541-byte value into a recorded length of 5, and `deserialize`
        // would then hand back a 5-byte value instead of the 12-byte prefix.
        let recorded_len = self.value_bytes.len().min(usize::from(u16::MAX)) as u16;
        out[len_start..len_start + VALUE_LEN_SIZE].copy_from_slice(&recorded_len.to_be_bytes());
        out
    }

    /// Decodes a record produced by [`serialize`](Self::serialize).
    ///
    /// The returned `value_bytes` holds `min(recorded length, VALUE_DATA_SIZE)`
    /// bytes, i.e. long values come back truncated to their stored prefix.
    ///
    /// # Errors
    /// Returns `StorageError::Serialization` when `bytes` is shorter than
    /// `key_size()`.
    fn deserialize(bytes: &[u8]) -> Result<Self> {
        if bytes.len() < Self::key_size() {
            return Err(StorageError::Serialization("Invalid key: too short".to_string()));
        }
        let row_id_start = VALUE_DATA_SIZE;
        let len_start = row_id_start + ROW_ID_SIZE;
        let len_end = len_start + VALUE_LEN_SIZE;

        let recorded_len = u16::from_be_bytes(
            bytes[len_start..len_end]
                .try_into()
                .map_err(|_| StorageError::Serialization("Invalid value_len".to_string()))?,
        ) as usize;
        let value_bytes = bytes[..recorded_len.min(VALUE_DATA_SIZE)].to_vec();

        let row_id = u64::from_be_bytes(
            bytes[row_id_start..len_start]
                .try_into()
                .map_err(|_| StorageError::Serialization("Invalid row_id".to_string()))?,
        );
        Ok(IndexKey { value_bytes, row_id })
    }

    /// Size in bytes of every serialized key (value prefix + row id + length field).
    fn key_size() -> usize {
        VALUE_DATA_SIZE + ROW_ID_SIZE + VALUE_LEN_SIZE
    }
}
/// Secondary index mapping the values of one column to the row ids holding them.
///
/// Writes land in `mem_buffer` first and are moved into `btree` later; reads
/// consult both and filter out anything recorded in `tombstones`.
pub struct ColumnValueIndex {
/// Name of the indexed table; stored but not read in the visible code.
_table_name: String,
/// Name of the indexed column; used in log messages and as the builder name.
column_name: String,
/// Path the B-tree was created from; stored but not read in the visible code.
_storage_path: PathBuf,
/// B-tree holding the entries that have left the write buffer.
btree: Arc<RwLock<GenericBTree<IndexKey>>>,
/// Cache of row-id lists for point lookups, keyed by value.
lru_cache: Arc<CachedIndex>,
/// Write buffer that receives every insert before it reaches the B-tree.
mem_buffer: IndexMemBuffer<IndexKey, ()>,
/// Normalized keys (see `tombstone_key`) of deleted entries; checked by
/// queries and when buffered entries are moved into the B-tree.
tombstones: Mutex<HashSet<IndexKey>>,
/// Try-locked by the insert paths so that only one caller at a time moves
/// buffered entries into the B-tree.
drain_lock: Mutex<()>,
}
impl ColumnValueIndex {
/// Builds an index for `column_name` of `table_name` on top of a B-tree at `path`.
///
/// Only `config.cache_size` is consumed; the B-tree is configured for
/// non-unique keys, updates allowed, and no immediate sync. The lookup cache
/// is created with a capacity argument of 500 and the write buffer with
/// `1024 * 1024`.
///
/// # Errors
/// Propagates any error from `GenericBTree::with_config`.
pub fn create<P: AsRef<Path>>(
    path: P,
    table_name: String,
    column_name: String,
    config: ColumnValueIndexConfig,
) -> Result<Self> {
    let storage_path = path.as_ref().to_path_buf();
    let btree = GenericBTree::with_config(
        storage_path.clone(),
        GenericBTreeConfig {
            cache_size: config.cache_size,
            unique_keys: false,
            allow_updates: true,
            immediate_sync: false,
        },
    )?;

    let index = Self {
        _table_name: table_name,
        column_name,
        _storage_path: storage_path,
        btree: Arc::new(RwLock::new(btree)),
        lru_cache: Arc::new(CachedIndex::new(500)),
        mem_buffer: IndexMemBuffer::new(1024 * 1024),
        tombstones: Mutex::new(HashSet::new()),
        drain_lock: Mutex::new(()),
    };
    Ok(index)
}
pub fn open<P: AsRef<Path>>(
path: P,
table_name: String,
column_name: String,
config: ColumnValueIndexConfig,
) -> Result<Self> {
Self::create(path, table_name, column_name, config)
}
/// Records that row `row_id` holds `value`.
///
/// The key goes into the write buffer, any tombstone for it is lifted, and —
/// when the buffer reports it is full and no other caller is already
/// draining — buffered entries are moved into the B-tree.
///
/// The cached lookup result for `value` is invalidated even when draining
/// fails: by then the key is already in the buffer, so returning early would
/// leave a stale cache entry behind.
///
/// # Errors
/// Fails if `value` cannot be encoded, if the buffer rejects the entry, or if
/// draining into the B-tree fails.
pub fn insert(&self, value: &Value, row_id: RowId) -> Result<()> {
    let key = IndexKey {
        value_bytes: self.value_to_bytes(value)?,
        row_id,
    };
    let buffer_full = self
        .mem_buffer
        .insert(key.clone(), ())
        .map_err(|e| StorageError::InvalidData(e))?;
    self.tombstones.lock().remove(&tombstone_key(&key));

    let drain_result = if buffer_full {
        // `try_lock` keeps concurrent inserters from draining at the same time;
        // whoever loses the race simply skips the drain.
        match self.drain_lock.try_lock() {
            Some(_guard) => self.drain_immutable_to_btree(),
            None => Ok(()),
        }
    } else {
        Ok(())
    };

    self.lru_cache.invalidate(value);
    drain_result
}
/// Inserts many `(value, row_id)` pairs.
///
/// All values are encoded up front (so an unsupported value aborts before
/// anything is written), the keys are stably sorted by encoded value, their
/// tombstones are lifted, and each key is then buffered in turn. Whenever the
/// buffer reports it is full and no other caller is draining, buffered entries
/// are moved into the B-tree.
///
/// The cache entry of each value is invalidated as soon as its key has been
/// buffered, including when the following drain fails, so a failed drain
/// cannot leave a stale cached lookup behind.
///
/// # Errors
/// Fails on the first value that cannot be encoded, the first entry the
/// buffer rejects, or the first failed drain; entries buffered before the
/// failure stay buffered.
pub fn batch_insert(&self, items: Vec<(Value, RowId)>) -> Result<()> {
    if items.is_empty() {
        return Ok(());
    }

    let mut keyed = items
        .into_iter()
        .map(|(value, row_id)| {
            let value_bytes = self.value_to_bytes(&value)?;
            Ok((IndexKey { value_bytes, row_id }, value))
        })
        .collect::<Result<Vec<(IndexKey, Value)>>>()?;
    keyed.sort_by(|a, b| a.0.value_bytes.cmp(&b.0.value_bytes));

    {
        let mut tombstones = self.tombstones.lock();
        for (key, _) in &keyed {
            tombstones.remove(&tombstone_key(key));
        }
    }

    for (key, value) in &keyed {
        let buffer_full = self
            .mem_buffer
            .insert(key.clone(), ())
            .map_err(|e| StorageError::InvalidData(e))?;

        let drain_result = if buffer_full {
            match self.drain_lock.try_lock() {
                Some(_guard) => self.drain_immutable_to_btree(),
                None => Ok(()),
            }
        } else {
            Ok(())
        };

        // Invalidate before surfacing a drain error: the key is already buffered.
        self.lru_cache.invalidate(value);
        drain_result?;
    }
    Ok(())
}
/// Returns the ids of all rows whose indexed column equals `value`.
///
/// A cached answer is returned when available. Otherwise the write buffer and
/// the B-tree are both scanned over `(value, 0) ..= (value, RowId::MAX)`, only
/// keys whose bytes equal the encoded `value` are kept, duplicate row ids are
/// dropped, and tombstoned keys are removed. Non-empty results are cached.
///
/// NOTE(review): keys rebuilt by `deserialize` carry at most
/// `VALUE_DATA_SIZE` value bytes, so the equality check may never match
/// B-tree entries of longer values if the B-tree hands back deserialized
/// keys — confirm against `GenericBTree::range`.
///
/// # Errors
/// Fails if `value` cannot be encoded or the B-tree range read fails.
pub fn get(&self, value: &Value) -> Result<Vec<RowId>> {
    if let Some(hit) = self.lru_cache.get(value) {
        return Ok((*hit).clone());
    }
    self.lru_cache.record_miss();

    let wanted = self.value_to_bytes(value)?;
    let lower = IndexKey {
        value_bytes: wanted.clone(),
        row_id: 0,
    };
    let upper = IndexKey {
        value_bytes: wanted.clone(),
        row_id: RowId::MAX,
    };

    let mut seen = HashSet::new();
    let mut matches: Vec<IndexKey> = Vec::new();

    for (key, _) in self.mem_buffer.range(&lower, &upper) {
        if key.value_bytes == wanted && seen.insert(key.row_id) {
            matches.push(key);
        }
    }
    {
        let btree = self.btree.read();
        for (key, _) in btree.range(&lower, &upper)? {
            if key.value_bytes == wanted && seen.insert(key.row_id) {
                matches.push(key);
            }
        }
    }

    // The tombstone lock stays held while the cache is filled, so a delete
    // cannot slip in between filtering and caching.
    let tombstones = self.tombstones.lock();
    let row_ids: Vec<RowId> = matches
        .into_iter()
        .filter(|key| !tombstones.contains(&tombstone_key(key)))
        .map(|key| key.row_id)
        .collect();
    if !row_ids.is_empty() {
        self.lru_cache.put(value.clone(), row_ids.clone());
    }
    Ok(row_ids)
}
/// Returns the ids of rows whose value lies between `start` and `end`
/// (the scan runs from `(start, 0)` to `(end, RowId::MAX)`).
///
/// Write buffer and B-tree are both scanned, buffer entries first. Tombstoned
/// keys are removed *before* duplicate row ids are collapsed: deduplicating
/// first would let a tombstoned key claim a row id and hide a live key for
/// the same row (e.g. a row whose value was just replaced), dropping the row
/// from the result.
///
/// # Errors
/// Fails if either bound cannot be encoded or the B-tree range read fails.
pub fn range(&self, start: &Value, end: &Value) -> Result<Vec<RowId>> {
    let start_key = IndexKey {
        value_bytes: self.value_to_bytes(start)?,
        row_id: 0,
    };
    let end_key = IndexKey {
        value_bytes: self.value_to_bytes(end)?,
        row_id: RowId::MAX,
    };

    let mut candidates: Vec<IndexKey> = self
        .mem_buffer
        .range(&start_key, &end_key)
        .into_iter()
        .map(|(key, _)| key)
        .collect();
    {
        let btree = self.btree.read();
        candidates.extend(
            btree
                .range(&start_key, &end_key)?
                .into_iter()
                .map(|(key, _)| key),
        );
    }

    let tombstones = self.tombstones.lock();
    let mut seen = HashSet::new();
    let row_ids: Vec<RowId> = candidates
        .into_iter()
        .filter(|key| !tombstones.contains(&tombstone_key(key)))
        .filter(|key| seen.insert(key.row_id))
        .map(|key| key.row_id)
        .collect();
    Ok(row_ids)
}
/// Returns the row ids of every live entry in the index.
///
/// Equivalent to [`scan_row_ids_with_limit`](Self::scan_row_ids_with_limit)
/// with no limit.
pub fn scan_all_row_ids(&self) -> Result<Vec<RowId>> {
    self.scan_row_ids_with_limit(None)
}

/// Returns row ids of live entries, at most `limit` of them when a limit is given.
///
/// The scan covers keys from the empty value up to `VALUE_DATA_SIZE` bytes of
/// `0xFF`, reading the write buffer first and the B-tree second. With a
/// limit, the B-tree read is bounded by it and the combined result is cut to
/// `limit` as well — bounding only the B-tree read would let buffered entries
/// push the result past the requested size.
///
/// Tombstoned keys are removed before duplicate row ids are collapsed, so a
/// tombstoned key cannot hide a live key of the same row.
///
/// # Errors
/// Fails if the B-tree range read fails.
pub fn scan_row_ids_with_limit(&self, limit: Option<usize>) -> Result<Vec<RowId>> {
    let min_key = IndexKey {
        value_bytes: Vec::new(),
        row_id: 0,
    };
    let max_key = IndexKey {
        value_bytes: vec![0xFF; VALUE_DATA_SIZE],
        row_id: RowId::MAX,
    };

    let mut candidates: Vec<IndexKey> = self
        .mem_buffer
        .range(&min_key, &max_key)
        .into_iter()
        .map(|(key, _)| key)
        .collect();
    {
        let btree = self.btree.read();
        let persisted = match limit {
            Some(limit_count) => btree.range_with_limit(&min_key, &max_key, limit_count)?,
            None => btree.range(&min_key, &max_key)?,
        };
        candidates.extend(persisted.into_iter().map(|(key, _)| key));
    }

    let tombstones = self.tombstones.lock();
    let mut seen = HashSet::new();
    let row_ids: Vec<RowId> = candidates
        .into_iter()
        .filter(|key| !tombstones.contains(&tombstone_key(key)))
        .filter(|key| seen.insert(key.row_id))
        .map(|key| key.row_id)
        .take(limit.unwrap_or(usize::MAX))
        .collect();
    Ok(row_ids)
}
/// Returns the ids of rows whose value is strictly below `upper_bound`.
///
/// The scan runs from the empty value up to `(upper_bound, 0)`. Because that
/// end key is itself a possible entry (row 0 holding exactly `upper_bound`),
/// keys whose bytes are not strictly below the bound are filtered out
/// explicitly instead of relying on the range end being exclusive.
///
/// Tombstoned keys are removed before duplicate row ids are collapsed, so a
/// tombstoned key cannot hide a live key of the same row.
///
/// # Errors
/// Fails if `upper_bound` cannot be encoded or the B-tree range read fails.
pub fn query_less_than(&self, upper_bound: &Value) -> Result<Vec<RowId>> {
    let upper_bytes = self.value_to_bytes(upper_bound)?;
    let start_key = IndexKey {
        value_bytes: Vec::new(),
        row_id: 0,
    };
    let end_key = IndexKey {
        value_bytes: upper_bytes.clone(),
        row_id: 0,
    };

    let mut candidates: Vec<IndexKey> = self
        .mem_buffer
        .range(&start_key, &end_key)
        .into_iter()
        .map(|(key, _)| key)
        .collect();
    {
        let btree = self.btree.read();
        candidates.extend(
            btree
                .range(&start_key, &end_key)?
                .into_iter()
                .map(|(key, _)| key),
        );
    }

    let tombstones = self.tombstones.lock();
    let mut seen = HashSet::new();
    let row_ids: Vec<RowId> = candidates
        .into_iter()
        .filter(|key| key.value_bytes < upper_bytes)
        .filter(|key| !tombstones.contains(&tombstone_key(key)))
        .filter(|key| seen.insert(key.row_id))
        .map(|key| key.row_id)
        .collect();
    Ok(row_ids)
}
/// Returns the ids of rows whose value is strictly above `lower_bound`.
///
/// The scan runs from `(lower_bound, RowId::MAX)` up to `VALUE_DATA_SIZE`
/// bytes of `0xFF`. Because that start key is itself a possible entry, keys
/// whose bytes are not strictly above the bound are filtered out explicitly.
///
/// Tombstoned keys are removed before duplicate row ids are collapsed, so a
/// tombstoned key cannot hide a live key of the same row.
///
/// # Errors
/// Fails if `lower_bound` cannot be encoded or the B-tree range read fails.
pub fn query_greater_than(&self, lower_bound: &Value) -> Result<Vec<RowId>> {
    let lower_bytes = self.value_to_bytes(lower_bound)?;
    let start_key = IndexKey {
        value_bytes: lower_bytes.clone(),
        row_id: RowId::MAX,
    };
    let end_key = IndexKey {
        value_bytes: vec![0xFF; VALUE_DATA_SIZE],
        row_id: RowId::MAX,
    };

    let mut candidates: Vec<IndexKey> = self
        .mem_buffer
        .range(&start_key, &end_key)
        .into_iter()
        .map(|(key, _)| key)
        .collect();
    {
        let btree = self.btree.read();
        candidates.extend(
            btree
                .range(&start_key, &end_key)?
                .into_iter()
                .map(|(key, _)| key),
        );
    }

    let tombstones = self.tombstones.lock();
    let mut seen = HashSet::new();
    let row_ids: Vec<RowId> = candidates
        .into_iter()
        .filter(|key| key.value_bytes > lower_bytes)
        .filter(|key| !tombstones.contains(&tombstone_key(key)))
        .filter(|key| seen.insert(key.row_id))
        .map(|key| key.row_id)
        .collect();
    Ok(row_ids)
}
/// Returns the ids of rows whose value is at most `upper_bound`.
///
/// The scan runs from the empty value up to `(upper_bound, RowId::MAX)`,
/// reading the write buffer first and the B-tree second.
///
/// Tombstoned keys are removed before duplicate row ids are collapsed:
/// deduplicating first would let a tombstoned key claim a row id and hide a
/// live key of the same row.
///
/// # Errors
/// Fails if `upper_bound` cannot be encoded or the B-tree range read fails.
pub fn query_less_than_or_equal(&self, upper_bound: &Value) -> Result<Vec<RowId>> {
    let start_key = IndexKey {
        value_bytes: Vec::new(),
        row_id: 0,
    };
    let end_key = IndexKey {
        value_bytes: self.value_to_bytes(upper_bound)?,
        row_id: RowId::MAX,
    };

    let mut candidates: Vec<IndexKey> = self
        .mem_buffer
        .range(&start_key, &end_key)
        .into_iter()
        .map(|(key, _)| key)
        .collect();
    {
        let btree = self.btree.read();
        candidates.extend(
            btree
                .range(&start_key, &end_key)?
                .into_iter()
                .map(|(key, _)| key),
        );
    }

    let tombstones = self.tombstones.lock();
    let mut seen = HashSet::new();
    let row_ids: Vec<RowId> = candidates
        .into_iter()
        .filter(|key| !tombstones.contains(&tombstone_key(key)))
        .filter(|key| seen.insert(key.row_id))
        .map(|key| key.row_id)
        .collect();
    Ok(row_ids)
}
/// Returns the ids of rows whose value is at least `lower_bound`.
///
/// The scan runs from `(lower_bound, 0)` up to `VALUE_DATA_SIZE` bytes of
/// `0xFF`, reading the write buffer first and the B-tree second.
///
/// Tombstoned keys are removed before duplicate row ids are collapsed:
/// deduplicating first would let a tombstoned key claim a row id and hide a
/// live key of the same row.
///
/// # Errors
/// Fails if `lower_bound` cannot be encoded or the B-tree range read fails.
pub fn query_greater_than_or_equal(&self, lower_bound: &Value) -> Result<Vec<RowId>> {
    let start_key = IndexKey {
        value_bytes: self.value_to_bytes(lower_bound)?,
        row_id: 0,
    };
    let end_key = IndexKey {
        value_bytes: vec![0xFF; VALUE_DATA_SIZE],
        row_id: RowId::MAX,
    };

    let mut candidates: Vec<IndexKey> = self
        .mem_buffer
        .range(&start_key, &end_key)
        .into_iter()
        .map(|(key, _)| key)
        .collect();
    {
        let btree = self.btree.read();
        candidates.extend(
            btree
                .range(&start_key, &end_key)?
                .into_iter()
                .map(|(key, _)| key),
        );
    }

    let tombstones = self.tombstones.lock();
    let mut seen = HashSet::new();
    let row_ids: Vec<RowId> = candidates
        .into_iter()
        .filter(|key| !tombstones.contains(&tombstone_key(key)))
        .filter(|key| seen.insert(key.row_id))
        .map(|key| key.row_id)
        .collect();
    Ok(row_ids)
}
/// Returns the ids of rows whose value lies between `lower_bound` and
/// `upper_bound`, each bound being inclusive or exclusive as requested.
///
/// Inclusiveness is first expressed through the row id of the scan keys
/// (`0`/`RowId::MAX`). Since the keys built for exclusive bounds are
/// themselves possible entries, an exclusive bound is additionally enforced
/// by comparing value bytes, so a row holding exactly the bound value is
/// never returned for an exclusive bound.
///
/// Tombstoned keys are removed before duplicate row ids are collapsed, so a
/// tombstoned key cannot hide a live key of the same row.
///
/// # Errors
/// Fails if either bound cannot be encoded or the B-tree range read fails.
pub fn query_between(
    &self,
    lower_bound: &Value,
    lower_inclusive: bool,
    upper_bound: &Value,
    upper_inclusive: bool,
) -> Result<Vec<RowId>> {
    let lower_bytes = self.value_to_bytes(lower_bound)?;
    let upper_bytes = self.value_to_bytes(upper_bound)?;
    let start_key = IndexKey {
        value_bytes: lower_bytes.clone(),
        row_id: if lower_inclusive { 0 } else { RowId::MAX },
    };
    let end_key = IndexKey {
        value_bytes: upper_bytes.clone(),
        row_id: if upper_inclusive { RowId::MAX } else { 0 },
    };

    let mut candidates: Vec<IndexKey> = self
        .mem_buffer
        .range(&start_key, &end_key)
        .into_iter()
        .map(|(key, _)| key)
        .collect();
    {
        let btree = self.btree.read();
        candidates.extend(
            btree
                .range(&start_key, &end_key)?
                .into_iter()
                .map(|(key, _)| key),
        );
    }

    let tombstones = self.tombstones.lock();
    let mut seen = HashSet::new();
    let row_ids: Vec<RowId> = candidates
        .into_iter()
        .filter(|key| lower_inclusive || key.value_bytes > lower_bytes)
        .filter(|key| upper_inclusive || key.value_bytes < upper_bytes)
        .filter(|key| !tombstones.contains(&tombstone_key(key)))
        .filter(|key| seen.insert(key.row_id))
        .map(|key| key.row_id)
        .collect();
    Ok(row_ids)
}
/// Removes the entry `(value, row_id)` from the index.
///
/// The key is removed from the write buffer, recorded as a tombstone, and
/// deleted from the B-tree. The cached lookup for `value` is invalidated even
/// if the B-tree delete fails: buffer and tombstones have already changed by
/// then, so the cached row-id list may no longer be accurate.
///
/// # Errors
/// Fails if `value` cannot be encoded or the B-tree delete fails.
pub fn delete(&self, value: &Value, row_id: RowId) -> Result<()> {
    let key = IndexKey {
        value_bytes: self.value_to_bytes(value)?,
        row_id,
    };

    self.mem_buffer.delete(&key);
    self.tombstones.lock().insert(tombstone_key(&key));

    let mut btree = self.btree.write();
    let btree_result = btree.delete(&key).map(|_| ());
    drop(btree);

    self.lru_cache.invalidate(value);
    btree_result?;
    Ok(())
}
pub fn batch_delete(&self, items: Vec<(Value, RowId)>) -> Result<()> {
if items.is_empty() {
return Ok(());
}
{
let mut tombstones = self.tombstones.lock();
let mut btree = self.btree.write();
for (value, row_id) in &items {
let value_bytes = self.value_to_bytes(value)?;
let key = IndexKey {
value_bytes,
row_id: *row_id,
};
self.mem_buffer.delete(&key);
tombstones.insert(tombstone_key(&key));
btree.delete(&key)?;
}
}
let mut unique_values = items.into_iter()
.map(|(value, _)| value)
.collect::<Vec<_>>();
unique_values.sort_by(|a, b| {
let a_bytes = Self::value_to_bytes_helper(a).unwrap_or_default();
let b_bytes = Self::value_to_bytes_helper(b).unwrap_or_default();
a_bytes.cmp(&b_bytes)
});
unique_values.dedup_by(|a, b| {
let a_bytes = Self::value_to_bytes_helper(a).unwrap_or_default();
let b_bytes = Self::value_to_bytes_helper(b).unwrap_or_default();
a_bytes == b_bytes
});
self.lru_cache.invalidate_batch(&unique_values);
Ok(())
}
/// Deletes every entry between `(start, 0)` and `(end, RowId::MAX)` and
/// returns how many keys were removed (B-tree keys plus buffered keys).
///
/// Matching keys are deleted from the B-tree and then from the write buffer,
/// each one being tombstoned along the way; the tombstone lock is held for
/// the whole operation. The cached lookups for the value range are
/// invalidated even when a B-tree operation fails part-way, since keys
/// deleted before the failure are already gone.
///
/// # Errors
/// Fails if a bound cannot be encoded or a B-tree read/delete fails.
pub fn delete_range(&self, start: &Value, end: &Value) -> Result<usize> {
    let start_key = IndexKey {
        value_bytes: self.value_to_bytes(start)?,
        row_id: 0,
    };
    let end_key = IndexKey {
        value_bytes: self.value_to_bytes(end)?,
        row_id: RowId::MAX,
    };

    let outcome = (|| -> Result<usize> {
        let mut deleted_count = 0;
        let mut tombstones = self.tombstones.lock();

        let mut btree = self.btree.write();
        let persisted: Vec<IndexKey> = btree
            .range(&start_key, &end_key)?
            .into_iter()
            .map(|(key, _)| key)
            .collect();
        for key in &persisted {
            btree.delete(key)?;
            tombstones.insert(tombstone_key(key));
            deleted_count += 1;
        }
        drop(btree);

        let buffered = self.mem_buffer.range(&start_key, &end_key);
        for (key, _) in &buffered {
            self.mem_buffer.delete(key);
            tombstones.insert(tombstone_key(key));
            deleted_count += 1;
        }
        Ok(deleted_count)
    })();

    self.lru_cache.invalidate_range(start, end);
    outcome
}
/// Moves all buffered entries into the B-tree and then flushes the B-tree.
///
/// # Errors
/// Propagates failures from [`flush_buffer`](Self::flush_buffer) and from the
/// B-tree flush.
pub fn flush(&self) -> Result<()> {
    self.flush_buffer()?;
    self.btree.write().flush()?;
    Ok(())
}
/// Moves flushed buffer batches into the B-tree for as long as the buffer
/// reports that it should be flushed.
///
/// Each batch is written under the tombstone lock and the B-tree write lock;
/// entries with a tombstone are skipped. The loop ends when the buffer no
/// longer asks to be flushed or a flush yields no batch. Callers in this file
/// hold `drain_lock` while calling this.
///
/// # Errors
/// Fails if the buffer flush or a B-tree insert fails.
fn drain_immutable_to_btree(&self) -> Result<()> {
    while self.mem_buffer.should_flush() {
        let flushed = self
            .mem_buffer
            .flush()
            .map_err(|e| StorageError::InvalidData(e))?;
        let entries = match flushed {
            Some(entries) => entries,
            None => break,
        };
        if entries.is_empty() {
            continue;
        }

        let tombstones = self.tombstones.lock();
        let mut btree = self.btree.write();
        for (key, _) in entries {
            if tombstones.contains(&tombstone_key(&key)) {
                continue;
            }
            btree.insert(key, vec![])?;
        }
    }
    Ok(())
}
/// Drains the write buffer into the B-tree and resets the tombstone set.
///
/// Entries with a tombstone are skipped. When at least one entry was
/// drained, the tombstone set is cleared afterwards. The clear happens under
/// the same lock guard used for the writes: releasing the lock and
/// re-acquiring it to clear would also wipe tombstones recorded by deletes
/// that ran in between.
///
/// NOTE(review): if a B-tree insert fails, the entries drained but not yet
/// written are dropped with the local vector — confirm whether they should be
/// returned to the buffer.
///
/// # Errors
/// Fails if a B-tree insert fails.
pub fn flush_buffer(&self) -> Result<()> {
    let entries = self.mem_buffer.drain();
    if entries.is_empty() {
        return Ok(());
    }

    let mut tombstones = self.tombstones.lock();
    let mut btree = self.btree.write();
    for (key, _) in &entries {
        if !tombstones.contains(&tombstone_key(key)) {
            btree.insert(key.clone(), vec![])?;
        }
    }
    drop(btree);

    tombstones.clear();
    Ok(())
}
/// Reports index statistics.
///
/// `cached_values` is the `size` reported by the lookup cache;
/// `total_row_ids` is always reported as 0 here.
pub fn stats(&self) -> IndexStats {
    let cached_values = self.lru_cache.stats().size;
    IndexStats {
        cached_values,
        total_row_ids: 0,
    }
}
/// Returns the B-tree's approximate entry count.
///
/// Only the B-tree is consulted; entries still in the write buffer are not
/// added here.
pub fn entry_count(&self) -> usize {
    self.btree.read().approximate_entry_count()
}
/// Encodes `value` into index key bytes; forwards to
/// [`value_to_bytes_helper`](Self::value_to_bytes_helper).
fn value_to_bytes(&self, value: &Value) -> Result<Vec<u8>> {
    Self::value_to_bytes_helper(value)
}

/// Encodes `value` into the byte string used as the value part of an index key.
///
/// * `Integer`, `Float`: big-endian bytes of the number.
/// * `Text`: the string's bytes.
/// * `Bool`: a single byte, `1` for true and `0` for false.
/// * `Timestamp`: big-endian bytes of `as_micros()`.
///
/// NOTE(review): if `Integer` wraps a signed type (or for `Float`), plain
/// big-endian bytes do not sort in numeric order across the sign boundary,
/// which would affect range queries — confirm against `Value`.
///
/// # Errors
/// Returns `StorageError::InvalidData` for any other variant.
fn value_to_bytes_helper(value: &Value) -> Result<Vec<u8>> {
    match value {
        Value::Integer(i) => Ok(i.to_be_bytes().to_vec()),
        Value::Float(f) => Ok(f.to_be_bytes().to_vec()),
        Value::Text(s) => Ok(s.as_bytes().to_vec()),
        Value::Bool(b) => Ok(vec![u8::from(*b)]),
        Value::Timestamp(ts) => Ok(ts.as_micros().to_be_bytes().to_vec()),
        _ => Err(StorageError::InvalidData(
            format!("Unsupported value type for indexing: {:?}", value)
        )),
    }
}
}
/// Snapshot of index statistics returned by `ColumnValueIndex::stats`.
#[derive(Debug, Clone)]
pub struct IndexStats {
/// Size reported by the lookup cache.
pub cached_values: usize,
/// Total number of indexed row ids; `ColumnValueIndex::stats` always sets this to 0.
pub total_row_ids: usize,
}
use crate::index::builder::{IndexBuilder, BuildStats};
use crate::types::Row;
impl IndexBuilder for ColumnValueIndex {
fn build_from_memtable(&mut self, _rows: &[(RowId, Row)]) -> Result<()> {
debug_log!("[ColumnIndex::{}] ⚠️ build_from_memtable is deprecated, use insert_batch instead",
self.column_name);
Ok(())
}
fn persist(&mut self) -> Result<()> {
use std::time::Instant;
let start = Instant::now();
self.flush()?;
let duration = start.elapsed();
debug_log!("[ColumnIndex::{}] Persist: {:?}", self.column_name, duration);
Ok(())
}
fn name(&self) -> &str {
&self.column_name
}
fn stats(&self) -> BuildStats {
let stats = self.stats();
BuildStats {
rows_processed: stats.total_row_ids,
build_time_ms: 0,
persist_time_ms: 0,
index_size_bytes: stats.total_row_ids * 22,
}
}
}
impl ColumnValueIndex {
    /// Inserts every `(row_id, value)` pair of `batch` through
    /// [`insert`](Self::insert), in order, stopping at the first failure.
    ///
    /// An empty batch is a no-op.
    ///
    /// # Errors
    /// Returns the first error produced by `insert`; earlier pairs stay inserted.
    pub fn insert_batch(&self, batch: &[(RowId, &Value)]) -> Result<()> {
        batch
            .iter()
            .try_for_each(|(row_id, value)| self.insert(value, *row_id))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
/// Two rows sharing a value are both returned by a point lookup.
#[test]
fn test_column_value_index_basic() -> Result<()> {
    let temp_dir = TempDir::new()?;
    let index = ColumnValueIndex::create(
        temp_dir.path().join("test_index.idx"),
        String::from("users"),
        String::from("age"),
        ColumnValueIndexConfig::default(),
    )?;

    for (age, row_id) in [(25, 1), (30, 2), (25, 3)] {
        index.insert(&Value::Integer(age), row_id)?;
    }

    let row_ids = index.get(&Value::Integer(25))?;
    assert_eq!(row_ids.len(), 2);
    for expected in [1, 3] {
        assert!(row_ids.contains(&expected));
    }
    Ok(())
}
/// A deleted entry disappears from lookups and reappears after re-insertion.
#[test]
fn test_column_value_index_delete_tombstone() -> Result<()> {
    let temp_dir = TempDir::new()?;
    let index = ColumnValueIndex::create(
        temp_dir.path().join("test_tombstone.idx"),
        String::from("users"),
        String::from("age"),
        ColumnValueIndexConfig::default(),
    )?;
    let age = Value::Integer(25);

    index.insert(&age, 1)?;
    index.insert(&age, 2)?;
    index.delete(&age, 1)?;

    let after_delete = index.get(&age)?;
    assert_eq!(after_delete.len(), 1);
    assert!(after_delete.contains(&2));

    // Re-inserting must lift the tombstone again.
    index.insert(&age, 1)?;
    let after_reinsert = index.get(&age)?;
    assert_eq!(after_reinsert.len(), 2);
    assert!(after_reinsert.contains(&1));
    assert!(after_reinsert.contains(&2));
    Ok(())
}
/// Entries removed by `delete_range` no longer show up in a range query.
#[test]
fn test_column_value_index_range_with_delete() -> Result<()> {
    let temp_dir = TempDir::new()?;
    let index = ColumnValueIndex::create(
        temp_dir.path().join("test_range_delete.idx"),
        String::from("users"),
        String::from("age"),
        ColumnValueIndexConfig::default(),
    )?;

    for age in 10..20 {
        index.insert(&Value::Integer(age), age as RowId)?;
    }

    let deleted = index.delete_range(&Value::Integer(13), &Value::Integer(17))?;
    assert!(deleted > 0);

    let row_ids = index.range(&Value::Integer(10), &Value::Integer(19))?;
    let expected: [RowId; 5] = [10, 11, 12, 18, 19];
    assert_eq!(row_ids.len(), expected.len());
    assert!(expected.iter().all(|id| row_ids.contains(id)));
    Ok(())
}
/// `tombstone_key` keeps short values intact and cuts long ones to 12 bytes.
#[test]
fn test_tombstone_key_normalization() {
    let short_key = tombstone_key(&IndexKey {
        value_bytes: b"hello".to_vec(),
        row_id: 42,
    });
    assert_eq!(short_key.value_bytes, b"hello".to_vec());

    let long_key = tombstone_key(&IndexKey {
        value_bytes: b"abcdefghijklmno_xtralong".to_vec(),
        row_id: 99,
    });
    assert_eq!(long_key.value_bytes, b"abcdefghijkl".to_vec());
    assert_eq!(long_key.row_id, 99);
}
/// Deleting one row of a long text value (longer than the stored key prefix)
/// after a buffer flush leaves the other row visible.
#[test]
fn test_column_value_index_long_text_tombstone() -> Result<()> {
    let temp_dir = TempDir::new()?;
    let index = ColumnValueIndex::create(
        temp_dir.path().join("test_long_text.idx"),
        String::from("users"),
        String::from("bio"),
        ColumnValueIndexConfig::default(),
    )?;
    let long_val = Value::Text(String::from("abcdefghijklmno_xtralong_value"));

    for row_id in [1, 2] {
        index.insert(&long_val, row_id)?;
    }
    index.flush_buffer()?;
    index.delete(&long_val, 1)?;

    let row_ids = index.get(&long_val)?;
    assert_eq!(row_ids.len(), 1);
    assert!(row_ids.contains(&2));
    assert!(!row_ids.contains(&1));
    Ok(())
}
/// Runs an inserter, a delete/re-insert worker and a reader against one shared
/// index for about half a second, then checks that deletes performed after
/// the workers stopped are visible to lookups.
#[test]
fn test_column_value_index_concurrent_stress() -> Result<()> {
use std::sync::atomic::{AtomicBool, Ordering};
let temp_dir = TempDir::new()?;
let path = temp_dir.path().join("test_concurrent.idx");
let index = Arc::new(ColumnValueIndex::create(
&path,
"users".to_string(),
"age".to_string(),
ColumnValueIndexConfig::default(),
)?);
// Shared flag telling every worker thread to stop.
let stop = Arc::new(AtomicBool::new(false));
let n = 500;
// Seed the index: row ids 0..n spread over 50 distinct values.
for i in 0..n {
index.insert(&Value::Integer(i % 50), i as RowId)?;
}
let mut handles = vec![];
// Worker 1: keeps re-inserting row ids 0..100; errors are ignored.
{
let index = Arc::clone(&index);
let stop = Arc::clone(&stop);
handles.push(std::thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
for i in 0..100 {
let _ = index.insert(&Value::Integer(i % 50), i as RowId);
}
}
}));
}
// Worker 2: deletes and immediately re-inserts (value i, row i) for i in 0..50.
{
let index = Arc::clone(&index);
let stop = Arc::clone(&stop);
handles.push(std::thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
for i in 0..50 {
let _ = index.delete(&Value::Integer(i), i as RowId);
let _ = index.insert(&Value::Integer(i), i as RowId);
}
}
}));
}
// Worker 3: reader; every row id it sees must be one that was inserted (< n).
{
let index = Arc::clone(&index);
let stop = Arc::clone(&stop);
handles.push(std::thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
if let Ok(ids) = index.get(&Value::Integer(25)) {
for &id in &ids {
assert!(id < n as RowId,
"get() returned unexpected row_id {}", id);
}
}
if let Ok(ids) = index.query_less_than_or_equal(&Value::Integer(10)) {
for &id in &ids {
assert!(id < n as RowId,
"range() returned unexpected row_id {}", id);
}
}
}
}));
}
// Let the workers run for a while, then stop them; a panic inside a worker
// (e.g. a failed assert in the reader) surfaces through `join().unwrap()`.
std::thread::sleep(std::time::Duration::from_millis(500));
stop.store(true, Ordering::Relaxed);
for handle in handles {
handle.join().unwrap();
}
// With all workers stopped, deletes must be reflected by lookups.
for i in 0..10 {
index.delete(&Value::Integer(i), i as RowId)?;
}
for i in 0..10 {
let ids = index.get(&Value::Integer(i))?;
assert!(!ids.contains(&(i as RowId)),
"Deleted key (value={}, row_id={}) still present", i, i);
}
Ok(())
}
}