use parking_lot::RwLock;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use rustc_hash::FxHashMap;
use crate::common::{CompactArc, CompactVec, I64Map};
use crate::core::{DataType, Error, IndexEntry, IndexType, Operator, Result, RowIdVec, Value};
use crate::storage::expression::{ComparisonExpr, Expression, InListExpr};
use crate::storage::traits::Index;
/// Fixed seed words that `hash_values` passes to `ahash::RandomState::with_seeds`,
/// so every call builds its hasher from the same four constants.
const HASH_SEEDS: [u64; 4] = [
0x517cc1b727220a95,
0x8a36afbc28b36e9c,
0x2f24bc8d75cd8b0a,
0xe9a5e3f10d13d6f7,
];
/// Folds every value of a key, in order, into one `u64` hash.
///
/// The hasher is seeded from `HASH_SEEDS` on each call. Values are fed one by
/// one, so no slice-length prefix is mixed into the result.
fn hash_values(values: &[Value]) -> u64 {
    let [k0, k1, k2, k3] = HASH_SEEDS;
    let mut state = ahash::RandomState::with_seeds(k0, k1, k2, k3).build_hasher();
    values.iter().for_each(|value| value.hash(&mut state));
    state.finish()
}
/// Hash-based index over one or more columns.
///
/// Lookups implemented in this file are exact-match only; range queries are
/// rejected or ignored. All mutable state lives behind `RwLock`s so the index
/// can be updated through `&self`.
pub struct HashIndex {
/// Index name (returned by `name()` and used in unique-violation errors).
name: String,
/// Name of the table this index belongs to (returned by `table_name()`).
table_name: String,
/// Names of the indexed columns, in key order.
column_names: Vec<String>,
/// Ids of the indexed columns; its length is the number of values every key must have.
column_ids: Vec<i32>,
/// Data types returned by `data_types()` (presumably one per column; not validated here).
data_types: Vec<DataType>,
/// When `true`, a fully non-NULL key may be held by at most one row.
is_unique: bool,
/// Set by `close()`; most operations refuse to run (or return nothing) once it is set.
closed: AtomicBool,
/// Key hash -> row ids stored under that hash, kept sorted (inserted via binary search).
/// Maintained by the add/remove paths; the lookup methods in this file do not read it.
hash_to_rows: RwLock<FxHashMap<u64, CompactVec<i64>>>,
/// Row id -> hash of the key the row was last added under. Lets `add` find and
/// drop the row's previous entry when the row is re-added with a different key.
row_to_hash: RwLock<I64Map<u64>>,
/// Key hash -> buckets of `(key values, sorted row ids)`. The stored values
/// disambiguate distinct keys whose hashes collide.
#[allow(clippy::type_complexity)]
hash_to_values: RwLock<FxHashMap<u64, Vec<(Vec<CompactArc<Value>>, CompactVec<i64>)>>>,
}
/// Debug output shows the index metadata and the closed flag; the remaining
/// fields are elided (`finish_non_exhaustive`).
impl std::fmt::Debug for HashIndex {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("HashIndex");
        repr.field("name", &self.name);
        repr.field("table_name", &self.table_name);
        repr.field("column_names", &self.column_names);
        repr.field("column_ids", &self.column_ids);
        repr.field("is_unique", &self.is_unique);
        repr.field("closed", &self.closed.load(AtomicOrdering::Relaxed));
        repr.finish_non_exhaustive()
    }
}
impl HashIndex {
/// Creates an empty, open index.
///
/// `expected_rows` is a capacity hint: when it is greater than zero all three
/// internal maps are pre-sized to it, otherwise they start empty.
pub fn new(
    name: String,
    table_name: String,
    column_names: Vec<String>,
    column_ids: Vec<i32>,
    data_types: Vec<DataType>,
    is_unique: bool,
    expected_rows: usize,
) -> Self {
    let (hash_to_rows, row_to_hash, hash_to_values) = if expected_rows > 0 {
        (
            FxHashMap::with_capacity_and_hasher(expected_rows, Default::default()),
            I64Map::with_capacity(expected_rows),
            FxHashMap::with_capacity_and_hasher(expected_rows, Default::default()),
        )
    } else {
        (FxHashMap::default(), I64Map::new(), FxHashMap::default())
    };

    Self {
        name,
        table_name,
        column_names,
        column_ids,
        data_types,
        is_unique,
        closed: AtomicBool::new(false),
        hash_to_rows: RwLock::new(hash_to_rows),
        row_to_hash: RwLock::new(row_to_hash),
        hash_to_values: RwLock::new(hash_to_values),
    }
}
#[allow(clippy::type_complexity)]
fn check_unique_constraint(
&self,
values: &[Value],
row_id: i64,
hash: u64,
hash_to_values: &FxHashMap<u64, Vec<(Vec<CompactArc<Value>>, CompactVec<i64>)>>,
) -> Result<()> {
if !self.is_unique {
return Ok(());
}
for v in values {
if v.is_null() {
return Ok(());
}
}
if let Some(entries) = hash_to_values.get(&hash) {
for (stored_values, row_ids) in entries {
if Self::values_match(stored_values, values) && !row_ids.is_empty() {
if row_ids.iter().any(|&id| id != row_id) {
let values_str: Vec<String> =
values.iter().map(|v| format!("{:?}", v)).collect();
return Err(Error::unique_constraint(
&self.name,
self.column_names.join(", "),
format!("[{}]", values_str.join(", ")),
));
}
}
}
}
Ok(())
}
/// Returns `true` when `stored` and `input` have the same length and are
/// element-wise equal.
///
/// Keys of up to four columns are compared through dedicated slice patterns;
/// longer keys fall back to a zipped comparison.
#[inline]
fn values_match(stored: &[CompactArc<Value>], input: &[Value]) -> bool {
    match (stored, input) {
        ([], []) => true,
        ([s0], [i0]) => s0.as_ref() == i0,
        ([s0, s1], [i0, i1]) => s0.as_ref() == i0 && s1.as_ref() == i1,
        ([s0, s1, s2], [i0, i1, i2]) => {
            s0.as_ref() == i0 && s1.as_ref() == i1 && s2.as_ref() == i2
        }
        ([s0, s1, s2, s3], [i0, i1, i2, i3]) => {
            s0.as_ref() == i0 && s1.as_ref() == i1 && s2.as_ref() == i2 && s3.as_ref() == i3
        }
        // Either the lengths differ or the key has five or more columns.
        _ => {
            stored.len() == input.len()
                && stored.iter().zip(input.iter()).all(|(s, i)| s.as_ref() == i)
        }
    }
}
}
impl Index for HashIndex {
/// Returns the index name.
fn name(&self) -> &str {
&self.name
}
/// Returns the name of the table this index belongs to.
fn table_name(&self) -> &str {
&self.table_name
}
/// No-op for this index type: nothing is built here, the call always succeeds.
fn build(&mut self) -> Result<()> {
Ok(())
}
/// Indexes `row_id` under the key `values`, replacing any entry the row
/// already has under a different key.
///
/// Re-adding a row with the key it is already indexed under is a no-op.
/// `_ref_id` is ignored.
///
/// # Errors
/// - `Error::IndexClosed` if the index has been closed.
/// - An internal error if `values.len()` differs from the number of indexed
///   columns.
/// - A unique-constraint error if the index is unique, no value is NULL and
///   another row already holds the same key. The index is left unmodified in
///   that case.
fn add(&self, values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }
    let num_cols = self.column_ids.len();
    if values.len() != num_cols {
        return Err(Error::internal(format!(
            "expected {} values, got {}",
            num_cols,
            values.len()
        )));
    }
    let hash = hash_values(values);

    // All writers take the three locks in this order.
    let mut hash_to_rows = self.hash_to_rows.write();
    let mut row_to_hash = self.row_to_hash.write();
    let mut hash_to_values = self.hash_to_values.write();

    let previous_hash = row_to_hash.get(row_id).copied();

    // Fast path: the row is already indexed under exactly this key.
    if previous_hash == Some(hash) {
        if let Some(entries) = hash_to_values.get(&hash) {
            let already_indexed = entries.iter().any(|(stored_values, row_ids)| {
                row_ids.contains(&row_id) && Self::values_match(stored_values, values)
            });
            if already_indexed {
                return Ok(());
            }
        }
    }

    // Validate BEFORE touching any map. The check ignores `row_id` itself, so
    // its outcome does not depend on whether the row's previous entry has been
    // removed yet. Checking first means a rejected update no longer drops the
    // row's existing entry from the index.
    self.check_unique_constraint(values, row_id, hash, &hash_to_values)?;

    // Drop the entry the row was previously indexed under, if any.
    if let Some(old_hash) = previous_hash {
        if let Some(rows) = hash_to_rows.get_mut(&old_hash) {
            rows.retain(|id| *id != row_id);
            if rows.is_empty() {
                hash_to_rows.remove(&old_hash);
            }
        }
        if let Some(entries) = hash_to_values.get_mut(&old_hash) {
            for (_, row_ids) in entries.iter_mut() {
                row_ids.retain(|id| *id != row_id);
            }
            entries.retain(|(_, row_ids)| !row_ids.is_empty());
            if entries.is_empty() {
                hash_to_values.remove(&old_hash);
            }
        }
    }

    // Row-id lists are kept sorted so membership can be found by binary search.
    let rows = hash_to_rows.entry(hash).or_default();
    if let Err(pos) = rows.binary_search(&row_id) {
        rows.insert(pos, row_id);
    }
    row_to_hash.insert(row_id, hash);

    let entries = hash_to_values.entry(hash).or_default();
    let mut found = false;
    for (stored_values, row_ids) in entries.iter_mut() {
        if Self::values_match(stored_values, values) {
            if let Err(pos) = row_ids.binary_search(&row_id) {
                row_ids.insert(pos, row_id);
            }
            found = true;
            break;
        }
    }
    if !found {
        // First row for this key under this hash: open a new bucket.
        let arc_values: Vec<CompactArc<Value>> =
            values.iter().map(|v| CompactArc::new(v.clone())).collect();
        let mut row_ids = CompactVec::new();
        row_ids.push(row_id);
        entries.push((arc_values, row_ids));
    }
    Ok(())
}
/// Adds every `(row_id, values)` pair of `entries` through `add`, stopping at
/// and returning the first error. Entries added before the failure stay in the
/// index.
fn add_batch(&self, entries: &I64Map<Vec<Value>>) -> Result<()> {
    entries
        .iter()
        .try_for_each(|(row_id, values)| self.add(values, row_id, 0))
}
/// Removes `row_id` from the entry stored under the key `values`.
///
/// Removing a key the row is not indexed under is a no-op. `_ref_id` is
/// ignored.
///
/// # Errors
/// Returns `Error::IndexClosed` if the index has been closed.
fn remove(&self, values: &[Value], row_id: i64, _ref_id: i64) -> Result<()> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }
    let hash = hash_values(values);

    // All writers take the three locks in this order.
    let mut hash_to_rows = self.hash_to_rows.write();
    let mut row_to_hash = self.row_to_hash.write();
    let mut hash_to_values = self.hash_to_values.write();

    if let Some(rows) = hash_to_rows.get_mut(&hash) {
        if let Ok(pos) = rows.binary_search(&row_id) {
            rows.remove(pos);
        }
        if rows.is_empty() {
            hash_to_rows.remove(&hash);
        }
    }

    // Forget the row -> hash mapping only when it points at the key being
    // removed. If the row has since been re-indexed under another key (for
    // example `add(new)` followed by `remove(old)`), erasing the mapping would
    // orphan the live entry: a later `add` could no longer find and replace it.
    if matches!(row_to_hash.get(row_id), Some(&current) if current == hash) {
        row_to_hash.remove(row_id);
    }

    if let Some(entries) = hash_to_values.get_mut(&hash) {
        for (stored_values, row_ids) in entries.iter_mut() {
            if Self::values_match(stored_values, values) {
                if let Ok(pos) = row_ids.binary_search(&row_id) {
                    row_ids.remove(pos);
                }
                break;
            }
        }
        entries.retain(|(_, row_ids)| !row_ids.is_empty());
        if entries.is_empty() {
            hash_to_values.remove(&hash);
        }
    }
    Ok(())
}
/// Removes every `(row_id, values)` pair of `entries` through `remove`,
/// stopping at and returning the first error.
fn remove_batch(&self, entries: &I64Map<Vec<Value>>) -> Result<()> {
    entries
        .iter()
        .try_for_each(|(row_id, values)| self.remove(values, row_id, 0))
}
/// Adds a batch of `(row_id, values)` pairs while holding the write locks once.
///
/// The batch is validated before anything is written: every key must have one
/// value per indexed column, and for unique indexes no fully non-NULL key may
/// collide with a different row, either already indexed or earlier in the same
/// batch. If validation fails nothing is inserted.
///
/// # Errors
/// - `Error::IndexClosed` if the index has been closed.
/// - An internal error if any entry has the wrong number of values. This is
///   reported for unique and non-unique indexes alike, matching `add`.
/// - A unique-constraint error as described above.
fn add_batch_slice(&self, entries: &[(i64, &[Value])]) -> Result<()> {
    if entries.is_empty() {
        return Ok(());
    }
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }
    let num_cols = self.column_ids.len();

    // Reject malformed entries up front instead of silently skipping them
    // later, so the outcome does not depend on whether the index is unique.
    if let Some(&(_, bad)) = entries.iter().find(|(_, values)| values.len() != num_cols) {
        return Err(Error::internal(format!(
            "expected {} values, got {}",
            num_cols,
            bad.len()
        )));
    }

    // All writers take the three locks in this order.
    let mut hash_to_rows = self.hash_to_rows.write();
    let mut row_to_hash = self.row_to_hash.write();
    let mut hash_to_values = self.hash_to_values.write();
    hash_to_rows.reserve(entries.len());
    row_to_hash.reserve(entries.len());
    hash_to_values.reserve(entries.len());

    // Pass 1 (unique indexes only): validate the whole batch before mutating.
    if self.is_unique {
        // Keys seen so far in this batch -> the row that claimed them.
        let mut batch_values: ahash::AHashMap<&[Value], i64> =
            ahash::AHashMap::with_capacity(entries.len());
        for &(row_id, values) in entries {
            // Keys containing NULL never conflict.
            if values.iter().any(|v| v.is_null()) {
                continue;
            }
            let hash = hash_values(values);
            self.check_unique_constraint(values, row_id, hash, &hash_to_values)?;
            if let Some(&existing_row_id) = batch_values.get(values) {
                if existing_row_id != row_id {
                    let values_str: Vec<String> =
                        values.iter().map(|v| format!("{:?}", v)).collect();
                    return Err(Error::unique_constraint(
                        &self.name,
                        self.column_names.join(", "),
                        format!("[{}]", values_str.join(", ")),
                    ));
                }
            }
            batch_values.insert(values, row_id);
        }
    }

    // Pass 2: apply the batch.
    for &(row_id, values) in entries {
        let hash = hash_values(values);
        if let Some(&old_hash) = row_to_hash.get(row_id) {
            if old_hash == hash {
                let old_values_match = if let Some(val_entries) = hash_to_values.get(&old_hash)
                {
                    val_entries.iter().any(|(stored_values, row_ids)| {
                        row_ids.contains(&row_id) && Self::values_match(stored_values, values)
                    })
                } else {
                    false
                };
                // The row is already indexed under exactly this key.
                if old_values_match {
                    continue;
                }
            }
            // Drop the entry the row was previously indexed under.
            if let Some(rows) = hash_to_rows.get_mut(&old_hash) {
                rows.retain(|id| *id != row_id);
                if rows.is_empty() {
                    hash_to_rows.remove(&old_hash);
                }
            }
            if let Some(val_entries) = hash_to_values.get_mut(&old_hash) {
                for (_, row_ids) in val_entries.iter_mut() {
                    row_ids.retain(|id| *id != row_id);
                }
                val_entries.retain(|(_, row_ids)| !row_ids.is_empty());
                if val_entries.is_empty() {
                    hash_to_values.remove(&old_hash);
                }
            }
        }

        // Row-id lists are kept sorted so membership can be found by binary search.
        let rows = hash_to_rows.entry(hash).or_default();
        if let Err(pos) = rows.binary_search(&row_id) {
            rows.insert(pos, row_id);
        }
        row_to_hash.insert(row_id, hash);

        let val_entries = hash_to_values.entry(hash).or_default();
        let mut found = false;
        for (stored_values, row_ids) in val_entries.iter_mut() {
            if Self::values_match(stored_values, values) {
                if let Err(pos) = row_ids.binary_search(&row_id) {
                    row_ids.insert(pos, row_id);
                }
                found = true;
                break;
            }
        }
        if !found {
            let arc_values: Vec<CompactArc<Value>> =
                values.iter().map(|v| CompactArc::new(v.clone())).collect();
            let mut row_ids = CompactVec::new();
            row_ids.push(row_id);
            val_entries.push((arc_values, row_ids));
        }
    }
    Ok(())
}
/// Removes a batch of `(row_id, values)` pairs while holding the write locks
/// once. Pairs that are not present in the index are skipped.
///
/// # Errors
/// Returns `Error::IndexClosed` if the index has been closed.
fn remove_batch_slice(&self, entries: &[(i64, &[Value])]) -> Result<()> {
    if entries.is_empty() {
        return Ok(());
    }
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }

    // All writers take the three locks in this order.
    let mut hash_to_rows = self.hash_to_rows.write();
    let mut row_to_hash = self.row_to_hash.write();
    let mut hash_to_values = self.hash_to_values.write();

    for &(row_id, values) in entries {
        let hash = hash_values(values);
        if let Some(rows) = hash_to_rows.get_mut(&hash) {
            if let Ok(pos) = rows.binary_search(&row_id) {
                rows.remove(pos);
            }
            if rows.is_empty() {
                hash_to_rows.remove(&hash);
            }
        }

        // Forget the row -> hash mapping only when it points at the key being
        // removed. Erasing it for a stale key would orphan the entry the row
        // is currently indexed under, so a later add could not replace it.
        if matches!(row_to_hash.get(row_id), Some(&current) if current == hash) {
            row_to_hash.remove(row_id);
        }

        if let Some(val_entries) = hash_to_values.get_mut(&hash) {
            for (stored_values, row_ids) in val_entries.iter_mut() {
                if Self::values_match(stored_values, values) {
                    if let Ok(pos) = row_ids.binary_search(&row_id) {
                        row_ids.remove(pos);
                    }
                    break;
                }
            }
            val_entries.retain(|(_, row_ids)| !row_ids.is_empty());
            if val_entries.is_empty() {
                hash_to_values.remove(&hash);
            }
        }
    }
    Ok(())
}
/// Returns the ids of the indexed columns, in key order.
fn column_ids(&self) -> &[i32] {
&self.column_ids
}
/// Returns the names of the indexed columns, in key order.
fn column_names(&self) -> &[String] {
&self.column_names
}
/// Returns the data types this index was constructed with.
fn data_types(&self) -> &[DataType] {
&self.data_types
}
/// Always `IndexType::Hash`.
fn index_type(&self) -> IndexType {
IndexType::Hash
}
/// Returns whether this index enforces key uniqueness.
fn is_unique(&self) -> bool {
self.is_unique
}
/// Looks up the rows stored under exactly the key `values`.
///
/// Returns one `IndexEntry` (with `ref_id` 0) per matching row, or an empty
/// vector when the key is absent.
///
/// # Errors
/// - `Error::IndexClosed` if the index has been closed.
/// - An internal error if `values` does not supply one value per indexed
///   column.
fn find(&self, values: &[Value]) -> Result<Vec<IndexEntry>> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return Err(Error::IndexClosed);
    }
    if values.len() != self.column_ids.len() {
        return Err(Error::internal(
            "hash index requires exact match on all columns",
        ));
    }

    let hash = hash_values(values);
    let buckets = self.hash_to_values.read();
    // Several keys may share a hash; pick the bucket whose stored key matches.
    let found: Vec<IndexEntry> = buckets
        .get(&hash)
        .and_then(|candidates| {
            candidates
                .iter()
                .find(|(stored, _)| Self::values_match(stored, values))
        })
        .map(|(_, row_ids)| {
            row_ids
                .iter()
                .map(|&row_id| IndexEntry { row_id, ref_id: 0 })
                .collect()
        })
        .unwrap_or_default();
    Ok(found)
}
/// Range lookups are not supported by this index.
///
/// # Errors
/// Always returns an internal error; all arguments are ignored.
fn find_range(
&self,
_min: &[Value],
_max: &[Value],
_min_inclusive: bool,
_max_inclusive: bool,
) -> Result<Vec<IndexEntry>> {
Err(Error::internal(
"hash index does not support range queries; use btree index instead",
))
}
fn find_with_operator(&self, op: Operator, values: &[Value]) -> Result<Vec<IndexEntry>> {
match op {
Operator::Eq => self.find(values),
_ => Err(Error::internal(format!(
"hash index only supports equality operator, not {:?}",
op
))),
}
}
/// Appends to `buffer` the row ids stored under exactly the key `values`.
///
/// Appends nothing when the index is closed, when `values` does not supply
/// one value per indexed column, or when the key is absent.
fn get_row_ids_equal_into(&self, values: &[Value], buffer: &mut Vec<i64>) {
    let usable = !self.closed.load(AtomicOrdering::Acquire)
        && values.len() == self.column_ids.len();
    if !usable {
        return;
    }

    let hash = hash_values(values);
    let buckets = self.hash_to_values.read();
    let hit = buckets.get(&hash).and_then(|candidates| {
        candidates
            .iter()
            .find(|(stored, _)| Self::values_match(stored, values))
    });
    if let Some((_, row_ids)) = hit {
        buffer.extend_from_slice(row_ids.as_slice());
    }
}
/// Appends to `buffer` the row ids of every single-value key equal to one of
/// the values in `value_list`.
///
/// Each list value is looked up on its own, so a value that appears twice
/// contributes its rows twice. Only buckets whose stored key has exactly one
/// value can match. Appends nothing when the index is closed.
fn get_row_ids_in_into(&self, value_list: &[Value], buffer: &mut Vec<i64>) {
    if self.closed.load(AtomicOrdering::Acquire) {
        return;
    }

    let buckets = self.hash_to_values.read();
    for value in value_list {
        let hash = hash_values(std::slice::from_ref(value));
        let hit = buckets
            .get(&hash)
            .into_iter()
            .flatten()
            .find(|(stored, _)| stored.len() == 1 && stored[0].as_ref() == value);
        if let Some((_, row_ids)) = hit {
            buffer.extend_from_slice(row_ids.as_slice());
        }
    }
}
/// Intentionally empty: this index has no range lookup, so all arguments are
/// ignored and `_buffer` is left untouched.
fn get_row_ids_in_range_into(
&self,
_min_value: &[Value],
_max_value: &[Value],
_include_min: bool,
_include_max: bool,
_buffer: &mut Vec<i64>,
) {
}
/// Returns row ids for `expr`, using the index when the expression allows it.
///
/// Two shapes are answered from the index, both only for a single-column
/// index whose column the expression names:
/// - an `InListExpr`, via `get_row_ids_in`;
/// - a `ComparisonExpr` with `Operator::Eq`, via `get_row_ids_equal`.
///
/// Any other expression falls back to returning every row id currently stored
/// in the index.
fn get_filtered_row_ids(&self, expr: &dyn Expression) -> RowIdVec {
    if let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() {
        let targets_this_index = match in_list.get_column_name() {
            Some(col_name) => {
                self.column_names.len() == 1 && self.column_names[0] == col_name
            }
            None => false,
        };
        if targets_this_index {
            return self.get_row_ids_in(in_list.get_values());
        }
    }

    if let Some(comparison) = expr.as_any().downcast_ref::<ComparisonExpr>() {
        let targets_this_index = comparison.operator() == Operator::Eq
            && match comparison.get_column_name() {
                Some(col_name) => {
                    self.column_names.len() == 1 && self.column_names[0] == col_name
                }
                None => false,
            };
        if targets_this_index {
            return self.get_row_ids_equal(&[comparison.value().to_value()]);
        }
    }

    // Fallback: hand back everything that is indexed.
    let buckets = self.hash_to_values.read();
    let mut results = RowIdVec::new();
    buckets
        .values()
        .flatten()
        .for_each(|(_, row_ids)| results.extend_from_slice(row_ids.as_slice()));
    results
}
/// Returns a clone of every distinct single-value key in the index.
///
/// Keys with more than one value are skipped. A closed index yields an empty
/// vector. The order follows the internal map iteration order.
fn get_all_values(&self) -> Vec<Value> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return Vec::new();
    }

    let buckets = self.hash_to_values.read();
    let mut result = Vec::with_capacity(buckets.len());
    result.extend(
        buckets
            .values()
            .flatten()
            .filter(|(key, _)| key.len() == 1)
            .map(|(key, _)| (*key[0]).clone()),
    );
    result
}
/// Counts the distinct keys that are not entirely NULL.
///
/// A single-value key counts when its value is non-NULL; a multi-value key
/// counts when at least one of its values is non-NULL. Returns `None` when
/// the index is closed.
fn get_distinct_count_excluding_null(&self) -> Option<usize> {
    if self.closed.load(AtomicOrdering::Acquire) {
        return None;
    }

    let buckets = self.hash_to_values.read();
    // "Not all NULL" covers the one-column and multi-column rules alike.
    let distinct = buckets
        .values()
        .flatten()
        .filter(|(key, _)| !key.iter().all(|part| part.is_null()))
        .count();
    Some(distinct)
}
/// Removes every entry from the index. Always returns `Ok(())`.
///
/// All three write locks are taken up front, in the same order the other
/// writers use, and held until every map is empty. Locking the maps one at a
/// time would let a concurrent `add` slip in between two clears and leave the
/// maps disagreeing about which rows are indexed.
fn clear(&self) -> Result<()> {
    let mut hash_to_rows = self.hash_to_rows.write();
    let mut row_to_hash = self.row_to_hash.write();
    let mut hash_to_values = self.hash_to_values.write();
    hash_to_rows.clear();
    row_to_hash.clear();
    hash_to_values.clear();
    Ok(())
}
/// Exposes the index as `&dyn Any` so callers can downcast to `HashIndex`.
fn as_any(&self) -> &dyn std::any::Any {
self
}
/// Marks the index as closed and then discards all of its entries.
fn close(&mut self) -> Result<()> {
// Set the flag first so operations that check it stop before the maps are emptied.
self.closed.store(true, AtomicOrdering::Release);
self.clear()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty index over `Text` columns named `columns`, numbering
    /// the column ids from 1 and passing no capacity hint.
    fn text_index(name: &str, table: &str, columns: &[&str], unique: bool) -> HashIndex {
        HashIndex::new(
            name.to_string(),
            table.to_string(),
            columns.iter().map(|column| column.to_string()).collect(),
            (1..).take(columns.len()).collect(),
            columns.iter().map(|_| DataType::Text).collect(),
            unique,
            0,
        )
    }

    /// Single-column index on `users.email`.
    fn email_index(name: &str, unique: bool) -> HashIndex {
        text_index(name, "users", &["email"], unique)
    }

    /// Shorthand for a text value.
    fn text(literal: &'static str) -> Value {
        Value::Text(literal.into())
    }

    /// Row ids returned by `find` for `key`, in result order.
    fn found_rows(index: &HashIndex, key: &[Value]) -> Vec<i64> {
        index
            .find(key)
            .unwrap()
            .iter()
            .map(|entry| entry.row_id)
            .collect()
    }

    #[test]
    fn test_hash_index_basic() {
        let index = email_index("idx_email", false);
        index.add(&[text("alice@example.com")], 1, 0).unwrap();
        index.add(&[text("bob@example.com")], 2, 0).unwrap();
        index.add(&[text("charlie@example.com")], 3, 0).unwrap();

        assert_eq!(found_rows(&index, &[text("alice@example.com")]), vec![1]);
        assert_eq!(found_rows(&index, &[text("bob@example.com")]), vec![2]);
        assert!(found_rows(&index, &[text("nobody@example.com")]).is_empty());
    }

    #[test]
    fn test_hash_index_unique_constraint() {
        let index = email_index("idx_email_unique", true);
        index.add(&[text("alice@example.com")], 1, 0).unwrap();

        // A different row may not take the same key...
        let duplicate = index.add(&[text("alice@example.com")], 2, 0);
        assert!(duplicate.is_err());

        // ...but re-adding the owning row is fine.
        index.add(&[text("alice@example.com")], 1, 0).unwrap();
    }

    #[test]
    fn test_hash_index_null_values() {
        let index = email_index("idx_email_unique", true);

        // NULL keys are exempt from the unique check.
        index.add(&[Value::Null(DataType::Text)], 1, 0).unwrap();
        index.add(&[Value::Null(DataType::Text)], 2, 0).unwrap();

        let results = index.find(&[Value::Null(DataType::Text)]).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_hash_index_remove() {
        let index = email_index("idx_email", false);
        index.add(&[text("alice@example.com")], 1, 0).unwrap();
        index.add(&[text("bob@example.com")], 2, 0).unwrap();

        index.remove(&[text("alice@example.com")], 1, 0).unwrap();

        assert!(found_rows(&index, &[text("alice@example.com")]).is_empty());
        assert_eq!(found_rows(&index, &[text("bob@example.com")]).len(), 1);
    }

    #[test]
    fn test_hash_index_update() {
        let index = email_index("idx_email", false);
        index.add(&[text("old@example.com")], 1, 0).unwrap();

        // Re-adding the same row under a new key replaces the old entry.
        index.add(&[text("new@example.com")], 1, 0).unwrap();

        assert!(found_rows(&index, &[text("old@example.com")]).is_empty());
        assert_eq!(found_rows(&index, &[text("new@example.com")]), vec![1]);
    }

    #[test]
    fn test_hash_index_range_not_supported() {
        let index = email_index("idx_email", false);
        let outcome = index.find_range(&[text("a")], &[text("z")], true, true);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_hash_index_multi_column() {
        let index = text_index("idx_name_email", "users", &["name", "email"], false);
        index
            .add(&[text("Alice"), text("alice@example.com")], 1, 0)
            .unwrap();
        index
            .add(&[text("Bob"), text("bob@example.com")], 2, 0)
            .unwrap();

        assert_eq!(
            found_rows(&index, &[text("Alice"), text("alice@example.com")]),
            vec![1]
        );

        // A partial key is rejected: every indexed column must be supplied.
        let partial = index.find(&[text("Alice")]);
        assert!(partial.is_err());
    }

    #[test]
    fn test_hash_index_duplicate_values() {
        let index = text_index("idx_status", "orders", &["status"], false);
        for row_id in 1..=3 {
            index.add(&[text("pending")], row_id, 0).unwrap();
        }
        index.add(&[text("shipped")], 4, 0).unwrap();

        assert_eq!(found_rows(&index, &[text("pending")]).len(), 3);
        assert_eq!(found_rows(&index, &[text("shipped")]).len(), 1);
    }
}