use std::sync::Arc;
use dashmap::DashMap;
use parking_lot::RwLock;
use crate::key_buffer::ArenaKeyHandle;
/// Indexing strategy for a table.
///
/// Each variant reports its nominal costs through [`IndexPolicy::write_cost`]
/// and [`IndexPolicy::scan_cost`]. The default is [`IndexPolicy::Balanced`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexPolicy {
    /// Nominal O(1) writes, O(N) scans.
    WriteOptimized,
    /// Nominal amortized O(1) writes, O(output + log K) scans; the default policy.
    Balanced,
    /// Nominal O(log N) writes, O(log N + K) scans; the only policy with an ordered index.
    ScanOptimized,
    /// Nominal O(1) writes, O(N) scans.
    AppendOnly,
}

impl Default for IndexPolicy {
    /// Returns [`IndexPolicy::Balanced`].
    fn default() -> Self {
        IndexPolicy::Balanced
    }
}

/// Error returned by `str::parse::<IndexPolicy>()` for an unrecognized policy name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseIndexPolicyError {
    /// The rejected input, verbatim.
    input: String,
}

impl std::fmt::Display for ParseIndexPolicyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "unknown index policy: {:?}", self.input)
    }
}

impl std::error::Error for ParseIndexPolicyError {}

impl std::str::FromStr for IndexPolicy {
    type Err = ParseIndexPolicyError;

    /// Parses a policy name using the same case-insensitive aliases as
    /// [`IndexPolicy::from_str`].
    ///
    /// # Errors
    /// Returns [`ParseIndexPolicyError`] when `s` is not a known alias.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::parse_name(s).ok_or_else(|| ParseIndexPolicyError {
            input: s.to_owned(),
        })
    }
}

impl IndexPolicy {
    /// Parses a policy name, returning `None` if it is not recognized.
    ///
    /// Matching is case-insensitive. Accepted aliases:
    /// - `write_optimized`, `write-optimized`, `write`
    /// - `balanced`, `default`
    /// - `scan_optimized`, `scan-optimized`, `scan`
    /// - `append_only`, `append-only`, `append`
    ///
    /// Prefer `s.parse::<IndexPolicy>()` (via [`std::str::FromStr`]) in new code.
    // Kept as an inherent `Option`-returning method for existing callers;
    // the `FromStr` impl above provides the idiomatic `Result` form.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        Self::parse_name(s)
    }

    /// Shared alias table used by both parsing entry points.
    fn parse_name(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "write_optimized" | "write-optimized" | "write" => Some(IndexPolicy::WriteOptimized),
            "balanced" | "default" => Some(IndexPolicy::Balanced),
            "scan_optimized" | "scan-optimized" | "scan" => Some(IndexPolicy::ScanOptimized),
            "append_only" | "append-only" | "append" => Some(IndexPolicy::AppendOnly),
            _ => None,
        }
    }

    /// Nominal write cost of this policy, as a human-readable big-O string.
    pub fn write_cost(&self) -> &'static str {
        match self {
            IndexPolicy::WriteOptimized => "O(1)",
            IndexPolicy::Balanced => "O(1) amortized",
            IndexPolicy::ScanOptimized => "O(log N)",
            IndexPolicy::AppendOnly => "O(1)",
        }
    }

    /// Nominal scan cost of this policy, as a human-readable big-O string.
    pub fn scan_cost(&self) -> &'static str {
        match self {
            IndexPolicy::WriteOptimized => "O(N)",
            IndexPolicy::Balanced => "O(output + log K)",
            IndexPolicy::ScanOptimized => "O(log N + K)",
            IndexPolicy::AppendOnly => "O(N)",
        }
    }

    /// `true` only for [`IndexPolicy::ScanOptimized`].
    pub fn has_ordered_index(&self) -> bool {
        matches!(self, IndexPolicy::ScanOptimized)
    }

    /// `true` for [`IndexPolicy::ScanOptimized`] and [`IndexPolicy::Balanced`].
    pub fn supports_efficient_scans(&self) -> bool {
        matches!(self, IndexPolicy::ScanOptimized | IndexPolicy::Balanced)
    }
}
/// Index settings for a single table.
///
/// Create with [`TableIndexConfig::new`] and adjust with the `with_*` builder
/// methods, each of which consumes the config and returns the updated one.
#[derive(Debug, Clone)]
pub struct TableIndexConfig {
    /// Table these settings belong to.
    pub table_name: String,
    /// Indexing strategy for the table.
    pub policy: IndexPolicy,
    /// Sorted-run count threshold (default 4).
    pub max_sorted_runs: usize,
    /// Target size of a sorted run (default 16 MiB).
    pub target_run_size: usize,
    /// Bloom-filter toggle (default `true`).
    pub enable_bloom_filter: bool,
}

impl TableIndexConfig {
    /// Default value of `max_sorted_runs`.
    const DEFAULT_MAX_SORTED_RUNS: usize = 4;
    /// Default value of `target_run_size`: 16 MiB.
    const DEFAULT_TARGET_RUN_SIZE: usize = 16 << 20;

    /// Creates a config for `table_name` using `policy` and default tuning values.
    pub fn new(table_name: impl Into<String>, policy: IndexPolicy) -> Self {
        let table_name: String = table_name.into();
        Self {
            table_name,
            policy,
            max_sorted_runs: Self::DEFAULT_MAX_SORTED_RUNS,
            target_run_size: Self::DEFAULT_TARGET_RUN_SIZE,
            enable_bloom_filter: true,
        }
    }

    /// Returns this config with `max_sorted_runs` set to `max`.
    pub fn with_max_sorted_runs(self, max: usize) -> Self {
        Self {
            max_sorted_runs: max,
            ..self
        }
    }

    /// Returns this config with `target_run_size` set to `size`.
    pub fn with_target_run_size(self, size: usize) -> Self {
        Self {
            target_run_size: size,
            ..self
        }
    }

    /// Returns this config with `enable_bloom_filter` set to `enable`.
    pub fn with_bloom_filter(self, enable: bool) -> Self {
        Self {
            enable_bloom_filter: enable,
            ..self
        }
    }
}
/// Registry of per-table index configurations with a fallback default policy.
///
/// All methods take `&self`. Explicit configs live in a `DashMap` keyed by
/// table name, and the fallback policy sits behind an `RwLock`.
pub struct TableIndexRegistry {
    /// Explicitly configured tables, keyed by `TableIndexConfig::table_name`.
    configs: DashMap<String, TableIndexConfig>,
    /// Policy reported for tables without an explicit config.
    default_policy: RwLock<IndexPolicy>,
}

impl TableIndexRegistry {
    /// Creates an empty registry whose default policy is `IndexPolicy::Balanced`.
    pub fn new() -> Self {
        Self::with_default_policy(IndexPolicy::Balanced)
    }

    /// Creates an empty registry whose default policy is `policy`.
    pub fn with_default_policy(policy: IndexPolicy) -> Self {
        Self {
            configs: DashMap::new(),
            default_policy: RwLock::new(policy),
        }
    }

    /// Replaces the fallback policy used for unconfigured tables.
    pub fn set_default_policy(&self, policy: IndexPolicy) {
        let mut guard = self.default_policy.write();
        *guard = policy;
    }

    /// Current fallback policy for unconfigured tables.
    pub fn default_policy(&self) -> IndexPolicy {
        let guard = self.default_policy.read();
        *guard
    }

    /// Stores `config` under its `table_name`, replacing any previous entry.
    pub fn configure_table(&self, config: TableIndexConfig) {
        let key = config.table_name.clone();
        self.configs.insert(key, config);
    }

    /// Policy for `table_name`: its explicit config's policy, else the default.
    pub fn get_policy(&self, table_name: &str) -> IndexPolicy {
        match self.configs.get(table_name) {
            Some(entry) => entry.policy,
            None => self.default_policy(),
        }
    }

    /// A copy of the config for `table_name`.
    ///
    /// If the table has no explicit config, returns a freshly built default
    /// config for it (not stored in the registry).
    pub fn get_config(&self, table_name: &str) -> TableIndexConfig {
        if let Some(entry) = self.configs.get(table_name) {
            return entry.value().clone();
        }
        TableIndexConfig::new(table_name, self.default_policy())
    }

    /// Whether `table_name` has an explicitly stored config.
    pub fn has_explicit_config(&self, table_name: &str) -> bool {
        self.configs.contains_key(table_name)
    }

    /// Removes and returns the explicit config for `table_name`, if any.
    pub fn remove_config(&self, table_name: &str) -> Option<TableIndexConfig> {
        let (_, config) = self.configs.remove(table_name)?;
        Some(config)
    }

    /// Names of all explicitly configured tables, in unspecified order.
    pub fn configured_tables(&self) -> Vec<String> {
        self.configs
            .iter()
            .map(|entry| entry.key().to_owned())
            .collect()
    }
}

impl Default for TableIndexRegistry {
    /// Same as [`TableIndexRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// An immutable run of `(key, value)` pairs kept in key order, with cached
/// min/max keys so callers can cheaply skip runs that cannot match.
#[derive(Debug)]
pub struct SortedRun<K, V> {
    /// Entries in non-decreasing key order.
    entries: Vec<(K, V)>,
    /// Smallest key, or `None` for an empty run.
    min_key: Option<K>,
    /// Largest key, or `None` for an empty run.
    max_key: Option<K>,
    /// Shallow byte size of the entries: `len * size_of::<(K, V)>()`.
    /// Heap memory owned by keys or values is not counted.
    size_bytes: usize,
    // Recorded at construction; not read anywhere in this module.
    #[allow(dead_code)]
    created_at: std::time::Instant,
    /// Caller-supplied level tag.
    level: usize,
}

impl<K: Ord + Clone, V: Clone> SortedRun<K, V> {
    /// Builds a run from entries in arbitrary order by sorting them by key.
    ///
    /// The sort is stable, so entries with equal keys keep their input order.
    pub fn from_unsorted(mut entries: Vec<(K, V)>, level: usize) -> Self {
        entries.sort_by(|a, b| a.0.cmp(&b.0));
        Self::build(entries, level)
    }

    /// Builds a run from entries that are already sorted by key.
    ///
    /// # Panics
    /// In debug builds, panics if `entries` is not in non-decreasing key order.
    pub fn from_sorted(entries: Vec<(K, V)>, level: usize) -> Self {
        debug_assert!(
            entries.windows(2).all(|w| w[0].0 <= w[1].0),
            "SortedRun::from_sorted requires entries sorted by key"
        );
        Self::build(entries, level)
    }

    /// Shared constructor: derives the cached metadata from sorted `entries`.
    fn build(entries: Vec<(K, V)>, level: usize) -> Self {
        // Size of the element storage, not of the `Vec` header.
        let size_bytes = std::mem::size_of_val(entries.as_slice());
        let min_key = entries.first().map(|(k, _)| k.clone());
        let max_key = entries.last().map(|(k, _)| k.clone());
        Self {
            entries,
            min_key,
            max_key,
            size_bytes,
            created_at: std::time::Instant::now(),
            level,
        }
    }

    /// Index of the first entry whose key is `>= key` (or `len()` if none).
    fn lower_bound(&self, key: &K) -> usize {
        self.entries.partition_point(|(k, _)| k < key)
    }

    /// Number of entries in the run.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// `true` if the run holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Shallow byte size of the entries (see the `size_bytes` field).
    pub fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    /// Level tag supplied at construction.
    pub fn level(&self) -> usize {
        self.level
    }

    /// Binary-searches for `key`, returning its value if present.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.entries
            .binary_search_by(|(k, _)| k.cmp(key))
            .ok()
            .map(|idx| &self.entries[idx].1)
    }

    /// Iterates entries with key `>= start`, in key order.
    pub fn range_from<'a>(&'a self, start: &K) -> impl Iterator<Item = &'a (K, V)> {
        let start_idx = self.lower_bound(start);
        self.entries[start_idx..].iter()
    }

    /// Iterates entries in the half-open key range `[start, end)`, in key order.
    ///
    /// An inverted range (`start > end`) yields nothing.
    pub fn range<'a>(&'a self, start: &K, end: &K) -> impl Iterator<Item = &'a (K, V)> {
        let start_idx = self.lower_bound(start);
        // Clamp so an inverted range produces an empty slice instead of panicking.
        let end_idx = self.lower_bound(end).max(start_idx);
        self.entries[start_idx..end_idx].iter()
    }

    /// Iterates all entries in key order.
    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
        self.entries.iter()
    }

    /// All entries as a key-ordered slice.
    #[inline]
    pub fn entries(&self) -> &[(K, V)] {
        &self.entries
    }

    /// Conservative pruning check for a prefix (or point) lookup.
    ///
    /// Returns `false` only when the run is non-empty and its largest key is
    /// below `prefix`. Assuming keys order lexicographically, no key in such a
    /// run can start with `prefix`. Empty runs report `true`.
    #[inline]
    pub fn overlaps_prefix(&self, prefix: &K) -> bool {
        match &self.max_key {
            Some(max) if max < prefix => false,
            _ => true,
        }
    }

    /// Conservative overlap check against the half-open range `[start, end)`.
    ///
    /// Returns `false` when the smallest key is `>= end` or the largest key is
    /// `< start`. Otherwise, including for empty runs, returns `true`.
    #[inline]
    pub fn overlaps_range(&self, start: &K, end: &K) -> bool {
        match (&self.min_key, &self.max_key) {
            (Some(min), _) if min >= end => false,
            (_, Some(max)) if max < start => false,
            _ => true,
        }
    }

    /// Smallest key, or `None` for an empty run.
    #[inline]
    pub fn min_key(&self) -> Option<&K> {
        self.min_key.as_ref()
    }

    /// Largest key, or `None` for an empty run.
    #[inline]
    pub fn max_key(&self) -> Option<&K> {
        self.max_key.as_ref()
    }
}
pub struct BalancedTableIndex<V: Clone + Send + Sync + Eq + 'static> {
memtable: DashMap<ArenaKeyHandle, V>,
sorted_runs: RwLock<Vec<Arc<SortedRun<ArenaKeyHandle, V>>>>,
config: TableIndexConfig,
memtable_size: std::sync::atomic::AtomicUsize,
}
impl<V: Clone + Send + Sync + Eq + 'static> BalancedTableIndex<V> {
pub fn new(config: TableIndexConfig) -> Self {
Self {
memtable: DashMap::new(),
sorted_runs: RwLock::new(Vec::new()),
config,
memtable_size: std::sync::atomic::AtomicUsize::new(0),
}
}
pub fn insert(&self, key: ArenaKeyHandle, value: V) {
let key_size = key.len();
let value_size = std::mem::size_of::<V>();
self.memtable.insert(key, value);
self.memtable_size.fetch_add(key_size + value_size, std::sync::atomic::Ordering::Relaxed);
}
pub fn get(&self, key: &ArenaKeyHandle) -> Option<V> {
if let Some(v) = self.memtable.get(key) {
return Some(v.clone());
}
let runs = self.sorted_runs.read();
for run in runs.iter().rev() {
if run.overlaps_prefix(key) {
if let Some(v) = run.get(key) {
return Some(v.clone());
}
}
}
None
}
pub fn scan_prefix(&self, prefix: &ArenaKeyHandle) -> Vec<(ArenaKeyHandle, V)> {
use std::collections::BinaryHeap;
use std::cmp::Reverse;
#[derive(Eq, PartialEq)]
struct PrefixHeapEntry<V: Clone> {
key: ArenaKeyHandle,
value: V,
source_idx: usize, }
impl<V: Clone + Eq + PartialEq> Ord for PrefixHeapEntry<V> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match self.key.cmp(&other.key) {
std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
other => other,
}
}
}
impl<V: Clone + Eq + PartialEq> PartialOrd for PrefixHeapEntry<V> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
let mut heap: BinaryHeap<Reverse<PrefixHeapEntry<V>>> = BinaryHeap::new();
for entry in self.memtable.iter() {
let key = entry.key();
if key.as_bytes().starts_with(prefix.as_bytes()) {
heap.push(Reverse(PrefixHeapEntry {
key: key.clone(),
value: entry.value().clone(),
source_idx: 0,
}));
}
}
let runs = self.sorted_runs.read();
for (run_idx, run) in runs.iter().enumerate() {
if !run.overlaps_prefix(prefix) {
continue; }
for (key, value) in run.range_from(prefix) {
if !key.as_bytes().starts_with(prefix.as_bytes()) {
break; }
heap.push(Reverse(PrefixHeapEntry {
key: key.clone(),
value: value.clone(),
source_idx: run_idx + 1, }));
}
}
let mut result = Vec::with_capacity(heap.len());
let mut last_key: Option<ArenaKeyHandle> = None;
while let Some(Reverse(entry)) = heap.pop() {
let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
if is_new_key {
last_key = Some(entry.key.clone());
result.push((entry.key, entry.value));
}
}
result
}
pub fn needs_compaction(&self) -> bool {
let memtable_size = self.memtable_size.load(std::sync::atomic::Ordering::Relaxed);
let runs = self.sorted_runs.read();
memtable_size >= self.config.target_run_size
|| runs.len() >= self.config.max_sorted_runs
}
pub fn compact_memtable(&self) {
let entries: Vec<_> = self.memtable.iter()
.map(|e| (e.key().clone(), e.value().clone()))
.collect();
if entries.is_empty() {
return;
}
self.memtable.clear();
self.memtable_size.store(0, std::sync::atomic::Ordering::Relaxed);
let run = Arc::new(SortedRun::from_unsorted(entries, 0));
let mut runs = self.sorted_runs.write();
runs.push(run);
}
pub fn merge_runs(&self, levels_to_merge: usize) {
let mut runs = self.sorted_runs.write();
if runs.len() < levels_to_merge {
return;
}
let to_merge: Vec<_> = runs.drain(..levels_to_merge).collect();
let merged = self.k_way_merge(&to_merge);
let new_run = Arc::new(SortedRun::from_sorted(merged, to_merge.len()));
runs.insert(0, new_run);
}
fn k_way_merge(&self, runs: &[Arc<SortedRun<ArenaKeyHandle, V>>]) -> Vec<(ArenaKeyHandle, V)> {
use std::collections::BinaryHeap;
use std::cmp::Reverse;
#[derive(Eq, PartialEq)]
struct HeapEntry<V: Clone> {
key: ArenaKeyHandle,
value: V,
run_idx: usize,
entry_idx: usize,
}
impl<V: Clone + Eq + PartialEq> Ord for HeapEntry<V> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match self.key.cmp(&other.key) {
std::cmp::Ordering::Equal => self.run_idx.cmp(&other.run_idx),
other => other,
}
}
}
impl<V: Clone + Eq + PartialEq> PartialOrd for HeapEntry<V> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
let mut heap: BinaryHeap<Reverse<HeapEntry<V>>> = BinaryHeap::new();
let mut run_positions: Vec<usize> = vec![0; runs.len()];
for (run_idx, run) in runs.iter().enumerate() {
let entries = run.entries();
if !entries.is_empty() {
let (key, value) = &entries[0]; heap.push(Reverse(HeapEntry {
key: key.clone(),
value: value.clone(),
run_idx,
entry_idx: 0,
}));
}
}
let estimated_size: usize = runs.iter().map(|r| r.len()).sum();
let mut result = Vec::with_capacity(estimated_size);
let mut last_key: Option<ArenaKeyHandle> = None;
while let Some(Reverse(entry)) = heap.pop() {
let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
if is_new_key {
last_key = Some(entry.key.clone());
result.push((entry.key.clone(), entry.value));
}
run_positions[entry.run_idx] += 1;
let next_idx = run_positions[entry.run_idx];
let run_entries = runs[entry.run_idx].entries();
if next_idx < run_entries.len() {
let (key, value) = &run_entries[next_idx]; heap.push(Reverse(HeapEntry {
key: key.clone(),
value: value.clone(),
run_idx: entry.run_idx,
entry_idx: next_idx,
}));
}
}
result
}
pub fn config(&self) -> &TableIndexConfig {
&self.config
}
pub fn memtable_size(&self) -> usize {
self.memtable_size.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn run_count(&self) -> usize {
self.sorted_runs.read().len()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_index_policy_from_str() {
        assert_eq!(IndexPolicy::from_str("write_optimized"), Some(IndexPolicy::WriteOptimized));
        assert_eq!(IndexPolicy::from_str("balanced"), Some(IndexPolicy::Balanced));
        assert_eq!(IndexPolicy::from_str("scan-optimized"), Some(IndexPolicy::ScanOptimized));
        assert_eq!(IndexPolicy::from_str("append_only"), Some(IndexPolicy::AppendOnly));
        assert_eq!(IndexPolicy::from_str("invalid"), None);
    }

    #[test]
    fn test_registry_default_policy() {
        let registry = TableIndexRegistry::new();
        assert_eq!(registry.get_policy("unknown"), IndexPolicy::Balanced);
        registry.configure_table(TableIndexConfig::new("users", IndexPolicy::WriteOptimized));
        assert_eq!(registry.get_policy("users"), IndexPolicy::WriteOptimized);
        assert_eq!(registry.get_policy("orders"), IndexPolicy::Balanced);
    }

    #[test]
    fn test_registry_change_default() {
        let registry = TableIndexRegistry::new();
        registry.set_default_policy(IndexPolicy::ScanOptimized);
        assert_eq!(registry.get_policy("any_table"), IndexPolicy::ScanOptimized);
    }

    #[test]
    fn test_sorted_run() {
        let entries = vec![
            (ArenaKeyHandle::new(b"c"), 3),
            (ArenaKeyHandle::new(b"a"), 1),
            (ArenaKeyHandle::new(b"b"), 2),
        ];
        let run = SortedRun::from_unsorted(entries, 0);
        assert_eq!(run.len(), 3);
        assert_eq!(run.get(&ArenaKeyHandle::new(b"a")), Some(&1));
        assert_eq!(run.get(&ArenaKeyHandle::new(b"b")), Some(&2));
        assert_eq!(run.get(&ArenaKeyHandle::new(b"c")), Some(&3));
        assert_eq!(run.get(&ArenaKeyHandle::new(b"d")), None);
    }

    #[test]
    fn test_balanced_table_index() {
        let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
        index.insert(ArenaKeyHandle::new(b"key1"), 1);
        index.insert(ArenaKeyHandle::new(b"key2"), 2);
        assert_eq!(index.get(&ArenaKeyHandle::new(b"key1")), Some(1));
        assert_eq!(index.get(&ArenaKeyHandle::new(b"key2")), Some(2));
        assert_eq!(index.get(&ArenaKeyHandle::new(b"key3")), None);
    }

    #[test]
    fn test_balanced_compaction() {
        let config = TableIndexConfig::new("test", IndexPolicy::Balanced)
            .with_target_run_size(100);
        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
        for i in 0..10 {
            let key = format!("key{:03}", i);
            index.insert(ArenaKeyHandle::new(key.as_bytes()), i as i32);
        }
        index.compact_memtable();
        assert_eq!(index.run_count(), 1);
        assert_eq!(index.memtable_size(), 0);
        assert_eq!(index.get(&ArenaKeyHandle::new(b"key005")), Some(5));
    }

    #[test]
    fn test_k_way_merge_interleaved_runs() {
        // Checks merge output rather than wall-clock ratios: at these sizes the
        // merge takes microseconds, so timing ratios are dominated by scheduler
        // and allocator noise (especially in debug builds) and were flaky.
        for size in [100usize, 500, 1000] {
            let runs: Vec<Arc<SortedRun<ArenaKeyHandle, i32>>> = (0..5)
                .map(|run_id| {
                    let entries: Vec<(ArenaKeyHandle, i32)> = (0..size)
                        .map(|i| {
                            // Run `run_id` holds every 5th id starting at `run_id`, so the
                            // runs interleave and together cover 0..size*5 exactly once.
                            let id = i * 5 + run_id;
                            let key = format!("key_{:08}_{}", id, run_id);
                            (ArenaKeyHandle::new(key.as_bytes()), id as i32)
                        })
                        .collect();
                    Arc::new(SortedRun::from_sorted(entries, run_id))
                })
                .collect();
            let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
            let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
            let merged = index.k_way_merge(&runs);
            assert_eq!(merged.len(), size * 5, "Merge should produce all unique entries");
            assert!(
                merged.windows(2).all(|w| w[0].0 < w[1].0),
                "Merged keys should be strictly increasing"
            );
            for (pos, (_, value)) in merged.iter().enumerate() {
                assert_eq!(*value, pos as i32, "Merged entries should follow global id order");
            }
        }
    }

    #[test]
    fn test_sorted_run_metadata_pruning() {
        let entries = vec![
            (ArenaKeyHandle::new(b"apple"), 1),
            (ArenaKeyHandle::new(b"banana"), 2),
            (ArenaKeyHandle::new(b"cherry"), 3),
        ];
        let run = SortedRun::from_sorted(entries, 0);
        assert_eq!(run.min_key().map(|k| k.as_bytes()), Some(b"apple".as_slice()));
        assert_eq!(run.max_key().map(|k| k.as_bytes()), Some(b"cherry".as_slice()));
        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"banana")));
        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"apple")));
        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"cherry")));
        assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"date")));
        assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"zebra")));
        assert!(run.overlaps_range(
            &ArenaKeyHandle::new(b"banana"),
            &ArenaKeyHandle::new(b"cherry")
        ));
        assert!(!run.overlaps_range(
            &ArenaKeyHandle::new(b"date"),
            &ArenaKeyHandle::new(b"fig")
        ));
        assert!(!run.overlaps_range(
            &ArenaKeyHandle::new(b"aaa"),
            &ArenaKeyHandle::new(b"aab")
        ));
    }

    #[test]
    fn test_scan_prefix() {
        let config = TableIndexConfig::new("test", IndexPolicy::Balanced)
            .with_target_run_size(50);
        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
        let prefixes = ["user:1:", "user:2:", "order:1:", "order:2:"];
        for (i, prefix) in prefixes.iter().enumerate() {
            for j in 0..5 {
                let key = format!("{}{}", prefix, j);
                index.insert(ArenaKeyHandle::new(key.as_bytes()), (i * 10 + j) as i32);
            }
        }
        index.compact_memtable();
        index.insert(ArenaKeyHandle::new(b"user:1:99"), 199);
        index.insert(ArenaKeyHandle::new(b"order:1:99"), 299);
        let results = index.scan_prefix(&ArenaKeyHandle::new(b"user:1:"));
        assert_eq!(results.len(), 6);
        for (key, _value) in &results {
            assert!(key.as_bytes().starts_with(b"user:1:"),
                "Key {:?} should start with user:1:", String::from_utf8_lossy(key.as_bytes()));
        }
        for window in results.windows(2) {
            assert!(window[0].0 <= window[1].0, "Results should be sorted by key");
        }
        let results = index.scan_prefix(&ArenaKeyHandle::new(b"order:"));
        assert_eq!(results.len(), 11);
    }

    #[test]
    fn test_empty_sorted_run_metadata() {
        let entries: Vec<(ArenaKeyHandle, i32)> = vec![];
        let run = SortedRun::from_sorted(entries, 0);
        assert!(run.min_key().is_none());
        assert!(run.max_key().is_none());
        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"anything")));
        assert!(run.overlaps_range(
            &ArenaKeyHandle::new(b"a"),
            &ArenaKeyHandle::new(b"z")
        ));
    }
}