sochdb_storage/
index_policy.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-Table Index Policy + Scan-Optimized Structure
16//!
17//! This module replaces the global `enable_ordered_index` toggle with
18//! a per-table index policy, allowing fine-grained control over write
19//! throughput vs scan performance trade-offs.
20//!
21//! ## Problem: Global Toggle is Too Coarse
22//!
23//! The current `DatabaseConfig::enable_ordered_index` is a single global toggle.
24//! This forces a choice:
25//! - Enabled: Pay ~134 ns/op on ALL writes for O(log N) scans
26//! - Disabled: Fast writes but O(N) scans on ALL tables
27//!
28//! Real workloads have mixed access patterns:
29//! - Write-heavy log tables → don't need ordered index
30//! - Scan-heavy analytics tables → need ordered index
31//! - OLTP tables → need balanced policy
32//!
33//! ## Solution: Per-Table Index Policy
34//!
35//! ```text
36//! ┌─────────────────────────────────────────────────────────────────┐
37//! │                      Table Index Registry                        │
38//! ├─────────────────────────────────────────────────────────────────┤
39//! │                                                                  │
40//! │  "users"    → WriteOptimized (no ordered index)                 │
41//! │  "orders"   → Balanced (lazy compaction to sorted runs)          │
42//! │  "analytics"→ ScanOptimized (maintain ordered index)             │
43//! │  "logs"     → AppendOnly (no index, time-ordered writes)         │
44//! │                                                                  │
45//! └─────────────────────────────────────────────────────────────────┘
46//! ```
47//!
48//! ## Index Policies
49//!
50//! | Policy         | Insert Cost | Scan Cost      | Use Case              |
51//! |----------------|-------------|----------------|------------------------|
52//! | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
53//! | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
54//! | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
55//! | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
56//!
57//! ## LSM-Style Balanced Policy
58//!
59//! For `Balanced` tables, we use an LSM-style approach:
60//! - Unsorted append-friendly memtable (O(1) inserts)
61//! - Periodic compaction to sorted runs
62//! - K-way merge for range scans (O(output + log K))
63//!
64//! This retains range-scan capability without paying O(log N) on every write.
65
66use std::sync::Arc;
67
68use dashmap::DashMap;
69use parking_lot::RwLock;
70
71use crate::key_buffer::ArenaKeyHandle;
72
73// ============================================================================
74// IndexPolicy - Per-Table Index Configuration
75// ============================================================================
76
/// Per-table index policy.
///
/// Chooses where one table sits on the write-throughput vs. range-scan
/// trade-off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexPolicy {
    /// No ordered index: the fastest writes, but O(N) scans.
    ///
    /// Meant for write-heavy tables that rarely need range scans.
    WriteOptimized,

    /// LSM-style: an unsorted memtable plus periodically built sorted runs.
    ///
    /// Amortized O(1) inserts and O(output + log K) scans, where K is the
    /// number of sorted runs. Meant for mixed OLTP workloads.
    Balanced,

    /// Keeps an ordered index up to date on every write.
    ///
    /// O(log N) inserts and O(log N + K) scans. Meant for analytics tables
    /// with frequent range queries.
    ScanOptimized,

    /// Append-only storage with no indexing.
    ///
    /// O(1) inserts and O(N) scans (with efficient forward iteration). Meant
    /// for time-series logs whose data arrives naturally ordered.
    AppendOnly,
}

impl Default for IndexPolicy {
    /// Tables without an explicit policy use [`IndexPolicy::Balanced`].
    fn default() -> Self {
        Self::Balanced
    }
}

impl IndexPolicy {
    /// Parses a policy name, ignoring letter case.
    ///
    /// Accepts snake_case and kebab-case spellings plus the short aliases
    /// `"write"`, `"scan"` and `"append"`; `"default"` maps to
    /// [`IndexPolicy::Balanced`]. Any other input yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        let name = s.to_lowercase();
        let policy = match name.as_str() {
            "write_optimized" | "write-optimized" | "write" => Self::WriteOptimized,
            "balanced" | "default" => Self::Balanced,
            "scan_optimized" | "scan-optimized" | "scan" => Self::ScanOptimized,
            "append_only" | "append-only" | "append" => Self::AppendOnly,
            _ => return None,
        };
        Some(policy)
    }

    /// Human-readable asymptotic cost of a single write under this policy.
    pub fn write_cost(&self) -> &'static str {
        match *self {
            Self::WriteOptimized | Self::AppendOnly => "O(1)",
            Self::Balanced => "O(1) amortized",
            Self::ScanOptimized => "O(log N)",
        }
    }

    /// Human-readable asymptotic cost of a range scan under this policy.
    pub fn scan_cost(&self) -> &'static str {
        match *self {
            Self::WriteOptimized | Self::AppendOnly => "O(N)",
            Self::Balanced => "O(output + log K)",
            Self::ScanOptimized => "O(log N + K)",
        }
    }

    /// `true` only for the policy that maintains an ordered index on every write.
    pub fn has_ordered_index(&self) -> bool {
        *self == Self::ScanOptimized
    }

    /// `true` for policies that can answer range scans without a full scan.
    pub fn supports_efficient_scans(&self) -> bool {
        match self {
            Self::ScanOptimized | Self::Balanced => true,
            Self::WriteOptimized | Self::AppendOnly => false,
        }
    }
}
148
149// ============================================================================
150// TableIndexConfig - Configuration for a Single Table
151// ============================================================================
152
153/// Index configuration for a single table
154#[derive(Debug, Clone)]
155pub struct TableIndexConfig {
156    /// Table name
157    pub table_name: String,
158    /// Index policy
159    pub policy: IndexPolicy,
160    /// Maximum sorted runs before compaction (for Balanced policy)
161    pub max_sorted_runs: usize,
162    /// Target sorted run size in bytes
163    pub target_run_size: usize,
164    /// Enable bloom filters for point queries
165    pub enable_bloom_filter: bool,
166}
167
168impl TableIndexConfig {
169    /// Create a new table index config
170    pub fn new(table_name: impl Into<String>, policy: IndexPolicy) -> Self {
171        Self {
172            table_name: table_name.into(),
173            policy,
174            max_sorted_runs: 4,
175            target_run_size: 16 * 1024 * 1024, // 16MB
176            enable_bloom_filter: true,
177        }
178    }
179
180    /// Builder: set max sorted runs
181    pub fn with_max_sorted_runs(mut self, max: usize) -> Self {
182        self.max_sorted_runs = max;
183        self
184    }
185
186    /// Builder: set target run size
187    pub fn with_target_run_size(mut self, size: usize) -> Self {
188        self.target_run_size = size;
189        self
190    }
191
192    /// Builder: enable/disable bloom filter
193    pub fn with_bloom_filter(mut self, enable: bool) -> Self {
194        self.enable_bloom_filter = enable;
195        self
196    }
197}
198
199// ============================================================================
200// TableIndexRegistry - Central Registry for Per-Table Policies
201// ============================================================================
202
203/// Registry of per-table index policies
204///
205/// Allows setting different index policies for different tables,
206/// with a default policy for tables not explicitly configured.
207pub struct TableIndexRegistry {
208    /// Per-table configurations
209    configs: DashMap<String, TableIndexConfig>,
210    /// Default policy for unconfigured tables
211    default_policy: RwLock<IndexPolicy>,
212}
213
214impl TableIndexRegistry {
215    /// Create a new registry with default Balanced policy
216    pub fn new() -> Self {
217        Self {
218            configs: DashMap::new(),
219            default_policy: RwLock::new(IndexPolicy::Balanced),
220        }
221    }
222
223    /// Create with a specific default policy
224    pub fn with_default_policy(policy: IndexPolicy) -> Self {
225        Self {
226            configs: DashMap::new(),
227            default_policy: RwLock::new(policy),
228        }
229    }
230
231    /// Set the default policy for unconfigured tables
232    pub fn set_default_policy(&self, policy: IndexPolicy) {
233        *self.default_policy.write() = policy;
234    }
235
236    /// Get the default policy
237    pub fn default_policy(&self) -> IndexPolicy {
238        *self.default_policy.read()
239    }
240
241    /// Configure a table with a specific policy
242    pub fn configure_table(&self, config: TableIndexConfig) {
243        self.configs.insert(config.table_name.clone(), config);
244    }
245
246    /// Get the policy for a table
247    pub fn get_policy(&self, table_name: &str) -> IndexPolicy {
248        self.configs
249            .get(table_name)
250            .map(|c| c.policy)
251            .unwrap_or_else(|| *self.default_policy.read())
252    }
253
254    /// Get the full config for a table (or default)
255    pub fn get_config(&self, table_name: &str) -> TableIndexConfig {
256        self.configs
257            .get(table_name)
258            .map(|c| c.clone())
259            .unwrap_or_else(|| {
260                TableIndexConfig::new(table_name, *self.default_policy.read())
261            })
262    }
263
264    /// Check if a table has an explicitly configured policy
265    pub fn has_explicit_config(&self, table_name: &str) -> bool {
266        self.configs.contains_key(table_name)
267    }
268
269    /// Remove a table's configuration (reverts to default)
270    pub fn remove_config(&self, table_name: &str) -> Option<TableIndexConfig> {
271        self.configs.remove(table_name).map(|(_, c)| c)
272    }
273
274    /// List all configured tables
275    pub fn configured_tables(&self) -> Vec<String> {
276        self.configs.iter().map(|e| e.key().clone()).collect()
277    }
278}
279
280impl Default for TableIndexRegistry {
281    fn default() -> Self {
282        Self::new()
283    }
284}
285
286// ============================================================================
287// SortedRun - Immutable Sorted Segment for Balanced Policy
288// ============================================================================
289
/// An immutable sorted segment of key-value pairs
///
/// Used by the Balanced policy for LSM-style scan optimization.
/// Each run is sorted by key, enabling efficient k-way merge.
///
/// ## Key Range Metadata
///
/// Each run stores `min_key` and `max_key` bounds for O(1) overlap checking.
/// This enables scan pruning: runs whose key range cannot intersect the
/// requested range can be skipped entirely.
#[derive(Debug)]
pub struct SortedRun<K, V> {
    /// Entries sorted ascending by key
    entries: Vec<(K, V)>,
    /// Minimum key in this run (for scan pruning); `None` when empty
    min_key: Option<K>,
    /// Maximum key in this run (for scan pruning); `None` when empty
    max_key: Option<K>,
    /// Approximate size in bytes: `len() * size_of::<(K, V)>()`.
    /// Heap memory owned by keys or values is not included.
    size_bytes: usize,
    /// Creation timestamp (not read anywhere in this file yet)
    #[allow(dead_code)]
    created_at: std::time::Instant,
    /// Run level in LSM hierarchy
    level: usize,
}

impl<K: Ord + Clone, V: Clone> SortedRun<K, V> {
    /// Create a new sorted run from unsorted entries.
    ///
    /// Uses a stable sort, so entries with equal keys keep their input order.
    pub fn from_unsorted(mut entries: Vec<(K, V)>, level: usize) -> Self {
        entries.sort_by(|a, b| a.0.cmp(&b.0));
        Self::from_sorted(entries, level)
    }

    /// Create from entries that are already sorted ascending by key.
    ///
    /// The ordering is not verified; lookups and scans on unsorted input give
    /// unspecified results.
    pub fn from_sorted(entries: Vec<(K, V)>, level: usize) -> Self {
        // `size_of_val(&entries)` would only measure the Vec header
        // (pointer, capacity, length); measure the element slice instead.
        let size_bytes = std::mem::size_of_val(entries.as_slice());
        let min_key = entries.first().map(|(k, _)| k.clone());
        let max_key = entries.last().map(|(k, _)| k.clone());
        Self {
            entries,
            min_key,
            max_key,
            size_bytes,
            created_at: std::time::Instant::now(),
            level,
        }
    }

    /// Get number of entries
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Get approximate size in bytes (see the `size_bytes` field)
    pub fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    /// Get the level
    pub fn level(&self) -> usize {
        self.level
    }

    /// Index of the first entry whose key is `>= key` (the lower bound).
    ///
    /// Unlike `binary_search`, this is well defined when keys repeat.
    fn lower_bound(&self, key: &K) -> usize {
        self.entries.partition_point(|(k, _)| k < key)
    }

    /// Binary search for a key.
    ///
    /// If the key occurs more than once, any one of its values may be returned.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.entries
            .binary_search_by(|(k, _)| k.cmp(key))
            .ok()
            .map(|idx| &self.entries[idx].1)
    }

    /// Iterate entries with key `>= start`, in key order.
    pub fn range_from<'a>(&'a self, start: &K) -> impl Iterator<Item = &'a (K, V)> {
        let start_idx = self.lower_bound(start);
        self.entries[start_idx..].iter()
    }

    /// Iterate entries in the half-open key range `[start, end)`, in key order.
    ///
    /// An inverted range (`end < start`) yields nothing instead of panicking.
    pub fn range<'a>(&'a self, start: &K, end: &K) -> impl Iterator<Item = &'a (K, V)> {
        let start_idx = self.lower_bound(start);
        // Clamp so that `end < start` produces an empty slice, not a panic.
        let end_idx = self.lower_bound(end).max(start_idx);
        self.entries[start_idx..end_idx].iter()
    }

    /// Iterate all entries
    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
        self.entries.iter()
    }

    /// Direct access to underlying entries for O(1) indexing
    ///
    /// Required for efficient k-way merge. Without this accessor,
    /// callers are forced to use `iter().nth()` which is O(n) per call.
    #[inline]
    pub fn entries(&self) -> &[(K, V)] {
        &self.entries
    }

    /// Check if this run might contain keys with the given prefix.
    ///
    /// Uses the stored `max_key` for an O(1) check. Returns `false` only when
    /// the whole run sorts before `prefix` (`max_key < prefix`); otherwise it
    /// conservatively returns `true`. Runs lying entirely *after* the prefix
    /// range are not detected here, because the end of a prefix range cannot
    /// be computed for a generic `K`. Callers stop at the first non-matching
    /// key instead.
    #[inline]
    pub fn overlaps_prefix(&self, prefix: &K) -> bool {
        match &self.max_key {
            Some(max) if max < prefix => false, // Run entirely before prefix
            _ => true,                          // Could overlap (conservative)
        }
    }

    /// Check if this run might contain keys in the half-open range `[start, end)`.
    ///
    /// Uses stored min_key and max_key bounds for O(1) overlap check.
    /// Returns `true` if the run may contain matching keys (conservative).
    #[inline]
    pub fn overlaps_range(&self, start: &K, end: &K) -> bool {
        match (&self.min_key, &self.max_key) {
            (Some(min), _) if min >= end => false,  // Run entirely after range
            (_, Some(max)) if max < start => false, // Run entirely before range
            _ => true,                              // Could overlap
        }
    }

    /// Get the minimum key in this run, if any
    #[inline]
    pub fn min_key(&self) -> Option<&K> {
        self.min_key.as_ref()
    }

    /// Get the maximum key in this run, if any
    #[inline]
    pub fn max_key(&self) -> Option<&K> {
        self.max_key.as_ref()
    }
}
457
458// ============================================================================
459// BalancedTableIndex - LSM-Style Index for Balanced Policy
460// ============================================================================
461
462/// LSM-style index for the Balanced policy
463///
464/// Combines:
465/// - Unsorted DashMap for O(1) writes
466/// - Periodic compaction to sorted runs
467/// - K-way merge for range scans
468pub struct BalancedTableIndex<V: Clone + Send + Sync + Eq + 'static> {
469    /// Unsorted memtable for fast writes
470    memtable: DashMap<ArenaKeyHandle, V>,
471    /// Sorted runs for efficient scans
472    sorted_runs: RwLock<Vec<Arc<SortedRun<ArenaKeyHandle, V>>>>,
473    /// Configuration
474    config: TableIndexConfig,
475    /// Size of memtable in bytes
476    memtable_size: std::sync::atomic::AtomicUsize,
477}
478
479impl<V: Clone + Send + Sync + Eq + 'static> BalancedTableIndex<V> {
480    /// Create a new balanced table index
481    pub fn new(config: TableIndexConfig) -> Self {
482        Self {
483            memtable: DashMap::new(),
484            sorted_runs: RwLock::new(Vec::new()),
485            config,
486            memtable_size: std::sync::atomic::AtomicUsize::new(0),
487        }
488    }
489
490    /// Insert a key-value pair (O(1))
491    pub fn insert(&self, key: ArenaKeyHandle, value: V) {
492        let key_size = key.len();
493        let value_size = std::mem::size_of::<V>();
494        
495        self.memtable.insert(key, value);
496        self.memtable_size.fetch_add(key_size + value_size, std::sync::atomic::Ordering::Relaxed);
497    }
498
499    /// Get a value by key
500    /// 
501    /// Uses run metadata for O(1) pruning: skips runs where 
502    /// `max_key < key` (run entirely before search key).
503    pub fn get(&self, key: &ArenaKeyHandle) -> Option<V> {
504        // Check memtable first
505        if let Some(v) = self.memtable.get(key) {
506            return Some(v.clone());
507        }
508
509        // Check sorted runs (newest first)
510        // Use metadata pruning to skip runs that can't contain the key
511        let runs = self.sorted_runs.read();
512        for run in runs.iter().rev() {
513            // Prune: skip runs that are entirely before the key
514            if run.overlaps_prefix(key) {
515                if let Some(v) = run.get(key) {
516                    return Some(v.clone());
517                }
518            }
519        }
520
521        None
522    }
523
524    /// Scan entries with a given key prefix
525    /// 
526    /// Uses run metadata for O(1) pruning: only scans runs whose
527    /// key range overlaps with the prefix. Returns merged results
528    /// in key order with duplicates resolved (newest wins).
529    /// 
530    /// # Pruning Benefit
531    /// 
532    /// For selective prefixes (e.g., "user:123:" in a table with 1M keys),
533    /// most runs will have `max_key < prefix` or `min_key > prefix_end`,
534    /// allowing them to be skipped entirely.
535    pub fn scan_prefix(&self, prefix: &ArenaKeyHandle) -> Vec<(ArenaKeyHandle, V)> {
536        use std::collections::BinaryHeap;
537        use std::cmp::Reverse;
538
539        #[derive(Eq, PartialEq)]
540        struct PrefixHeapEntry<V: Clone> {
541            key: ArenaKeyHandle,
542            value: V,
543            source_idx: usize,  // 0 = memtable, 1+ = sorted runs
544        }
545
546        impl<V: Clone + Eq + PartialEq> Ord for PrefixHeapEntry<V> {
547            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
548                match self.key.cmp(&other.key) {
549                    std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
550                    other => other,
551                }
552            }
553        }
554
555        impl<V: Clone + Eq + PartialEq> PartialOrd for PrefixHeapEntry<V> {
556            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
557                Some(self.cmp(other))
558            }
559        }
560
561        let mut heap: BinaryHeap<Reverse<PrefixHeapEntry<V>>> = BinaryHeap::new();
562
563        // Collect matching entries from memtable
564        for entry in self.memtable.iter() {
565            let key = entry.key();
566            if key.as_bytes().starts_with(prefix.as_bytes()) {
567                heap.push(Reverse(PrefixHeapEntry {
568                    key: key.clone(),
569                    value: entry.value().clone(),
570                    source_idx: 0,
571                }));
572            }
573        }
574
575        // Collect from sorted runs, using metadata pruning
576        let runs = self.sorted_runs.read();
577        for (run_idx, run) in runs.iter().enumerate() {
578            // Prune: skip runs that can't contain any matching keys
579            if !run.overlaps_prefix(prefix) {
580                continue;  // Run is entirely before prefix
581            }
582
583            // Scan matching entries in this run
584            for (key, value) in run.range_from(prefix) {
585                if !key.as_bytes().starts_with(prefix.as_bytes()) {
586                    break;  // Past prefix range
587                }
588                heap.push(Reverse(PrefixHeapEntry {
589                    key: key.clone(),
590                    value: value.clone(),
591                    source_idx: run_idx + 1,  // 1-indexed (0 = memtable)
592                }));
593            }
594        }
595
596        // Merge and deduplicate
597        let mut result = Vec::with_capacity(heap.len());
598        let mut last_key: Option<ArenaKeyHandle> = None;
599
600        while let Some(Reverse(entry)) = heap.pop() {
601            let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
602            if is_new_key {
603                last_key = Some(entry.key.clone());
604                result.push((entry.key, entry.value));
605            }
606        }
607
608        result
609    }
610
611    /// Check if compaction is needed
612    pub fn needs_compaction(&self) -> bool {
613        let memtable_size = self.memtable_size.load(std::sync::atomic::Ordering::Relaxed);
614        let runs = self.sorted_runs.read();
615        
616        memtable_size >= self.config.target_run_size
617            || runs.len() >= self.config.max_sorted_runs
618    }
619
620    /// Compact memtable to a new sorted run
621    pub fn compact_memtable(&self) {
622        // Drain memtable
623        let entries: Vec<_> = self.memtable.iter()
624            .map(|e| (e.key().clone(), e.value().clone()))
625            .collect();
626        
627        if entries.is_empty() {
628            return;
629        }
630
631        // Clear memtable
632        self.memtable.clear();
633        self.memtable_size.store(0, std::sync::atomic::Ordering::Relaxed);
634
635        // Create new sorted run
636        let run = Arc::new(SortedRun::from_unsorted(entries, 0));
637        
638        let mut runs = self.sorted_runs.write();
639        runs.push(run);
640    }
641
642    /// Merge multiple sorted runs (compaction)
643    pub fn merge_runs(&self, levels_to_merge: usize) {
644        let mut runs = self.sorted_runs.write();
645        
646        if runs.len() < levels_to_merge {
647            return;
648        }
649
650        // Take the oldest runs to merge
651        let to_merge: Vec<_> = runs.drain(..levels_to_merge).collect();
652        
653        // K-way merge
654        let merged = self.k_way_merge(&to_merge);
655        
656        // Create new run at next level
657        let new_run = Arc::new(SortedRun::from_sorted(merged, to_merge.len()));
658        runs.insert(0, new_run);
659    }
660
661    /// K-way merge of sorted runs
662    /// 
663    /// Complexity: O(N log K) where N = total entries, K = number of runs
664    /// 
665    /// Key insight: Use direct indexing into the underlying Vec instead of 
666    /// creating new iterators. `iter().nth(n)` is O(n), making the old
667    /// implementation O(N²/K).
668    fn k_way_merge(&self, runs: &[Arc<SortedRun<ArenaKeyHandle, V>>]) -> Vec<(ArenaKeyHandle, V)> {
669        use std::collections::BinaryHeap;
670        use std::cmp::Reverse;
671
672        #[derive(Eq, PartialEq)]
673        struct HeapEntry<V: Clone> {
674            key: ArenaKeyHandle,
675            value: V,
676            run_idx: usize,
677            entry_idx: usize,
678        }
679
680        impl<V: Clone + Eq + PartialEq> Ord for HeapEntry<V> {
681            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
682                // Primary: key order (min-heap via Reverse wrapper)
683                // Secondary: run_idx for stability (lower = older = superseded)
684                match self.key.cmp(&other.key) {
685                    std::cmp::Ordering::Equal => self.run_idx.cmp(&other.run_idx),
686                    other => other,
687                }
688            }
689        }
690
691        impl<V: Clone + Eq + PartialEq> PartialOrd for HeapEntry<V> {
692            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
693                Some(self.cmp(other))
694            }
695        }
696
697        let mut heap: BinaryHeap<Reverse<HeapEntry<V>>> = BinaryHeap::new();
698        
699        // Track current position in each run (index into the underlying Vec)
700        let mut run_positions: Vec<usize> = vec![0; runs.len()];
701        
702        // Initialize heap with first entry from each run using direct indexing
703        for (run_idx, run) in runs.iter().enumerate() {
704            let entries = run.entries();
705            if !entries.is_empty() {
706                let (key, value) = &entries[0];  // O(1) direct access
707                heap.push(Reverse(HeapEntry {
708                    key: key.clone(),
709                    value: value.clone(),
710                    run_idx,
711                    entry_idx: 0,
712                }));
713            }
714        }
715
716        // Pre-allocate result based on estimated total entries
717        let estimated_size: usize = runs.iter().map(|r| r.len()).sum();
718        let mut result = Vec::with_capacity(estimated_size);
719        let mut last_key: Option<ArenaKeyHandle> = None;
720
721        while let Some(Reverse(entry)) = heap.pop() {
722            // Duplicate suppression: keep only the first occurrence (newest due to run ordering)
723            let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
724            if is_new_key {
725                last_key = Some(entry.key.clone());
726                result.push((entry.key.clone(), entry.value));
727            }
728
729            // Advance position in this run
730            run_positions[entry.run_idx] += 1;
731            let next_idx = run_positions[entry.run_idx];
732            
733            // FIX: Direct indexing is O(1), not O(n) like iter().nth()
734            let run_entries = runs[entry.run_idx].entries();
735            if next_idx < run_entries.len() {
736                let (key, value) = &run_entries[next_idx];  // O(1) access
737                heap.push(Reverse(HeapEntry {
738                    key: key.clone(),
739                    value: value.clone(),
740                    run_idx: entry.run_idx,
741                    entry_idx: next_idx,
742                }));
743            }
744        }
745
746        result
747    }
748
749    /// Get table config
750    pub fn config(&self) -> &TableIndexConfig {
751        &self.config
752    }
753
754    /// Get memtable size
755    pub fn memtable_size(&self) -> usize {
756        self.memtable_size.load(std::sync::atomic::Ordering::Relaxed)
757    }
758
759    /// Get number of sorted runs
760    pub fn run_count(&self) -> usize {
761        self.sorted_runs.read().len()
762    }
763}
764
765// ============================================================================
766// Tests
767// ============================================================================
768
769#[cfg(test)]
770mod tests {
771    use super::*;
772
773    #[test]
774    fn test_index_policy_from_str() {
775        assert_eq!(IndexPolicy::from_str("write_optimized"), Some(IndexPolicy::WriteOptimized));
776        assert_eq!(IndexPolicy::from_str("balanced"), Some(IndexPolicy::Balanced));
777        assert_eq!(IndexPolicy::from_str("scan-optimized"), Some(IndexPolicy::ScanOptimized));
778        assert_eq!(IndexPolicy::from_str("append_only"), Some(IndexPolicy::AppendOnly));
779        assert_eq!(IndexPolicy::from_str("invalid"), None);
780    }
781
782    #[test]
783    fn test_registry_default_policy() {
784        let registry = TableIndexRegistry::new();
785        
786        // Unconfigured table gets default policy
787        assert_eq!(registry.get_policy("unknown"), IndexPolicy::Balanced);
788        
789        // Configure a table
790        registry.configure_table(TableIndexConfig::new("users", IndexPolicy::WriteOptimized));
791        assert_eq!(registry.get_policy("users"), IndexPolicy::WriteOptimized);
792        
793        // Other tables still get default
794        assert_eq!(registry.get_policy("orders"), IndexPolicy::Balanced);
795    }
796
797    #[test]
798    fn test_registry_change_default() {
799        let registry = TableIndexRegistry::new();
800        
801        registry.set_default_policy(IndexPolicy::ScanOptimized);
802        assert_eq!(registry.get_policy("any_table"), IndexPolicy::ScanOptimized);
803    }
804
805    #[test]
806    fn test_sorted_run() {
807        let entries = vec![
808            (ArenaKeyHandle::new(b"c"), 3),
809            (ArenaKeyHandle::new(b"a"), 1),
810            (ArenaKeyHandle::new(b"b"), 2),
811        ];
812        
813        let run = SortedRun::from_unsorted(entries, 0);
814        
815        assert_eq!(run.len(), 3);
816        assert_eq!(run.get(&ArenaKeyHandle::new(b"a")), Some(&1));
817        assert_eq!(run.get(&ArenaKeyHandle::new(b"b")), Some(&2));
818        assert_eq!(run.get(&ArenaKeyHandle::new(b"c")), Some(&3));
819        assert_eq!(run.get(&ArenaKeyHandle::new(b"d")), None);
820    }
821
822    #[test]
823    fn test_balanced_table_index() {
824        let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
825        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
826        
827        index.insert(ArenaKeyHandle::new(b"key1"), 1);
828        index.insert(ArenaKeyHandle::new(b"key2"), 2);
829        
830        assert_eq!(index.get(&ArenaKeyHandle::new(b"key1")), Some(1));
831        assert_eq!(index.get(&ArenaKeyHandle::new(b"key2")), Some(2));
832        assert_eq!(index.get(&ArenaKeyHandle::new(b"key3")), None);
833    }
834
835    #[test]
836    fn test_balanced_compaction() {
837        let config = TableIndexConfig::new("test", IndexPolicy::Balanced)
838            .with_target_run_size(100); // Small size to trigger compaction
839        
840        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
841        
842        for i in 0..10 {
843            let key = format!("key{:03}", i);
844            index.insert(ArenaKeyHandle::new(key.as_bytes()), i as i32);
845        }
846        
847        // Compact memtable
848        index.compact_memtable();
849        
850        assert_eq!(index.run_count(), 1);
851        assert_eq!(index.memtable_size(), 0);
852        
853        // Values should still be accessible
854        assert_eq!(index.get(&ArenaKeyHandle::new(b"key005")), Some(5));
855    }
856
857    #[test]
858    fn test_k_way_merge_scaling() {
859        // Verify O(N log K) complexity by checking that merge time scales linearly
860        // with N (not quadratically as the old iter().nth() implementation would)
861        use std::time::Instant;
862        
863        let sizes = [100, 500, 1000];
864        let mut times_ns: Vec<u128> = Vec::new();
865        
866        for size in sizes {
867            // Create 5 runs with `size` entries each
868            let runs: Vec<Arc<SortedRun<ArenaKeyHandle, i32>>> = (0..5)
869                .map(|run_id| {
870                    let entries: Vec<(ArenaKeyHandle, i32)> = (0..size)
871                        .map(|i| {
872                            let key = format!("key_{:08}_{}", i * 5 + run_id, run_id);
873                            (ArenaKeyHandle::new(key.as_bytes()), (i * 5 + run_id) as i32)
874                        })
875                        .collect();
876                    Arc::new(SortedRun::from_sorted(entries, run_id))
877                })
878                .collect();
879            
880            let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
881            let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
882            
883            let start = Instant::now();
884            let merged = index.k_way_merge(&runs);
885            let elapsed = start.elapsed();
886            
887            times_ns.push(elapsed.as_nanos());
888            
889            // Verify merge produced correct output
890            let total_entries = size * 5;
891            assert_eq!(merged.len(), total_entries, "Merge should produce all unique entries");
892        }
893        
894        // For O(N log K) scaling, time should roughly double when N doubles
895        // (since log K is constant). For O(N²), time would quadruple.
896        // We check that the ratio is closer to linear than quadratic.
897        if times_ns.len() >= 2 && times_ns[0] > 0 {
898            let ratio_1_to_2 = times_ns[1] as f64 / times_ns[0] as f64;
899            let ratio_2_to_3 = times_ns[2] as f64 / times_ns[1] as f64;
900            
901            // For linear scaling with 5x size increase, expect ~5x time increase
902            // For quadratic, expect ~25x. We assert it's closer to linear.
903            assert!(ratio_1_to_2 < 15.0, 
904                "Merge scaling should be sub-quadratic: ratio={:.1}x for 5x size", ratio_1_to_2);
905            assert!(ratio_2_to_3 < 10.0,
906                "Merge scaling should be sub-quadratic: ratio={:.1}x for 2x size", ratio_2_to_3);
907        }
908    }
909
910    #[test]
911    fn test_sorted_run_metadata_pruning() {
912        // Test that min_key and max_key are correctly computed
913        let entries = vec![
914            (ArenaKeyHandle::new(b"apple"), 1),
915            (ArenaKeyHandle::new(b"banana"), 2),
916            (ArenaKeyHandle::new(b"cherry"), 3),
917        ];
918        let run = SortedRun::from_sorted(entries, 0);
919        
920        // Verify min/max are set correctly
921        assert_eq!(run.min_key().map(|k| k.as_bytes()), Some(b"apple".as_slice()));
922        assert_eq!(run.max_key().map(|k| k.as_bytes()), Some(b"cherry".as_slice()));
923        
924        // Test overlaps_prefix pruning
925        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"banana"))); // In range
926        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"apple")));  // At start
927        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"cherry"))); // At end
928        
929        // Prefix BEFORE range should not overlap (max_key < prefix)
930        assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"date")));  // After range
931        assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"zebra"))); // Way after
932        
933        // Test overlaps_range pruning
934        assert!(run.overlaps_range(
935            &ArenaKeyHandle::new(b"banana"),
936            &ArenaKeyHandle::new(b"cherry")
937        ));
938        assert!(!run.overlaps_range(
939            &ArenaKeyHandle::new(b"date"),
940            &ArenaKeyHandle::new(b"fig")
941        )); // Entirely after
942        assert!(!run.overlaps_range(
943            &ArenaKeyHandle::new(b"aaa"),
944            &ArenaKeyHandle::new(b"aab")
945        )); // Entirely before
946    }
947
948    #[test]
949    fn test_scan_prefix() {
950        let config = TableIndexConfig::new("test", IndexPolicy::Balanced)
951            .with_target_run_size(50); // Small to trigger compaction
952        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
953        
954        // Insert entries with different prefixes
955        let prefixes = ["user:1:", "user:2:", "order:1:", "order:2:"];
956        for (i, prefix) in prefixes.iter().enumerate() {
957            for j in 0..5 {
958                let key = format!("{}{}", prefix, j);
959                index.insert(ArenaKeyHandle::new(key.as_bytes()), (i * 10 + j) as i32);
960            }
961        }
962        
963        // Compact to create sorted runs
964        index.compact_memtable();
965        
966        // Add more entries to memtable
967        index.insert(ArenaKeyHandle::new(b"user:1:99"), 199);
968        index.insert(ArenaKeyHandle::new(b"order:1:99"), 299);
969        
970        // Scan for user:1: prefix
971        let results = index.scan_prefix(&ArenaKeyHandle::new(b"user:1:"));
972        assert_eq!(results.len(), 6); // 5 from run + 1 from memtable
973        
974        // Verify all results have the correct prefix
975        for (key, _value) in &results {
976            assert!(key.as_bytes().starts_with(b"user:1:"), 
977                "Key {:?} should start with user:1:", String::from_utf8_lossy(key.as_bytes()));
978        }
979        
980        // Verify results are sorted
981        for window in results.windows(2) {
982            assert!(window[0].0 <= window[1].0, "Results should be sorted by key");
983        }
984        
985        // Scan for order: prefix
986        let results = index.scan_prefix(&ArenaKeyHandle::new(b"order:"));
987        assert_eq!(results.len(), 11); // 10 from run + 1 from memtable
988    }
989
990    #[test]
991    fn test_empty_sorted_run_metadata() {
992        // Empty run should have None for min/max
993        let entries: Vec<(ArenaKeyHandle, i32)> = vec![];
994        let run = SortedRun::from_sorted(entries, 0);
995        
996        assert!(run.min_key().is_none());
997        assert!(run.max_key().is_none());
998        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"anything"))); // Conservative: true
999        assert!(run.overlaps_range(
1000            &ArenaKeyHandle::new(b"a"),
1001            &ArenaKeyHandle::new(b"z")
1002        )); // Conservative: true
1003    }
1004}