Skip to main content

sochdb_storage/
index_policy.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Per-Table Index Policy + Scan-Optimized Structure
19//!
20//! This module replaces the global `enable_ordered_index` toggle with
21//! a per-table index policy, allowing fine-grained control over write
22//! throughput vs scan performance trade-offs.
23//!
24//! ## Problem: Global Toggle is Too Coarse
25//!
26//! The current `DatabaseConfig::enable_ordered_index` is a single global toggle.
27//! This forces a choice:
28//! - Enabled: Pay ~134 ns/op on ALL writes for O(log N) scans
29//! - Disabled: Fast writes but O(N) scans on ALL tables
30//!
31//! Real workloads have mixed access patterns:
//! - Write-heavy log tables → don't need an ordered index
33//! - Scan-heavy analytics tables → need ordered index
34//! - OLTP tables → need balanced policy
35//!
36//! ## Solution: Per-Table Index Policy
37//!
38//! ```text
39//! ┌─────────────────────────────────────────────────────────────────┐
40//! │                      Table Index Registry                        │
41//! ├─────────────────────────────────────────────────────────────────┤
42//! │                                                                  │
43//! │  "users"    → WriteOptimized (no ordered index)                 │
44//! │  "orders"   → Balanced (lazy compaction to sorted runs)          │
45//! │  "analytics"→ ScanOptimized (maintain ordered index)             │
46//! │  "logs"     → AppendOnly (no index, time-ordered writes)         │
47//! │                                                                  │
48//! └─────────────────────────────────────────────────────────────────┘
49//! ```
50//!
51//! ## Index Policies
52//!
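//! | Policy         | Insert Cost    | Scan Cost            | Use Case               |
//! |----------------|----------------|----------------------|------------------------|
//! | WriteOptimized | O(1)           | O(N)                 | High-write, rare scan  |
//! | Balanced       | O(1) amortized | O(K log N + M log K) | Mixed OLTP             |
//! | ScanOptimized  | O(log N)       | O(log N + M)         | Analytics, range query |
//! | AppendOnly     | O(1)           | O(N)                 | Time-series logs       |
//!
//! Here `N` is the number of entries in the table, `K` the number of sorted
//! runs, and `M` the number of entries a scan returns. Balanced scans also
//! make one pass over the unsorted memtable.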
59//!
60//! ## LSM-Style Balanced Policy
61//!
62//! For `Balanced` tables, we use an LSM-style approach:
63//! - Unsorted append-friendly memtable (O(1) inserts)
64//! - Periodic compaction to sorted runs
//! - K-way merge of the runs for range scans (O(M log K) for M results, after an O(log N) seek into each of the K runs)
66//!
67//! This retains range-scan capability without paying O(log N) on every write.
68
69use std::sync::Arc;
70
71use dashmap::DashMap;
72use parking_lot::RwLock;
73
74use crate::key_buffer::ArenaKeyHandle;
75
76// ============================================================================
77// IndexPolicy - Per-Table Index Configuration
78// ============================================================================
79
/// Index policy for a table.
///
/// Chooses the trade-off between per-write cost and range-scan cost for one
/// table. In the cost notes below, `N` is the number of entries in the table,
/// `K` the number of sorted runs (Balanced only) and `M` the number of
/// entries a scan returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexPolicy {
    /// No ordered index - fastest writes, O(N) scans.
    /// Use for write-heavy tables that rarely need range scans.
    WriteOptimized,

    /// LSM-style: unsorted memtable + periodic sorted runs.
    /// Amortized O(1) inserts. A scan seeks into each of the `K` sorted runs
    /// and k-way merges them, roughly O(K log N + M log K), plus a pass over
    /// the unsorted memtable.
    /// Good balance for mixed OLTP workloads.
    Balanced,

    /// Maintain an ordered index on every write.
    /// O(log N) inserts, O(log N + M) scans.
    /// Use for analytics tables with frequent range queries.
    ScanOptimized,

    /// Append-only with no indexing.
    /// O(1) inserts, O(N) scans (but efficient forward iteration).
    /// Use for time-series logs where data is naturally ordered.
    AppendOnly,
}

impl Default for IndexPolicy {
    /// `Balanced` is the default policy.
    fn default() -> Self {
        IndexPolicy::Balanced
    }
}

impl IndexPolicy {
    /// Parse a policy name, case-insensitively.
    ///
    /// Accepted spellings:
    /// - `write_optimized` / `write-optimized` / `write`
    /// - `balanced` / `default`
    /// - `scan_optimized` / `scan-optimized` / `scan`
    /// - `append_only` / `append-only` / `append`
    ///
    /// Returns `None` for anything else. New code can use
    /// `s.parse::<IndexPolicy>()` instead (see the `FromStr` impl below).
    /// This inherent method is kept so existing callers that expect an
    /// `Option` keep working.
    #[allow(clippy::should_implement_trait)] // kept for backward compatibility; FromStr is also implemented
    pub fn from_str(s: &str) -> Option<Self> {
        Self::parse_name(s)
    }

    /// Single name table shared by the inherent `from_str` and the `FromStr` impl.
    fn parse_name(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "write_optimized" | "write-optimized" | "write" => Some(IndexPolicy::WriteOptimized),
            "balanced" | "default" => Some(IndexPolicy::Balanced),
            "scan_optimized" | "scan-optimized" | "scan" => Some(IndexPolicy::ScanOptimized),
            "append_only" | "append-only" | "append" => Some(IndexPolicy::AppendOnly),
            _ => None,
        }
    }

    /// Short, informal description of the per-write cost (for display).
    pub fn write_cost(&self) -> &'static str {
        match self {
            IndexPolicy::WriteOptimized => "O(1)",
            IndexPolicy::Balanced => "O(1) amortized",
            IndexPolicy::ScanOptimized => "O(log N)",
            IndexPolicy::AppendOnly => "O(1)",
        }
    }

    /// Short, informal description of the scan cost (for display).
    ///
    /// The strings are intentionally brief; the variant docs give the fuller bound.
    pub fn scan_cost(&self) -> &'static str {
        match self {
            IndexPolicy::WriteOptimized => "O(N)",
            IndexPolicy::Balanced => "O(output + log K)",
            IndexPolicy::ScanOptimized => "O(log N + K)",
            IndexPolicy::AppendOnly => "O(N)",
        }
    }

    /// Whether this policy maintains an ordered index (only `ScanOptimized`).
    pub fn has_ordered_index(&self) -> bool {
        matches!(self, IndexPolicy::ScanOptimized)
    }

    /// Whether this policy supports efficient range scans
    /// (`ScanOptimized` and `Balanced`).
    pub fn supports_efficient_scans(&self) -> bool {
        matches!(self, IndexPolicy::ScanOptimized | IndexPolicy::Balanced)
    }
}

/// Error returned when a string does not name a known [`IndexPolicy`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseIndexPolicyError {
    input: String,
}

impl ParseIndexPolicyError {
    /// The string that failed to parse.
    pub fn input(&self) -> &str {
        &self.input
    }
}

impl std::fmt::Display for ParseIndexPolicyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "unknown index policy: {:?}", self.input)
    }
}

impl std::error::Error for ParseIndexPolicyError {}

impl std::str::FromStr for IndexPolicy {
    type Err = ParseIndexPolicyError;

    /// Parse a policy name; accepts the same spellings as the inherent `from_str`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::parse_name(s).ok_or_else(|| ParseIndexPolicyError {
            input: s.to_owned(),
        })
    }
}
151
152// ============================================================================
153// TableIndexConfig - Configuration for a Single Table
154// ============================================================================
155
156/// Index configuration for a single table
157#[derive(Debug, Clone)]
158pub struct TableIndexConfig {
159    /// Table name
160    pub table_name: String,
161    /// Index policy
162    pub policy: IndexPolicy,
163    /// Maximum sorted runs before compaction (for Balanced policy)
164    pub max_sorted_runs: usize,
165    /// Target sorted run size in bytes
166    pub target_run_size: usize,
167    /// Enable bloom filters for point queries
168    pub enable_bloom_filter: bool,
169}
170
171impl TableIndexConfig {
172    /// Create a new table index config
173    pub fn new(table_name: impl Into<String>, policy: IndexPolicy) -> Self {
174        Self {
175            table_name: table_name.into(),
176            policy,
177            max_sorted_runs: 4,
178            target_run_size: 16 * 1024 * 1024, // 16MB
179            enable_bloom_filter: true,
180        }
181    }
182
183    /// Builder: set max sorted runs
184    pub fn with_max_sorted_runs(mut self, max: usize) -> Self {
185        self.max_sorted_runs = max;
186        self
187    }
188
189    /// Builder: set target run size
190    pub fn with_target_run_size(mut self, size: usize) -> Self {
191        self.target_run_size = size;
192        self
193    }
194
195    /// Builder: enable/disable bloom filter
196    pub fn with_bloom_filter(mut self, enable: bool) -> Self {
197        self.enable_bloom_filter = enable;
198        self
199    }
200}
201
202// ============================================================================
203// TableIndexRegistry - Central Registry for Per-Table Policies
204// ============================================================================
205
206/// Registry of per-table index policies
207///
208/// Allows setting different index policies for different tables,
209/// with a default policy for tables not explicitly configured.
210pub struct TableIndexRegistry {
211    /// Per-table configurations
212    configs: DashMap<String, TableIndexConfig>,
213    /// Default policy for unconfigured tables
214    default_policy: RwLock<IndexPolicy>,
215}
216
217impl TableIndexRegistry {
218    /// Create a new registry with default Balanced policy
219    pub fn new() -> Self {
220        Self {
221            configs: DashMap::new(),
222            default_policy: RwLock::new(IndexPolicy::Balanced),
223        }
224    }
225
226    /// Create with a specific default policy
227    pub fn with_default_policy(policy: IndexPolicy) -> Self {
228        Self {
229            configs: DashMap::new(),
230            default_policy: RwLock::new(policy),
231        }
232    }
233
234    /// Set the default policy for unconfigured tables
235    pub fn set_default_policy(&self, policy: IndexPolicy) {
236        *self.default_policy.write() = policy;
237    }
238
239    /// Get the default policy
240    pub fn default_policy(&self) -> IndexPolicy {
241        *self.default_policy.read()
242    }
243
244    /// Configure a table with a specific policy
245    pub fn configure_table(&self, config: TableIndexConfig) {
246        self.configs.insert(config.table_name.clone(), config);
247    }
248
249    /// Get the policy for a table
250    pub fn get_policy(&self, table_name: &str) -> IndexPolicy {
251        self.configs
252            .get(table_name)
253            .map(|c| c.policy)
254            .unwrap_or_else(|| *self.default_policy.read())
255    }
256
257    /// Get the full config for a table (or default)
258    pub fn get_config(&self, table_name: &str) -> TableIndexConfig {
259        self.configs
260            .get(table_name)
261            .map(|c| c.clone())
262            .unwrap_or_else(|| {
263                TableIndexConfig::new(table_name, *self.default_policy.read())
264            })
265    }
266
267    /// Check if a table has an explicitly configured policy
268    pub fn has_explicit_config(&self, table_name: &str) -> bool {
269        self.configs.contains_key(table_name)
270    }
271
272    /// Remove a table's configuration (reverts to default)
273    pub fn remove_config(&self, table_name: &str) -> Option<TableIndexConfig> {
274        self.configs.remove(table_name).map(|(_, c)| c)
275    }
276
277    /// List all configured tables
278    pub fn configured_tables(&self) -> Vec<String> {
279        self.configs.iter().map(|e| e.key().clone()).collect()
280    }
281}
282
283impl Default for TableIndexRegistry {
284    fn default() -> Self {
285        Self::new()
286    }
287}
288
289// ============================================================================
290// SortedRun - Immutable Sorted Segment for Balanced Policy
291// ============================================================================
292
/// An immutable sorted segment of key-value pairs.
///
/// Used by the Balanced policy for LSM-style scan optimization.
/// Each run is sorted by key, enabling efficient k-way merge.
///
/// ## Key Range Metadata
///
/// Each run stores `min_key` and `max_key` bounds for O(1) overlap checking.
/// A run whose bounds cannot intersect a requested key range can be skipped
/// without touching its entries.
#[derive(Debug)]
pub struct SortedRun<K, V> {
    /// Entries in non-decreasing key order.
    /// Runs built via `from_unsorted` keep the input order for equal keys.
    entries: Vec<(K, V)>,
    /// Minimum key in this run (for scan pruning); `None` when empty.
    min_key: Option<K>,
    /// Maximum key in this run (for scan pruning); `None` when empty.
    max_key: Option<K>,
    /// Approximate size in bytes: `len * size_of::<(K, V)>()`.
    /// Heap data owned by keys or values is not counted.
    size_bytes: usize,
    /// Creation timestamp.
    // Not read anywhere in this module; kept as run metadata.
    #[allow(dead_code)]
    created_at: std::time::Instant,
    /// Run level in LSM hierarchy.
    level: usize,
}

impl<K: Ord + Clone, V: Clone> SortedRun<K, V> {
    /// Create a new sorted run from unsorted entries.
    ///
    /// The sort is stable, so entries with equal keys keep their input order.
    pub fn from_unsorted(mut entries: Vec<(K, V)>, level: usize) -> Self {
        entries.sort_by(|a, b| a.0.cmp(&b.0));
        Self::from_sorted(entries, level)
    }

    /// Create from already-sorted entries.
    ///
    /// The caller must supply entries in non-decreasing key order. This is
    /// checked in debug builds only.
    pub fn from_sorted(entries: Vec<(K, V)>, level: usize) -> Self {
        debug_assert!(
            entries.windows(2).all(|w| w[0].0 <= w[1].0),
            "SortedRun::from_sorted called with unsorted entries"
        );
        // Measure the entry storage. `size_of_val(&entries)` would measure the
        // `Vec` header (three words) regardless of how many entries it holds.
        let size_bytes = std::mem::size_of_val(entries.as_slice());
        let min_key = entries.first().map(|(k, _)| k.clone());
        let max_key = entries.last().map(|(k, _)| k.clone());
        Self {
            entries,
            min_key,
            max_key,
            size_bytes,
            created_at: std::time::Instant::now(),
            level,
        }
    }

    /// Get number of entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Check if empty.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Approximate size in bytes (see the `size_bytes` field).
    pub fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    /// Get the level.
    pub fn level(&self) -> usize {
        self.level
    }

    /// Binary search for a key (O(log n)).
    ///
    /// If the run holds duplicate keys, any one of the matches may be returned.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.entries
            .binary_search_by(|(k, _)| k.cmp(key))
            .ok()
            .map(|idx| &self.entries[idx].1)
    }

    /// Index of the first entry whose key is `>= key` (lower bound).
    ///
    /// Unlike `binary_search`, this always lands on the first of several equal
    /// keys, so range scans never skip duplicates.
    fn lower_bound(&self, key: &K) -> usize {
        self.entries.partition_point(|(k, _)| k < key)
    }

    /// Iterate entries with `key >= start`, in key order.
    pub fn range_from<'a>(&'a self, start: &K) -> impl Iterator<Item = &'a (K, V)> {
        let idx = self.lower_bound(start);
        self.entries[idx..].iter()
    }

    /// Iterate entries with `start <= key < end`, in key order.
    ///
    /// An inverted range (`end < start`) yields nothing instead of panicking.
    pub fn range<'a>(&'a self, start: &K, end: &K) -> impl Iterator<Item = &'a (K, V)> {
        let start_idx = self.lower_bound(start);
        // Clamp so `start_idx..end_idx` is always a valid (possibly empty) slice.
        let end_idx = self.lower_bound(end).max(start_idx);
        self.entries[start_idx..end_idx].iter()
    }

    /// Iterate all entries.
    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
        self.entries.iter()
    }

    /// Direct access to underlying entries for O(1) indexing.
    ///
    /// Required for efficient k-way merge. Without this accessor, callers would
    /// have to use `iter().nth()`, which is O(n) per call.
    #[inline]
    pub fn entries(&self) -> &[(K, V)] {
        &self.entries
    }

    /// Check if this run might contain keys with the given prefix.
    ///
    /// Returns `false` only when `max_key < prefix`, which means the whole run
    /// sorts before the prefix. Any other case returns `true`
    /// (conservative; false positives are possible). Also usable as a cheap
    /// pre-filter for point lookups, since a run with `max_key < key` cannot
    /// contain `key`.
    #[inline]
    pub fn overlaps_prefix(&self, prefix: &K) -> bool {
        match &self.max_key {
            Some(max) if max < prefix => false, // Run entirely before prefix
            _ => true,                          // Could overlap (conservative)
        }
    }

    /// Check if this run might contain keys in the half-open range `[start, end)`.
    ///
    /// Uses the stored bounds for an O(1), conservative check. An empty run
    /// (no bounds) reports `true`.
    #[inline]
    pub fn overlaps_range(&self, start: &K, end: &K) -> bool {
        match (&self.min_key, &self.max_key) {
            (Some(min), _) if min >= end => false,  // Run entirely after range
            (_, Some(max)) if max < start => false, // Run entirely before range
            _ => true,                              // Could overlap
        }
    }

    /// Get the minimum key in this run, if any.
    #[inline]
    pub fn min_key(&self) -> Option<&K> {
        self.min_key.as_ref()
    }

    /// Get the maximum key in this run, if any.
    #[inline]
    pub fn max_key(&self) -> Option<&K> {
        self.max_key.as_ref()
    }
}
460
461// ============================================================================
462// BalancedTableIndex - LSM-Style Index for Balanced Policy
463// ============================================================================
464
465/// LSM-style index for the Balanced policy
466///
467/// Combines:
468/// - Unsorted DashMap for O(1) writes
469/// - Periodic compaction to sorted runs
470/// - K-way merge for range scans
471pub struct BalancedTableIndex<V: Clone + Send + Sync + Eq + 'static> {
472    /// Unsorted memtable for fast writes
473    memtable: DashMap<ArenaKeyHandle, V>,
474    /// Sorted runs for efficient scans
475    sorted_runs: RwLock<Vec<Arc<SortedRun<ArenaKeyHandle, V>>>>,
476    /// Configuration
477    config: TableIndexConfig,
478    /// Size of memtable in bytes
479    memtable_size: std::sync::atomic::AtomicUsize,
480}
481
482impl<V: Clone + Send + Sync + Eq + 'static> BalancedTableIndex<V> {
483    /// Create a new balanced table index
484    pub fn new(config: TableIndexConfig) -> Self {
485        Self {
486            memtable: DashMap::new(),
487            sorted_runs: RwLock::new(Vec::new()),
488            config,
489            memtable_size: std::sync::atomic::AtomicUsize::new(0),
490        }
491    }
492
493    /// Insert a key-value pair (O(1))
494    pub fn insert(&self, key: ArenaKeyHandle, value: V) {
495        let key_size = key.len();
496        let value_size = std::mem::size_of::<V>();
497        
498        self.memtable.insert(key, value);
499        self.memtable_size.fetch_add(key_size + value_size, std::sync::atomic::Ordering::Relaxed);
500    }
501
502    /// Get a value by key
503    /// 
504    /// Uses run metadata for O(1) pruning: skips runs where 
505    /// `max_key < key` (run entirely before search key).
506    pub fn get(&self, key: &ArenaKeyHandle) -> Option<V> {
507        // Check memtable first
508        if let Some(v) = self.memtable.get(key) {
509            return Some(v.clone());
510        }
511
512        // Check sorted runs (newest first)
513        // Use metadata pruning to skip runs that can't contain the key
514        let runs = self.sorted_runs.read();
515        for run in runs.iter().rev() {
516            // Prune: skip runs that are entirely before the key
517            if run.overlaps_prefix(key) {
518                if let Some(v) = run.get(key) {
519                    return Some(v.clone());
520                }
521            }
522        }
523
524        None
525    }
526
527    /// Scan entries with a given key prefix
528    /// 
529    /// Uses run metadata for O(1) pruning: only scans runs whose
530    /// key range overlaps with the prefix. Returns merged results
531    /// in key order with duplicates resolved (newest wins).
532    /// 
533    /// # Pruning Benefit
534    /// 
535    /// For selective prefixes (e.g., "user:123:" in a table with 1M keys),
536    /// most runs will have `max_key < prefix` or `min_key > prefix_end`,
537    /// allowing them to be skipped entirely.
538    pub fn scan_prefix(&self, prefix: &ArenaKeyHandle) -> Vec<(ArenaKeyHandle, V)> {
539        use std::collections::BinaryHeap;
540        use std::cmp::Reverse;
541
542        #[derive(Eq, PartialEq)]
543        struct PrefixHeapEntry<V: Clone> {
544            key: ArenaKeyHandle,
545            value: V,
546            source_idx: usize,  // 0 = memtable, 1+ = sorted runs
547        }
548
549        impl<V: Clone + Eq + PartialEq> Ord for PrefixHeapEntry<V> {
550            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
551                match self.key.cmp(&other.key) {
552                    std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
553                    other => other,
554                }
555            }
556        }
557
558        impl<V: Clone + Eq + PartialEq> PartialOrd for PrefixHeapEntry<V> {
559            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
560                Some(self.cmp(other))
561            }
562        }
563
564        let mut heap: BinaryHeap<Reverse<PrefixHeapEntry<V>>> = BinaryHeap::new();
565
566        // Collect matching entries from memtable
567        for entry in self.memtable.iter() {
568            let key = entry.key();
569            if key.as_bytes().starts_with(prefix.as_bytes()) {
570                heap.push(Reverse(PrefixHeapEntry {
571                    key: key.clone(),
572                    value: entry.value().clone(),
573                    source_idx: 0,
574                }));
575            }
576        }
577
578        // Collect from sorted runs, using metadata pruning
579        let runs = self.sorted_runs.read();
580        for (run_idx, run) in runs.iter().enumerate() {
581            // Prune: skip runs that can't contain any matching keys
582            if !run.overlaps_prefix(prefix) {
583                continue;  // Run is entirely before prefix
584            }
585
586            // Scan matching entries in this run
587            for (key, value) in run.range_from(prefix) {
588                if !key.as_bytes().starts_with(prefix.as_bytes()) {
589                    break;  // Past prefix range
590                }
591                heap.push(Reverse(PrefixHeapEntry {
592                    key: key.clone(),
593                    value: value.clone(),
594                    source_idx: run_idx + 1,  // 1-indexed (0 = memtable)
595                }));
596            }
597        }
598
599        // Merge and deduplicate
600        let mut result = Vec::with_capacity(heap.len());
601        let mut last_key: Option<ArenaKeyHandle> = None;
602
603        while let Some(Reverse(entry)) = heap.pop() {
604            let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
605            if is_new_key {
606                last_key = Some(entry.key.clone());
607                result.push((entry.key, entry.value));
608            }
609        }
610
611        result
612    }
613
614    /// Check if compaction is needed
615    pub fn needs_compaction(&self) -> bool {
616        let memtable_size = self.memtable_size.load(std::sync::atomic::Ordering::Relaxed);
617        let runs = self.sorted_runs.read();
618        
619        memtable_size >= self.config.target_run_size
620            || runs.len() >= self.config.max_sorted_runs
621    }
622
623    /// Compact memtable to a new sorted run
624    pub fn compact_memtable(&self) {
625        // Drain memtable
626        let entries: Vec<_> = self.memtable.iter()
627            .map(|e| (e.key().clone(), e.value().clone()))
628            .collect();
629        
630        if entries.is_empty() {
631            return;
632        }
633
634        // Clear memtable
635        self.memtable.clear();
636        self.memtable_size.store(0, std::sync::atomic::Ordering::Relaxed);
637
638        // Create new sorted run
639        let run = Arc::new(SortedRun::from_unsorted(entries, 0));
640        
641        let mut runs = self.sorted_runs.write();
642        runs.push(run);
643    }
644
645    /// Merge multiple sorted runs (compaction)
646    pub fn merge_runs(&self, levels_to_merge: usize) {
647        let mut runs = self.sorted_runs.write();
648        
649        if runs.len() < levels_to_merge {
650            return;
651        }
652
653        // Take the oldest runs to merge
654        let to_merge: Vec<_> = runs.drain(..levels_to_merge).collect();
655        
656        // K-way merge
657        let merged = self.k_way_merge(&to_merge);
658        
659        // Create new run at next level
660        let new_run = Arc::new(SortedRun::from_sorted(merged, to_merge.len()));
661        runs.insert(0, new_run);
662    }
663
664    /// K-way merge of sorted runs
665    /// 
666    /// Complexity: O(N log K) where N = total entries, K = number of runs
667    /// 
668    /// Key insight: Use direct indexing into the underlying Vec instead of 
669    /// creating new iterators. `iter().nth(n)` is O(n), making the old
670    /// implementation O(N²/K).
671    fn k_way_merge(&self, runs: &[Arc<SortedRun<ArenaKeyHandle, V>>]) -> Vec<(ArenaKeyHandle, V)> {
672        use std::collections::BinaryHeap;
673        use std::cmp::Reverse;
674
675        #[derive(Eq, PartialEq)]
676        struct HeapEntry<V: Clone> {
677            key: ArenaKeyHandle,
678            value: V,
679            run_idx: usize,
680            entry_idx: usize,
681        }
682
683        impl<V: Clone + Eq + PartialEq> Ord for HeapEntry<V> {
684            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
685                // Primary: key order (min-heap via Reverse wrapper)
686                // Secondary: run_idx for stability (lower = older = superseded)
687                match self.key.cmp(&other.key) {
688                    std::cmp::Ordering::Equal => self.run_idx.cmp(&other.run_idx),
689                    other => other,
690                }
691            }
692        }
693
694        impl<V: Clone + Eq + PartialEq> PartialOrd for HeapEntry<V> {
695            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
696                Some(self.cmp(other))
697            }
698        }
699
700        let mut heap: BinaryHeap<Reverse<HeapEntry<V>>> = BinaryHeap::new();
701        
702        // Track current position in each run (index into the underlying Vec)
703        let mut run_positions: Vec<usize> = vec![0; runs.len()];
704        
705        // Initialize heap with first entry from each run using direct indexing
706        for (run_idx, run) in runs.iter().enumerate() {
707            let entries = run.entries();
708            if !entries.is_empty() {
709                let (key, value) = &entries[0];  // O(1) direct access
710                heap.push(Reverse(HeapEntry {
711                    key: key.clone(),
712                    value: value.clone(),
713                    run_idx,
714                    entry_idx: 0,
715                }));
716            }
717        }
718
719        // Pre-allocate result based on estimated total entries
720        let estimated_size: usize = runs.iter().map(|r| r.len()).sum();
721        let mut result = Vec::with_capacity(estimated_size);
722        let mut last_key: Option<ArenaKeyHandle> = None;
723
724        while let Some(Reverse(entry)) = heap.pop() {
725            // Duplicate suppression: keep only the first occurrence (newest due to run ordering)
726            let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
727            if is_new_key {
728                last_key = Some(entry.key.clone());
729                result.push((entry.key.clone(), entry.value));
730            }
731
732            // Advance position in this run
733            run_positions[entry.run_idx] += 1;
734            let next_idx = run_positions[entry.run_idx];
735            
736            // FIX: Direct indexing is O(1), not O(n) like iter().nth()
737            let run_entries = runs[entry.run_idx].entries();
738            if next_idx < run_entries.len() {
739                let (key, value) = &run_entries[next_idx];  // O(1) access
740                heap.push(Reverse(HeapEntry {
741                    key: key.clone(),
742                    value: value.clone(),
743                    run_idx: entry.run_idx,
744                    entry_idx: next_idx,
745                }));
746            }
747        }
748
749        result
750    }
751
752    /// Get table config
753    pub fn config(&self) -> &TableIndexConfig {
754        &self.config
755    }
756
757    /// Get memtable size
758    pub fn memtable_size(&self) -> usize {
759        self.memtable_size.load(std::sync::atomic::Ordering::Relaxed)
760    }
761
762    /// Get number of sorted runs
763    pub fn run_count(&self) -> usize {
764        self.sorted_runs.read().len()
765    }
766}
767
768// ============================================================================
769// Tests
770// ============================================================================
771
772#[cfg(test)]
773mod tests {
774    use super::*;
775
776    #[test]
777    fn test_index_policy_from_str() {
778        assert_eq!(IndexPolicy::from_str("write_optimized"), Some(IndexPolicy::WriteOptimized));
779        assert_eq!(IndexPolicy::from_str("balanced"), Some(IndexPolicy::Balanced));
780        assert_eq!(IndexPolicy::from_str("scan-optimized"), Some(IndexPolicy::ScanOptimized));
781        assert_eq!(IndexPolicy::from_str("append_only"), Some(IndexPolicy::AppendOnly));
782        assert_eq!(IndexPolicy::from_str("invalid"), None);
783    }
784
785    #[test]
786    fn test_registry_default_policy() {
787        let registry = TableIndexRegistry::new();
788        
789        // Unconfigured table gets default policy
790        assert_eq!(registry.get_policy("unknown"), IndexPolicy::Balanced);
791        
792        // Configure a table
793        registry.configure_table(TableIndexConfig::new("users", IndexPolicy::WriteOptimized));
794        assert_eq!(registry.get_policy("users"), IndexPolicy::WriteOptimized);
795        
796        // Other tables still get default
797        assert_eq!(registry.get_policy("orders"), IndexPolicy::Balanced);
798    }
799
800    #[test]
801    fn test_registry_change_default() {
802        let registry = TableIndexRegistry::new();
803        
804        registry.set_default_policy(IndexPolicy::ScanOptimized);
805        assert_eq!(registry.get_policy("any_table"), IndexPolicy::ScanOptimized);
806    }
807
808    #[test]
809    fn test_sorted_run() {
810        let entries = vec![
811            (ArenaKeyHandle::new(b"c"), 3),
812            (ArenaKeyHandle::new(b"a"), 1),
813            (ArenaKeyHandle::new(b"b"), 2),
814        ];
815        
816        let run = SortedRun::from_unsorted(entries, 0);
817        
818        assert_eq!(run.len(), 3);
819        assert_eq!(run.get(&ArenaKeyHandle::new(b"a")), Some(&1));
820        assert_eq!(run.get(&ArenaKeyHandle::new(b"b")), Some(&2));
821        assert_eq!(run.get(&ArenaKeyHandle::new(b"c")), Some(&3));
822        assert_eq!(run.get(&ArenaKeyHandle::new(b"d")), None);
823    }
824
825    #[test]
826    fn test_balanced_table_index() {
827        let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
828        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
829        
830        index.insert(ArenaKeyHandle::new(b"key1"), 1);
831        index.insert(ArenaKeyHandle::new(b"key2"), 2);
832        
833        assert_eq!(index.get(&ArenaKeyHandle::new(b"key1")), Some(1));
834        assert_eq!(index.get(&ArenaKeyHandle::new(b"key2")), Some(2));
835        assert_eq!(index.get(&ArenaKeyHandle::new(b"key3")), None);
836    }
837
838    #[test]
839    fn test_balanced_compaction() {
840        let config = TableIndexConfig::new("test", IndexPolicy::Balanced)
841            .with_target_run_size(100); // Small size to trigger compaction
842        
843        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
844        
845        for i in 0..10 {
846            let key = format!("key{:03}", i);
847            index.insert(ArenaKeyHandle::new(key.as_bytes()), i as i32);
848        }
849        
850        // Compact memtable
851        index.compact_memtable();
852        
853        assert_eq!(index.run_count(), 1);
854        assert_eq!(index.memtable_size(), 0);
855        
856        // Values should still be accessible
857        assert_eq!(index.get(&ArenaKeyHandle::new(b"key005")), Some(5));
858    }
859
860    #[test]
861    fn test_k_way_merge_scaling() {
862        // Verify O(N log K) complexity by checking that merge time scales linearly
863        // with N (not quadratically as the old iter().nth() implementation would)
864        use std::time::Instant;
865        
866        let sizes = [100, 500, 1000];
867        let mut times_ns: Vec<u128> = Vec::new();
868        
869        for size in sizes {
870            // Create 5 runs with `size` entries each
871            let runs: Vec<Arc<SortedRun<ArenaKeyHandle, i32>>> = (0..5)
872                .map(|run_id| {
873                    let entries: Vec<(ArenaKeyHandle, i32)> = (0..size)
874                        .map(|i| {
875                            let key = format!("key_{:08}_{}", i * 5 + run_id, run_id);
876                            (ArenaKeyHandle::new(key.as_bytes()), (i * 5 + run_id) as i32)
877                        })
878                        .collect();
879                    Arc::new(SortedRun::from_sorted(entries, run_id))
880                })
881                .collect();
882            
883            let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
884            let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
885            
886            let start = Instant::now();
887            let merged = index.k_way_merge(&runs);
888            let elapsed = start.elapsed();
889            
890            times_ns.push(elapsed.as_nanos());
891            
892            // Verify merge produced correct output
893            let total_entries = size * 5;
894            assert_eq!(merged.len(), total_entries, "Merge should produce all unique entries");
895        }
896        
897        // For O(N log K) scaling, time should roughly double when N doubles
898        // (since log K is constant). For O(N²), time would quadruple.
899        // We check that the ratio is closer to linear than quadratic.
900        if times_ns.len() >= 2 && times_ns[0] > 0 {
901            let ratio_1_to_2 = times_ns[1] as f64 / times_ns[0] as f64;
902            let ratio_2_to_3 = times_ns[2] as f64 / times_ns[1] as f64;
903            
904            // For linear scaling with 5x size increase, expect ~5x time increase
905            // For quadratic, expect ~25x. We assert it's closer to linear.
906            assert!(ratio_1_to_2 < 15.0, 
907                "Merge scaling should be sub-quadratic: ratio={:.1}x for 5x size", ratio_1_to_2);
908            assert!(ratio_2_to_3 < 10.0,
909                "Merge scaling should be sub-quadratic: ratio={:.1}x for 2x size", ratio_2_to_3);
910        }
911    }
912
913    #[test]
914    fn test_sorted_run_metadata_pruning() {
915        // Test that min_key and max_key are correctly computed
916        let entries = vec![
917            (ArenaKeyHandle::new(b"apple"), 1),
918            (ArenaKeyHandle::new(b"banana"), 2),
919            (ArenaKeyHandle::new(b"cherry"), 3),
920        ];
921        let run = SortedRun::from_sorted(entries, 0);
922        
923        // Verify min/max are set correctly
924        assert_eq!(run.min_key().map(|k| k.as_bytes()), Some(b"apple".as_slice()));
925        assert_eq!(run.max_key().map(|k| k.as_bytes()), Some(b"cherry".as_slice()));
926        
927        // Test overlaps_prefix pruning
928        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"banana"))); // In range
929        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"apple")));  // At start
930        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"cherry"))); // At end
931        
932        // Prefix BEFORE range should not overlap (max_key < prefix)
933        assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"date")));  // After range
934        assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"zebra"))); // Way after
935        
936        // Test overlaps_range pruning
937        assert!(run.overlaps_range(
938            &ArenaKeyHandle::new(b"banana"),
939            &ArenaKeyHandle::new(b"cherry")
940        ));
941        assert!(!run.overlaps_range(
942            &ArenaKeyHandle::new(b"date"),
943            &ArenaKeyHandle::new(b"fig")
944        )); // Entirely after
945        assert!(!run.overlaps_range(
946            &ArenaKeyHandle::new(b"aaa"),
947            &ArenaKeyHandle::new(b"aab")
948        )); // Entirely before
949    }
950
951    #[test]
952    fn test_scan_prefix() {
953        let config = TableIndexConfig::new("test", IndexPolicy::Balanced)
954            .with_target_run_size(50); // Small to trigger compaction
955        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
956        
957        // Insert entries with different prefixes
958        let prefixes = ["user:1:", "user:2:", "order:1:", "order:2:"];
959        for (i, prefix) in prefixes.iter().enumerate() {
960            for j in 0..5 {
961                let key = format!("{}{}", prefix, j);
962                index.insert(ArenaKeyHandle::new(key.as_bytes()), (i * 10 + j) as i32);
963            }
964        }
965        
966        // Compact to create sorted runs
967        index.compact_memtable();
968        
969        // Add more entries to memtable
970        index.insert(ArenaKeyHandle::new(b"user:1:99"), 199);
971        index.insert(ArenaKeyHandle::new(b"order:1:99"), 299);
972        
973        // Scan for user:1: prefix
974        let results = index.scan_prefix(&ArenaKeyHandle::new(b"user:1:"));
975        assert_eq!(results.len(), 6); // 5 from run + 1 from memtable
976        
977        // Verify all results have the correct prefix
978        for (key, _value) in &results {
979            assert!(key.as_bytes().starts_with(b"user:1:"), 
980                "Key {:?} should start with user:1:", String::from_utf8_lossy(key.as_bytes()));
981        }
982        
983        // Verify results are sorted
984        for window in results.windows(2) {
985            assert!(window[0].0 <= window[1].0, "Results should be sorted by key");
986        }
987        
988        // Scan for order: prefix
989        let results = index.scan_prefix(&ArenaKeyHandle::new(b"order:"));
990        assert_eq!(results.len(), 11); // 10 from run + 1 from memtable
991    }
992
993    #[test]
994    fn test_empty_sorted_run_metadata() {
995        // Empty run should have None for min/max
996        let entries: Vec<(ArenaKeyHandle, i32)> = vec![];
997        let run = SortedRun::from_sorted(entries, 0);
998        
999        assert!(run.min_key().is_none());
1000        assert!(run.max_key().is_none());
1001        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"anything"))); // Conservative: true
1002        assert!(run.overlaps_range(
1003            &ArenaKeyHandle::new(b"a"),
1004            &ArenaKeyHandle::new(b"z")
1005        )); // Conservative: true
1006    }
1007}