use std::collections::{BTreeMap, BTreeSet};
/// Secondary index over a single field: maps each field value to the set of
/// primary keys whose record holds that value.
///
/// Values and keys are compared as raw bytes, so range queries follow
/// lexicographic byte order (big-endian unsigned integers therefore sort
/// numerically).
#[derive(Debug)]
pub struct KvFieldIndex {
    /// Name of the indexed field.
    field: String,
    /// Caller-supplied position of the field; stored and reported, not used
    /// by the index itself.
    field_position: usize,
    /// Field value -> primary keys holding it. Empty key sets are pruned.
    tree: BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>,
}

impl KvFieldIndex {
    /// Creates an empty index for `field`.
    pub fn new(field: impl Into<String>, field_position: usize) -> Self {
        Self {
            field: field.into(),
            field_position,
            tree: BTreeMap::new(),
        }
    }

    /// Name of the indexed field.
    pub fn field(&self) -> &str {
        &self.field
    }

    /// Caller-supplied position of the indexed field.
    pub fn field_position(&self) -> usize {
        self.field_position
    }

    /// Records that `primary_key` holds `field_value`.
    ///
    /// Inserting a pair that is already present is a no-op.
    pub fn insert(&mut self, field_value: Vec<u8>, primary_key: Vec<u8>) {
        self.tree
            .entry(field_value)
            .or_default()
            .insert(primary_key);
    }

    /// Removes the `(field_value, primary_key)` pair.
    ///
    /// Returns `true` if the pair was present. A value left with no primary
    /// keys is dropped entirely, so it no longer counts toward
    /// [`distinct_values`](Self::distinct_values).
    pub fn remove(&mut self, field_value: &[u8], primary_key: &[u8]) -> bool {
        let Some(keys) = self.tree.get_mut(field_value) else {
            return false;
        };
        let removed = keys.remove(primary_key);
        if keys.is_empty() {
            self.tree.remove(field_value);
        }
        removed
    }

    /// Returns the primary keys holding exactly `field_value`, in ascending
    /// byte order. Unknown values yield an empty vector.
    pub fn lookup_eq(&self, field_value: &[u8]) -> Vec<&[u8]> {
        self.tree
            .get(field_value)
            .map(|keys| keys.iter().map(Vec::as_slice).collect())
            .unwrap_or_default()
    }

    /// Returns `(value, primary_key)` pairs for every value in the half-open
    /// range `[lower, upper)`.
    ///
    /// `None` leaves that side unbounded. Results are ordered by value, then
    /// by primary key. An empty or inverted range (`lower >= upper`) yields
    /// no results rather than panicking.
    pub fn lookup_range(
        &self,
        lower: Option<&[u8]>,
        upper: Option<&[u8]>,
    ) -> Vec<(&[u8], &[u8])> {
        use std::ops::Bound;

        // `BTreeMap::range` panics when start > end, so reject inverted (and
        // trivially empty) bounds up front.
        if matches!((lower, upper), (Some(l), Some(u)) if l >= u) {
            return Vec::new();
        }
        let lo = lower.map_or(Bound::Unbounded, Bound::Included);
        let hi = upper.map_or(Bound::Unbounded, Bound::Excluded);
        // Ranging over borrowed `[u8]` bounds avoids copying them into `Vec`s.
        self.tree
            .range::<[u8], _>((lo, hi))
            .flat_map(|(value, keys)| {
                keys.iter()
                    .map(move |key| (value.as_slice(), key.as_slice()))
            })
            .collect()
    }

    /// Total number of `(value, primary_key)` pairs stored.
    pub fn entry_count(&self) -> usize {
        self.tree.values().map(BTreeSet::len).sum()
    }

    /// Number of distinct field values that currently have at least one key.
    pub fn distinct_values(&self) -> usize {
        self.tree.len()
    }

    /// Removes every entry; the field name and position are kept.
    pub fn clear(&mut self) {
        self.tree.clear();
    }
}
/// Secondary index over an ordered list of fields.
///
/// Each record's field values are encoded into one composite byte key that
/// maps to the set of primary keys holding that combination. The encoding
/// escapes embedded zero bytes and terminates every component, so distinct
/// value tuples never collide. The encoding of a leading subset of values is
/// also an exact byte prefix of the full key, ending on a component boundary,
/// which is what [`lookup_prefix`](Self::lookup_prefix) relies on.
#[derive(Debug)]
pub struct KvCompositeIndex {
    /// Indexed field names, in key order.
    fields: Vec<String>,
    /// Caller-supplied positions for `fields`, stored and reported as-is.
    /// NOTE(review): the length is not checked against `fields`.
    field_positions: Vec<usize>,
    /// Encoded composite key -> primary keys. Empty key sets are pruned.
    tree: BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>,
}

impl KvCompositeIndex {
    /// Lead byte of every two-byte escape/terminator sequence in a key.
    const ESCAPE: u8 = 0x00;
    /// Follows `ESCAPE` to encode a literal 0x00 byte inside a value.
    const ESCAPED_ZERO: u8 = 0xFF;
    /// Follows `ESCAPE` to mark the end of one component.
    const TERMINATOR: u8 = 0x01;

    /// Creates an empty composite index over `fields`.
    pub fn new(fields: Vec<String>, field_positions: Vec<usize>) -> Self {
        Self {
            fields,
            field_positions,
            tree: BTreeMap::new(),
        }
    }

    /// Indexed field names, in key order.
    pub fn fields(&self) -> &[String] {
        &self.fields
    }

    /// Caller-supplied positions of the indexed fields.
    pub fn field_positions(&self) -> &[usize] {
        &self.field_positions
    }

    /// Encodes `values` into a single composite key.
    ///
    /// Each component is written with every 0x00 byte replaced by `00 FF`,
    /// followed by the terminator `00 01`. This has three properties:
    /// - The encoding is injective; for example `["x\0y", "z"]` and `["x", "y\0z"]` differ.
    /// - Byte-wise key order matches component-wise order of the values.
    /// - The encoding of a leading subset of values is a byte prefix that
    ///   ends exactly on a component boundary.
    fn build_key(values: &[&[u8]]) -> Vec<u8> {
        let encoded_len: usize = values.iter().map(|v| v.len() + 2).sum();
        let mut key = Vec::with_capacity(encoded_len);
        for value in values {
            for &byte in *value {
                if byte == Self::ESCAPE {
                    key.extend_from_slice(&[Self::ESCAPE, Self::ESCAPED_ZERO]);
                } else {
                    key.push(byte);
                }
            }
            key.extend_from_slice(&[Self::ESCAPE, Self::TERMINATOR]);
        }
        key
    }

    /// Records that `primary_key` holds the value tuple `field_values`.
    pub fn insert(&mut self, field_values: &[&[u8]], primary_key: Vec<u8>) {
        let key = Self::build_key(field_values);
        self.tree.entry(key).or_default().insert(primary_key);
    }

    /// Removes `primary_key` from the entry for `field_values`.
    ///
    /// Returns `true` if it was present. An entry left with no primary keys
    /// is dropped.
    pub fn remove(&mut self, field_values: &[&[u8]], primary_key: &[u8]) -> bool {
        let key = Self::build_key(field_values);
        let Some(keys) = self.tree.get_mut(&key) else {
            return false;
        };
        let removed = keys.remove(primary_key);
        if keys.is_empty() {
            self.tree.remove(&key);
        }
        removed
    }

    /// Returns the primary keys stored under exactly the tuple `field_values`,
    /// in ascending byte order.
    pub fn lookup_eq(&self, field_values: &[&[u8]]) -> Vec<&[u8]> {
        let key = Self::build_key(field_values);
        self.tree
            .get(&key)
            .map(|keys| keys.iter().map(Vec::as_slice).collect())
            .unwrap_or_default()
    }

    /// Returns the primary keys of every entry whose leading values equal
    /// `prefix_values`. Results are ordered by composite key, then by primary key.
    ///
    /// Matching compares whole values: a prefix of `["us"]` matches
    /// `["us", ..]` but not `["us-east", ..]`. An empty prefix matches every
    /// entry.
    pub fn lookup_prefix(&self, prefix_values: &[&[u8]]) -> Vec<&[u8]> {
        use std::ops::Bound;

        let prefix = Self::build_key(prefix_values);
        // Keys sharing a byte prefix are contiguous in the tree, so scan from
        // the prefix and stop at the first key that no longer starts with it.
        self.tree
            .range::<[u8], _>((Bound::Included(prefix.as_slice()), Bound::Unbounded))
            .take_while(|(composite_key, _)| composite_key.starts_with(&prefix))
            .flat_map(|(_, primary_keys)| primary_keys.iter().map(Vec::as_slice))
            .collect()
    }

    /// Total number of `(tuple, primary_key)` pairs stored.
    pub fn entry_count(&self) -> usize {
        self.tree.values().map(BTreeSet::len).sum()
    }

    /// Removes every entry; field names and positions are kept.
    pub fn clear(&mut self) {
        self.tree.clear();
    }
}
/// All secondary indexes on a keyspace, kept in sync with puts and deletes,
/// plus counters for measuring index write amplification.
#[derive(Debug)]
pub struct KvIndexSet {
    /// Single-field indexes; `add_index` keeps field names unique.
    indexes: Vec<KvFieldIndex>,
    /// Composite indexes; `add_composite_index` keeps field lists unique.
    composite_indexes: Vec<KvCompositeIndex>,
    /// Number of `on_put` calls, including those made while no index exists.
    total_puts: u64,
    /// Number of index insert/remove operations issued by `on_put`/`on_delete`.
    total_index_writes: u64,
}

impl KvIndexSet {
    /// Creates a set with no indexes and zeroed counters.
    pub fn new() -> Self {
        Self {
            indexes: Vec::new(),
            composite_indexes: Vec::new(),
            total_puts: 0,
            total_index_writes: 0,
        }
    }

    /// Returns `true` when there are neither single-field nor composite indexes.
    pub fn is_empty(&self) -> bool {
        self.indexes.is_empty() && self.composite_indexes.is_empty()
    }

    /// Total number of single-field plus composite indexes.
    pub fn index_count(&self) -> usize {
        self.indexes.len() + self.composite_indexes.len()
    }

    /// Adds an empty single-field index on `field`.
    ///
    /// Returns `false`, changing nothing, if `field` is already indexed.
    /// Existing records are not backfilled here.
    pub fn add_index(&mut self, field: &str, field_position: usize) -> bool {
        if self.indexes.iter().any(|i| i.field == field) {
            return false;
        }
        self.indexes.push(KvFieldIndex::new(field, field_position));
        true
    }

    /// Removes and returns the single-field index on `field`, if any.
    pub fn remove_index(&mut self, field: &str) -> Option<KvFieldIndex> {
        let pos = self.indexes.iter().position(|i| i.field == field)?;
        Some(self.indexes.remove(pos))
    }

    /// Returns the single-field index on `field`, if any.
    pub fn get_index(&self, field: &str) -> Option<&KvFieldIndex> {
        self.indexes.iter().find(|i| i.field == field)
    }

    /// Adds an empty composite index over `fields`, in that order.
    ///
    /// Returns `false`, changing nothing, if an index over exactly the same
    /// ordered field list already exists.
    pub fn add_composite_index(
        &mut self,
        fields: Vec<String>,
        field_positions: Vec<usize>,
    ) -> bool {
        if self.composite_indexes.iter().any(|ci| ci.fields == fields) {
            return false;
        }
        self.composite_indexes
            .push(KvCompositeIndex::new(fields, field_positions));
        true
    }

    /// Removes and returns the composite index over exactly `fields`, if any.
    pub fn remove_composite_index(&mut self, fields: &[String]) -> Option<KvCompositeIndex> {
        let pos = self
            .composite_indexes
            .iter()
            .position(|ci| ci.fields == fields)?;
        Some(self.composite_indexes.remove(pos))
    }

    /// Returns the composite index over exactly `fields`, if any.
    pub fn get_composite_index(&self, fields: &[String]) -> Option<&KvCompositeIndex> {
        self.composite_indexes.iter().find(|ci| ci.fields == fields)
    }

    /// Returns the first registered composite index whose leading field is `field`.
    pub fn find_composite_with_prefix(&self, field: &str) -> Option<&KvCompositeIndex> {
        self.composite_indexes
            .iter()
            .find(|ci| ci.fields.first().is_some_and(|f| f == field))
    }

    /// Collects the values of `fields`, in order, from `(name, value)` pairs.
    ///
    /// Returns `None` if any field is missing. When a name appears more than
    /// once in `values`, its first occurrence is used.
    fn composite_values<'v>(
        fields: &[String],
        values: &[(&str, &'v [u8])],
    ) -> Option<Vec<&'v [u8]>> {
        fields
            .iter()
            .map(|f| {
                values
                    .iter()
                    .find(|(name, _)| *name == f.as_str())
                    .map(|&(_, v)| v)
            })
            .collect()
    }

    /// Updates every index for a put of `primary_key`.
    ///
    /// `field_values` are the record's new `(field, value)` pairs. When
    /// overwriting an existing record, pass its previous pairs as
    /// `old_field_values` so the stale entries are removed first. Fields
    /// without an index are ignored. A composite index is only touched when
    /// every one of its fields is present.
    ///
    /// Returns the number of index insert/remove operations issued. That
    /// number is also added to the write-amplification counter. Every call
    /// counts as a put, even when no index exists.
    pub fn on_put(
        &mut self,
        primary_key: &[u8],
        field_values: &[(&str, &[u8])],
        old_field_values: Option<&[(&str, &[u8])]>,
    ) -> usize {
        self.total_puts += 1;
        if self.is_empty() {
            return 0;
        }
        let mut writes = 0;
        // Unindex the previous values before indexing the new ones.
        if let Some(old_values) = old_field_values {
            for idx in &mut self.indexes {
                for &(field, value) in old_values {
                    if field == idx.field {
                        idx.remove(value, primary_key);
                        writes += 1;
                    }
                }
            }
        }
        for idx in &mut self.indexes {
            for &(field, value) in field_values {
                if field == idx.field {
                    idx.insert(value.to_vec(), primary_key.to_vec());
                    writes += 1;
                }
            }
        }
        for ci in &mut self.composite_indexes {
            if let Some(old_vals) =
                old_field_values.and_then(|old| Self::composite_values(&ci.fields, old))
            {
                ci.remove(&old_vals, primary_key);
                writes += 1;
            }
            if let Some(new_vals) = Self::composite_values(&ci.fields, field_values) {
                ci.insert(&new_vals, primary_key.to_vec());
                writes += 1;
            }
        }
        self.total_index_writes += writes as u64;
        writes
    }

    /// Removes `primary_key` from every index, using the record's current
    /// `field_values`.
    ///
    /// Each removal issued is added to the write counter. Deletes are not
    /// counted as puts.
    pub fn on_delete(&mut self, primary_key: &[u8], field_values: &[(&str, &[u8])]) {
        for idx in &mut self.indexes {
            for &(field, value) in field_values {
                if field == idx.field {
                    idx.remove(value, primary_key);
                    self.total_index_writes += 1;
                }
            }
        }
        for ci in &mut self.composite_indexes {
            if let Some(vals) = Self::composite_values(&ci.fields, field_values) {
                ci.remove(&vals, primary_key);
                self.total_index_writes += 1;
            }
        }
    }

    /// Average number of index writes per put, or `0.0` before the first put.
    ///
    /// Index writes issued by `on_delete` are included in the numerator.
    pub fn write_amp_ratio(&self) -> f64 {
        if self.total_puts == 0 {
            return 0.0;
        }
        self.total_index_writes as f64 / self.total_puts as f64
    }

    /// Equality lookup on the single-field index for `field`.
    ///
    /// Returns an empty vector if `field` has no single-field index.
    pub fn lookup_eq(&self, field: &str, value: &[u8]) -> Vec<&[u8]> {
        self.get_index(field)
            .map(|i| i.lookup_eq(value))
            .unwrap_or_default()
    }

    /// Half-open range lookup `[lower, upper)` on the single-field index for `field`.
    ///
    /// Returns an empty vector if `field` has no single-field index.
    pub fn lookup_range(
        &self,
        field: &str,
        lower: Option<&[u8]>,
        upper: Option<&[u8]>,
    ) -> Vec<(&[u8], &[u8])> {
        self.get_index(field)
            .map(|i| i.lookup_range(lower, upper))
            .unwrap_or_default()
    }

    /// Names of the fields with a single-field index, in registration order.
    pub fn indexed_fields(&self) -> impl Iterator<Item = &str> {
        self.indexes.iter().map(|i| i.field.as_str())
    }
}
impl Default for KvIndexSet {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the single-field, composite and aggregate index types.

    use super::*;

    /// Builds a `KvFieldIndex` pre-populated with `(value, primary_key)` rows.
    fn field_index_with(field: &str, position: usize, rows: &[(&[u8], &[u8])]) -> KvFieldIndex {
        let mut idx = KvFieldIndex::new(field, position);
        for &(value, pk) in rows {
            idx.insert(value.to_vec(), pk.to_vec());
        }
        idx
    }

    /// Builds the `(region, status)` composite index shared by several tests.
    fn region_status_index() -> KvCompositeIndex {
        let mut ci = KvCompositeIndex::new(vec!["region".into(), "status".into()], vec![0, 1]);
        ci.insert(&[b"us-east", b"active"], b"k1".to_vec());
        ci.insert(&[b"us-east", b"inactive"], b"k2".to_vec());
        ci.insert(&[b"eu-west", b"active"], b"k3".to_vec());
        ci
    }

    #[test]
    fn field_index_insert_and_lookup() {
        let idx = field_index_with(
            "region",
            2,
            &[
                (b"us-east", b"key1"),
                (b"us-east", b"key2"),
                (b"eu-west", b"key3"),
            ],
        );

        let us_east = idx.lookup_eq(b"us-east");
        assert_eq!(us_east.len(), 2);
        for expected in [b"key1", b"key2"] {
            assert!(us_east.contains(&expected.as_slice()));
        }

        assert_eq!(idx.lookup_eq(b"eu-west").len(), 1);
        assert!(idx.lookup_eq(b"ap-south").is_empty());
    }

    #[test]
    fn field_index_remove() {
        let mut idx = field_index_with("status", 1, &[(b"active", b"k1"), (b"active", b"k2")]);

        assert!(idx.remove(b"active", b"k1"));
        assert_eq!(idx.lookup_eq(b"active").len(), 1);

        assert!(idx.remove(b"active", b"k2"));
        assert!(idx.lookup_eq(b"active").is_empty());
        assert_eq!(idx.distinct_values(), 0);

        // Nothing is left under "active", so this removal must report failure.
        assert!(!idx.remove(b"active", b"k3"));
    }

    #[test]
    fn field_index_range_lookup() {
        let mut idx = KvFieldIndex::new("score", 0);
        (0u32..10).for_each(|i| idx.insert(i.to_be_bytes().to_vec(), format!("k{i}").into_bytes()));

        let (lo, hi) = (3u32.to_be_bytes(), 7u32.to_be_bytes());
        // The half-open range [3, 7) covers 3, 4, 5 and 6.
        assert_eq!(idx.lookup_range(Some(&lo[..]), Some(&hi[..])).len(), 4);
    }

    #[test]
    fn index_set_zero_index_fast_path() {
        let set = KvIndexSet::new();
        assert!(set.is_empty());
        assert_eq!(set.index_count(), 0);
    }

    #[test]
    fn index_set_add_and_remove() {
        let mut set = KvIndexSet::new();
        assert!(set.add_index("region", 2));
        // A second registration of the same field is rejected.
        assert!(!set.add_index("region", 2));
        assert_eq!(set.index_count(), 1);
        assert!(!set.is_empty());

        assert!(set.remove_index("region").is_some());
        assert!(set.is_empty());
        assert!(set.remove_index("region").is_none());
    }

    #[test]
    fn index_set_on_put_maintains_indexes() {
        let mut set = KvIndexSet::new();
        set.add_index("region", 2);
        set.add_index("status", 3);

        let row: [(&str, &[u8]); 2] = [("region", b"us-east"), ("status", b"active")];
        assert_eq!(set.on_put(b"key1", &row, None), 2);

        for (field, value) in row {
            assert_eq!(set.lookup_eq(field, value).len(), 1);
        }
    }

    #[test]
    fn index_set_on_put_update_replaces_old() {
        let mut set = KvIndexSet::new();
        set.add_index("status", 0);

        set.on_put(b"k1", &[("status", b"active")], None);
        assert_eq!(set.lookup_eq("status", b"active").len(), 1);

        // Overwrite k1, supplying the previous value so it gets unindexed.
        let old: [(&str, &[u8]); 1] = [("status", b"active")];
        set.on_put(b"k1", &[("status", b"inactive")], Some(&old[..]));
        assert!(set.lookup_eq("status", b"active").is_empty());
        assert_eq!(set.lookup_eq("status", b"inactive").len(), 1);
    }

    #[test]
    fn index_set_on_delete_cleans_up() {
        let mut set = KvIndexSet::new();
        set.add_index("region", 0);

        let row: [(&str, &[u8]); 1] = [("region", b"us")];
        for pk in [b"k1", b"k2"] {
            set.on_put(pk, &row, None);
        }
        assert_eq!(set.lookup_eq("region", b"us").len(), 2);

        set.on_delete(b"k1", &row);
        assert_eq!(set.lookup_eq("region", b"us").len(), 1);
    }

    #[test]
    fn write_amp_ratio() {
        let mut set = KvIndexSet::new();
        set.add_index("a", 0);
        set.add_index("b", 1);

        let row: [(&str, &[u8]); 2] = [("a", b"x"), ("b", b"y")];
        for key in (0..10).map(|i| format!("k{i}")) {
            set.on_put(key.as_bytes(), &row, None);
        }

        // Every put touches both single-field indexes exactly once.
        let ratio = set.write_amp_ratio();
        assert!((ratio - 2.0).abs() < f64::EPSILON);
    }

    #[test]
    fn unindexed_field_ignored() {
        let mut set = KvIndexSet::new();
        set.add_index("region", 0);
        let writes = set.on_put(b"k1", &[("name", b"alice")], None);
        assert_eq!(writes, 0, "fields without an index must not cause index writes");
    }

    #[test]
    fn composite_index_insert_and_exact_lookup() {
        let ci = region_status_index();
        assert_eq!(ci.lookup_eq(&[b"us-east", b"active"]), [b"k1".as_slice()]);
        assert_eq!(ci.lookup_eq(&[b"eu-west", b"active"]), [b"k3".as_slice()]);
        assert!(ci.lookup_eq(&[b"ap-south", b"active"]).is_empty());
    }

    #[test]
    fn composite_index_prefix_lookup() {
        let ci = region_status_index();
        assert_eq!(ci.lookup_prefix(&[b"us-east"]).len(), 2);
    }

    #[test]
    fn composite_index_remove() {
        let mut ci = KvCompositeIndex::new(vec!["a".into(), "b".into()], vec![0, 1]);
        let tuple: [&[u8]; 2] = [b"x", b"y"];

        ci.insert(&tuple, b"k1".to_vec());
        assert_eq!(ci.entry_count(), 1);

        assert!(ci.remove(&tuple, b"k1"));
        assert_eq!(ci.entry_count(), 0);
    }

    #[test]
    fn index_set_composite_on_put() {
        let fields: Vec<String> = vec!["region".into(), "status".into()];
        let mut set = KvIndexSet::new();
        set.add_composite_index(fields.clone(), vec![0, 1]);

        let writes = set.on_put(b"k1", &[("region", b"us"), ("status", b"active")], None);
        assert!(writes > 0);

        let ci = set
            .get_composite_index(&fields)
            .expect("composite index was just added");
        assert_eq!(ci.lookup_eq(&[b"us", b"active"]).len(), 1);
    }

    #[test]
    fn index_set_composite_on_delete() {
        let fields: Vec<String> = vec!["a".into(), "b".into()];
        let row: [(&str, &[u8]); 2] = [("a", b"x"), ("b", b"y")];
        let mut set = KvIndexSet::new();
        set.add_composite_index(fields.clone(), vec![0, 1]);

        set.on_put(b"k1", &row, None);
        set.on_delete(b"k1", &row);

        let ci = set
            .get_composite_index(&fields)
            .expect("composite index was just added");
        assert!(ci.lookup_eq(&[b"x", b"y"]).is_empty());
    }
}