Skip to main content

sochdb_storage/
index_policy.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Per-Table Index Policy + Scan-Optimized Structure
19//!
20//! This module replaces the global `enable_ordered_index` toggle with
21//! a per-table index policy, allowing fine-grained control over write
22//! throughput vs scan performance trade-offs.
23//!
24//! ## Problem: Global Toggle is Too Coarse
25//!
26//! The current `DatabaseConfig::enable_ordered_index` is a single global toggle.
27//! This forces a choice:
28//! - Enabled: Pay ~134 ns/op on ALL writes for O(log N) scans
29//! - Disabled: Fast writes but O(N) scans on ALL tables
30//!
31//! Real workloads have mixed access patterns:
32//! - Write-heavy logs tables → don't need ordered index
33//! - Scan-heavy analytics tables → need ordered index
34//! - OLTP tables → need balanced policy
35//!
36//! ## Solution: Per-Table Index Policy
37//!
38//! ```text
39//! ┌─────────────────────────────────────────────────────────────────┐
40//! │                      Table Index Registry                        │
41//! ├─────────────────────────────────────────────────────────────────┤
42//! │                                                                  │
43//! │  "users"    → WriteOptimized (no ordered index)                 │
44//! │  "orders"   → Balanced (lazy compaction to sorted runs)          │
45//! │  "analytics"→ ScanOptimized (maintain ordered index)             │
46//! │  "logs"     → AppendOnly (no index, time-ordered writes)         │
47//! │                                                                  │
48//! └─────────────────────────────────────────────────────────────────┘
49//! ```
50//!
51//! ## Index Policies
52//!
//! | Policy         | Insert Cost    | Scan Cost         | Use Case               |
//! |----------------|----------------|-------------------|------------------------|
//! | WriteOptimized | O(1)           | O(N)              | High-write, rare scan  |
//! | Balanced       | O(1) amortized | O(output + log K) | Mixed OLTP             |
//! | ScanOptimized  | O(log N)       | O(log N + K)      | Analytics, range query |
//! | AppendOnly     | O(1)           | O(N)              | Time-series logs       |
//!
//! Note: in the Balanced row `K` is the number of sorted runs; in the
//! ScanOptimized row `K` presumably denotes the number of entries returned.
59//!
60//! ## LSM-Style Balanced Policy
61//!
62//! For `Balanced` tables, we use an LSM-style approach:
63//! - Unsorted append-friendly memtable (O(1) inserts)
64//! - Periodic compaction to sorted runs
65//! - K-way merge for range scans (O(output + log K))
66//!
67//! This retains range-scan capability without paying O(log N) on every write.
68
69use std::sync::Arc;
70
71use dashmap::DashMap;
72use parking_lot::RwLock;
73
74use crate::key_buffer::ArenaKeyHandle;
75
76// ============================================================================
77// IndexPolicy - Per-Table Index Configuration
78// ============================================================================
79
/// Index policy for a table.
///
/// Selects how a single table trades write cost against scan cost.
/// Policy names can be parsed either with the inherent
/// [`IndexPolicy::from_str`] (returns `Option`) or through the standard
/// [`std::str::FromStr`] trait (`"balanced".parse::<IndexPolicy>()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexPolicy {
    /// No ordered index - fastest writes, O(N) scans.
    /// Use for write-heavy tables that rarely need range scans.
    WriteOptimized,

    /// LSM-style: unsorted memtable + periodic sorted runs.
    /// Amortized O(1) inserts, O(output + log K) scans where K = run count.
    /// Good balance for mixed OLTP workloads.
    Balanced,

    /// Maintain ordered index on every write.
    /// O(log N) inserts, O(log N + K) scans.
    /// Use for analytics tables with frequent range queries.
    ScanOptimized,

    /// Append-only with no indexing.
    /// O(1) inserts, O(N) scans (but efficient forward iteration).
    /// Use for time-series logs where data is naturally ordered.
    AppendOnly,
}

impl Default for IndexPolicy {
    /// The default policy is [`IndexPolicy::Balanced`].
    fn default() -> Self {
        IndexPolicy::Balanced
    }
}

/// Error produced by the [`std::str::FromStr`] implementation of
/// [`IndexPolicy`] when the input is not a recognized policy name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseIndexPolicyError {
    /// The rejected input, kept for the error message.
    input: String,
}

impl std::fmt::Display for ParseIndexPolicyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "unknown index policy: {:?}", self.input)
    }
}

impl std::error::Error for ParseIndexPolicyError {}

impl std::str::FromStr for IndexPolicy {
    type Err = ParseIndexPolicyError;

    /// Parse a policy name; accepts the same spellings as the inherent
    /// [`IndexPolicy::from_str`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        IndexPolicy::lookup(s).ok_or_else(|| ParseIndexPolicyError {
            input: s.to_owned(),
        })
    }
}

impl IndexPolicy {
    /// Accepted spellings for each policy. Matching is ASCII case-insensitive.
    const NAMES: &'static [(&'static str, IndexPolicy)] = &[
        ("write_optimized", IndexPolicy::WriteOptimized),
        ("write-optimized", IndexPolicy::WriteOptimized),
        ("write", IndexPolicy::WriteOptimized),
        ("balanced", IndexPolicy::Balanced),
        ("default", IndexPolicy::Balanced),
        ("scan_optimized", IndexPolicy::ScanOptimized),
        ("scan-optimized", IndexPolicy::ScanOptimized),
        ("scan", IndexPolicy::ScanOptimized),
        ("append_only", IndexPolicy::AppendOnly),
        ("append-only", IndexPolicy::AppendOnly),
        ("append", IndexPolicy::AppendOnly),
    ];

    /// Shared name lookup used by both parsing entry points.
    ///
    /// Compares without allocating a lowercased copy of the input.
    fn lookup(s: &str) -> Option<Self> {
        Self::NAMES
            .iter()
            .find(|(name, _)| name.eq_ignore_ascii_case(s))
            .map(|&(_, policy)| policy)
    }

    /// Parse from string.
    ///
    /// Returns `None` when `s` is not one of the accepted spellings
    /// (e.g. `"write_optimized"`, `"write-optimized"`, `"write"`).
    /// Matching ignores ASCII case.
    // Kept as an inherent method for backward compatibility with existing
    // callers; `FromStr` is implemented alongside it.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        Self::lookup(s)
    }

    /// Get write cost description
    pub fn write_cost(&self) -> &'static str {
        match self {
            IndexPolicy::WriteOptimized => "O(1)",
            IndexPolicy::Balanced => "O(1) amortized",
            IndexPolicy::ScanOptimized => "O(log N)",
            IndexPolicy::AppendOnly => "O(1)",
        }
    }

    /// Get scan cost description
    pub fn scan_cost(&self) -> &'static str {
        match self {
            IndexPolicy::WriteOptimized => "O(N)",
            IndexPolicy::Balanced => "O(output + log K)",
            IndexPolicy::ScanOptimized => "O(log N + K)",
            IndexPolicy::AppendOnly => "O(N)",
        }
    }

    /// Whether this policy maintains an ordered index
    /// (true only for [`IndexPolicy::ScanOptimized`]).
    pub fn has_ordered_index(&self) -> bool {
        matches!(self, IndexPolicy::ScanOptimized)
    }

    /// Whether this policy supports efficient range scans
    /// (true for `ScanOptimized` and `Balanced`).
    pub fn supports_efficient_scans(&self) -> bool {
        matches!(self, IndexPolicy::ScanOptimized | IndexPolicy::Balanced)
    }
}
151
152// ============================================================================
153// TableIndexConfig - Configuration for a Single Table
154// ============================================================================
155
156/// Index configuration for a single table
157#[derive(Debug, Clone)]
158pub struct TableIndexConfig {
159    /// Table name
160    pub table_name: String,
161    /// Index policy
162    pub policy: IndexPolicy,
163    /// Maximum sorted runs before compaction (for Balanced policy)
164    pub max_sorted_runs: usize,
165    /// Target sorted run size in bytes
166    pub target_run_size: usize,
167    /// Enable bloom filters for point queries
168    pub enable_bloom_filter: bool,
169}
170
171impl TableIndexConfig {
172    /// Create a new table index config
173    pub fn new(table_name: impl Into<String>, policy: IndexPolicy) -> Self {
174        Self {
175            table_name: table_name.into(),
176            policy,
177            max_sorted_runs: 4,
178            target_run_size: 16 * 1024 * 1024, // 16MB
179            enable_bloom_filter: true,
180        }
181    }
182
183    /// Builder: set max sorted runs
184    pub fn with_max_sorted_runs(mut self, max: usize) -> Self {
185        self.max_sorted_runs = max;
186        self
187    }
188
189    /// Builder: set target run size
190    pub fn with_target_run_size(mut self, size: usize) -> Self {
191        self.target_run_size = size;
192        self
193    }
194
195    /// Builder: enable/disable bloom filter
196    pub fn with_bloom_filter(mut self, enable: bool) -> Self {
197        self.enable_bloom_filter = enable;
198        self
199    }
200}
201
202// ============================================================================
203// TableIndexRegistry - Central Registry for Per-Table Policies
204// ============================================================================
205
206/// Registry of per-table index policies
207///
208/// Allows setting different index policies for different tables,
209/// with a default policy for tables not explicitly configured.
210pub struct TableIndexRegistry {
211    /// Per-table configurations
212    configs: DashMap<String, TableIndexConfig>,
213    /// Default policy for unconfigured tables
214    default_policy: RwLock<IndexPolicy>,
215}
216
217impl TableIndexRegistry {
218    /// Create a new registry with default Balanced policy
219    pub fn new() -> Self {
220        Self {
221            configs: DashMap::new(),
222            default_policy: RwLock::new(IndexPolicy::Balanced),
223        }
224    }
225
226    /// Create with a specific default policy
227    pub fn with_default_policy(policy: IndexPolicy) -> Self {
228        Self {
229            configs: DashMap::new(),
230            default_policy: RwLock::new(policy),
231        }
232    }
233
234    /// Set the default policy for unconfigured tables
235    pub fn set_default_policy(&self, policy: IndexPolicy) {
236        *self.default_policy.write() = policy;
237    }
238
239    /// Get the default policy
240    pub fn default_policy(&self) -> IndexPolicy {
241        *self.default_policy.read()
242    }
243
244    /// Configure a table with a specific policy
245    pub fn configure_table(&self, config: TableIndexConfig) {
246        self.configs.insert(config.table_name.clone(), config);
247    }
248
249    /// Get the policy for a table
250    pub fn get_policy(&self, table_name: &str) -> IndexPolicy {
251        self.configs
252            .get(table_name)
253            .map(|c| c.policy)
254            .unwrap_or_else(|| *self.default_policy.read())
255    }
256
257    /// Get the full config for a table (or default)
258    pub fn get_config(&self, table_name: &str) -> TableIndexConfig {
259        self.configs
260            .get(table_name)
261            .map(|c| c.clone())
262            .unwrap_or_else(|| TableIndexConfig::new(table_name, *self.default_policy.read()))
263    }
264
265    /// Check if a table has an explicitly configured policy
266    pub fn has_explicit_config(&self, table_name: &str) -> bool {
267        self.configs.contains_key(table_name)
268    }
269
270    /// Remove a table's configuration (reverts to default)
271    pub fn remove_config(&self, table_name: &str) -> Option<TableIndexConfig> {
272        self.configs.remove(table_name).map(|(_, c)| c)
273    }
274
275    /// List all configured tables
276    pub fn configured_tables(&self) -> Vec<String> {
277        self.configs.iter().map(|e| e.key().clone()).collect()
278    }
279}
280
281impl Default for TableIndexRegistry {
282    fn default() -> Self {
283        Self::new()
284    }
285}
286
287// ============================================================================
288// SortedRun - Immutable Sorted Segment for Balanced Policy
289// ============================================================================
290
/// An immutable sorted segment of key-value pairs
///
/// Used by the Balanced policy for LSM-style scan optimization.
/// Each run is sorted by key, enabling efficient k-way merge.
///
/// ## Key Range Metadata
///
/// Each run stores `min_key` and `max_key` bounds for O(1) overlap checking.
/// This enables scan pruning: runs whose key range cannot intersect the
/// requested range can be skipped entirely.
#[derive(Debug)]
pub struct SortedRun<K, V> {
    /// Entries, ordered by key
    entries: Vec<(K, V)>,
    /// Minimum key in this run (`None` when the run is empty)
    min_key: Option<K>,
    /// Maximum key in this run (`None` when the run is empty)
    max_key: Option<K>,
    /// Approximate size in bytes: inline size of the entry slice
    /// (heap data owned by keys/values is not included)
    size_bytes: usize,
    /// Creation timestamp
    #[allow(dead_code)]
    created_at: std::time::Instant,
    /// Run level in LSM hierarchy
    level: usize,
}

impl<K: Ord + Clone, V: Clone> SortedRun<K, V> {
    /// Assemble a run from entries that are already ordered by key.
    fn build(entries: Vec<(K, V)>, level: usize) -> Self {
        // Measure the slice, not the `Vec` handle: `size_of_val(&entries)`
        // would always yield the fixed size of the `Vec` header.
        let size_bytes = std::mem::size_of_val(entries.as_slice());
        let min_key = entries.first().map(|(k, _)| k.clone());
        let max_key = entries.last().map(|(k, _)| k.clone());
        Self {
            entries,
            min_key,
            max_key,
            size_bytes,
            created_at: std::time::Instant::now(),
            level,
        }
    }

    /// Index of the first entry whose key is `>= key`.
    ///
    /// Unlike `binary_search`, this is well-defined when equal keys are
    /// present: it always points at the first of them.
    fn lower_bound(&self, key: &K) -> usize {
        self.entries.partition_point(|(k, _)| k < key)
    }

    /// Create a new sorted run from unsorted entries
    ///
    /// The sort is stable, so entries with equal keys keep their input order.
    pub fn from_unsorted(mut entries: Vec<(K, V)>, level: usize) -> Self {
        entries.sort_by(|a, b| a.0.cmp(&b.0));
        Self::build(entries, level)
    }

    /// Create from already-sorted entries
    ///
    /// The caller must pass entries in non-decreasing key order; lookups and
    /// range scans rely on it. The order is checked in debug builds only.
    pub fn from_sorted(entries: Vec<(K, V)>, level: usize) -> Self {
        debug_assert!(
            entries.windows(2).all(|pair| pair[0].0 <= pair[1].0),
            "SortedRun::from_sorted requires entries ordered by key"
        );
        Self::build(entries, level)
    }

    /// Get number of entries
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Get size in bytes (approximate, see the `size_bytes` field)
    pub fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    /// Get the level
    pub fn level(&self) -> usize {
        self.level
    }

    /// Binary search for a key
    pub fn get(&self, key: &K) -> Option<&V> {
        self.entries
            .binary_search_by(|(k, _)| k.cmp(key))
            .ok()
            .map(|idx| &self.entries[idx].1)
    }

    /// Range scan from start key (inclusive) to the end of the run
    pub fn range_from<'a>(&'a self, start: &K) -> impl Iterator<Item = &'a (K, V)> {
        let idx = self.lower_bound(start);
        self.entries[idx..].iter()
    }

    /// Range scan with bounds: keys in `[start, end)`
    ///
    /// Yields nothing when `start >= end` (an inverted range is treated as
    /// empty rather than panicking).
    pub fn range<'a>(&'a self, start: &K, end: &K) -> impl Iterator<Item = &'a (K, V)> {
        let start_idx = self.lower_bound(start);
        // Clamp so that an inverted range produces an empty slice.
        let end_idx = self.lower_bound(end).max(start_idx);
        self.entries[start_idx..end_idx].iter()
    }

    /// Iterate all entries
    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
        self.entries.iter()
    }

    /// Direct access to underlying entries for O(1) indexing
    ///
    /// Required for efficient k-way merge. Without this accessor,
    /// callers are forced to use `iter().nth()` which is O(n) per call.
    #[inline]
    pub fn entries(&self) -> &[(K, V)] {
        &self.entries
    }

    /// Check if this run might contain keys with the given prefix.
    ///
    /// Uses the stored `max_key` bound for an O(1) check.
    /// Returns `true` if the run may contain matching keys (conservative).
    /// Returns `false` only if we can prove no keys match: the run is empty,
    /// or `max_key < prefix` (the run lies entirely before the prefix).
    #[inline]
    pub fn overlaps_prefix(&self, prefix: &K) -> bool {
        match &self.max_key {
            Some(max) => max >= prefix,
            None => false, // Empty run holds no keys at all
        }
    }

    /// Check if this run might contain keys in the range `[start, end)`.
    ///
    /// Uses stored min_key and max_key bounds for O(1) overlap check.
    /// Returns `true` if the run may contain matching keys (conservative);
    /// an empty run never overlaps.
    #[inline]
    pub fn overlaps_range(&self, start: &K, end: &K) -> bool {
        match (&self.min_key, &self.max_key) {
            // Overlap requires the run to start before `end` and to finish
            // at or after `start`.
            (Some(min), Some(max)) => min < end && max >= start,
            _ => false,
        }
    }

    /// Get the minimum key in this run, if any
    #[inline]
    pub fn min_key(&self) -> Option<&K> {
        self.min_key.as_ref()
    }

    /// Get the maximum key in this run, if any
    #[inline]
    pub fn max_key(&self) -> Option<&K> {
        self.max_key.as_ref()
    }
}
461
462// ============================================================================
463// BalancedTableIndex - LSM-Style Index for Balanced Policy
464// ============================================================================
465
466/// LSM-style index for the Balanced policy
467///
468/// Combines:
469/// - Unsorted DashMap for O(1) writes
470/// - Periodic compaction to sorted runs
471/// - K-way merge for range scans
472pub struct BalancedTableIndex<V: Clone + Send + Sync + Eq + 'static> {
473    /// Unsorted memtable for fast writes
474    memtable: DashMap<ArenaKeyHandle, V>,
475    /// Sorted runs for efficient scans
476    sorted_runs: RwLock<Vec<Arc<SortedRun<ArenaKeyHandle, V>>>>,
477    /// Configuration
478    config: TableIndexConfig,
479    /// Size of memtable in bytes
480    memtable_size: std::sync::atomic::AtomicUsize,
481}
482
483impl<V: Clone + Send + Sync + Eq + 'static> BalancedTableIndex<V> {
484    /// Create a new balanced table index
485    pub fn new(config: TableIndexConfig) -> Self {
486        Self {
487            memtable: DashMap::new(),
488            sorted_runs: RwLock::new(Vec::new()),
489            config,
490            memtable_size: std::sync::atomic::AtomicUsize::new(0),
491        }
492    }
493
494    /// Insert a key-value pair (O(1))
495    pub fn insert(&self, key: ArenaKeyHandle, value: V) {
496        let key_size = key.len();
497        let value_size = std::mem::size_of::<V>();
498
499        self.memtable.insert(key, value);
500        self.memtable_size
501            .fetch_add(key_size + value_size, std::sync::atomic::Ordering::Relaxed);
502    }
503
504    /// Get a value by key
505    ///
506    /// Uses run metadata for O(1) pruning: skips runs where
507    /// `max_key < key` (run entirely before search key).
508    pub fn get(&self, key: &ArenaKeyHandle) -> Option<V> {
509        // Check memtable first
510        if let Some(v) = self.memtable.get(key) {
511            return Some(v.clone());
512        }
513
514        // Check sorted runs (newest first)
515        // Use metadata pruning to skip runs that can't contain the key
516        let runs = self.sorted_runs.read();
517        for run in runs.iter().rev() {
518            // Prune: skip runs that are entirely before the key
519            if run.overlaps_prefix(key) {
520                if let Some(v) = run.get(key) {
521                    return Some(v.clone());
522                }
523            }
524        }
525
526        None
527    }
528
529    /// Scan entries with a given key prefix
530    ///
531    /// Uses run metadata for O(1) pruning: only scans runs whose
532    /// key range overlaps with the prefix. Returns merged results
533    /// in key order with duplicates resolved (newest wins).
534    ///
535    /// # Pruning Benefit
536    ///
537    /// For selective prefixes (e.g., "user:123:" in a table with 1M keys),
538    /// most runs will have `max_key < prefix` or `min_key > prefix_end`,
539    /// allowing them to be skipped entirely.
540    pub fn scan_prefix(&self, prefix: &ArenaKeyHandle) -> Vec<(ArenaKeyHandle, V)> {
541        use std::cmp::Reverse;
542        use std::collections::BinaryHeap;
543
544        #[derive(Eq, PartialEq)]
545        struct PrefixHeapEntry<V: Clone> {
546            key: ArenaKeyHandle,
547            value: V,
548            source_idx: usize, // 0 = memtable, 1+ = sorted runs
549        }
550
551        impl<V: Clone + Eq + PartialEq> Ord for PrefixHeapEntry<V> {
552            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
553                match self.key.cmp(&other.key) {
554                    std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
555                    other => other,
556                }
557            }
558        }
559
560        impl<V: Clone + Eq + PartialEq> PartialOrd for PrefixHeapEntry<V> {
561            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
562                Some(self.cmp(other))
563            }
564        }
565
566        let mut heap: BinaryHeap<Reverse<PrefixHeapEntry<V>>> = BinaryHeap::new();
567
568        // Collect matching entries from memtable
569        for entry in self.memtable.iter() {
570            let key = entry.key();
571            if key.as_bytes().starts_with(prefix.as_bytes()) {
572                heap.push(Reverse(PrefixHeapEntry {
573                    key: key.clone(),
574                    value: entry.value().clone(),
575                    source_idx: 0,
576                }));
577            }
578        }
579
580        // Collect from sorted runs, using metadata pruning
581        let runs = self.sorted_runs.read();
582        for (run_idx, run) in runs.iter().enumerate() {
583            // Prune: skip runs that can't contain any matching keys
584            if !run.overlaps_prefix(prefix) {
585                continue; // Run is entirely before prefix
586            }
587
588            // Scan matching entries in this run
589            for (key, value) in run.range_from(prefix) {
590                if !key.as_bytes().starts_with(prefix.as_bytes()) {
591                    break; // Past prefix range
592                }
593                heap.push(Reverse(PrefixHeapEntry {
594                    key: key.clone(),
595                    value: value.clone(),
596                    source_idx: run_idx + 1, // 1-indexed (0 = memtable)
597                }));
598            }
599        }
600
601        // Merge and deduplicate
602        let mut result = Vec::with_capacity(heap.len());
603        let mut last_key: Option<ArenaKeyHandle> = None;
604
605        while let Some(Reverse(entry)) = heap.pop() {
606            let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
607            if is_new_key {
608                last_key = Some(entry.key.clone());
609                result.push((entry.key, entry.value));
610            }
611        }
612
613        result
614    }
615
616    /// Check if compaction is needed
617    pub fn needs_compaction(&self) -> bool {
618        let memtable_size = self
619            .memtable_size
620            .load(std::sync::atomic::Ordering::Relaxed);
621        let runs = self.sorted_runs.read();
622
623        memtable_size >= self.config.target_run_size || runs.len() >= self.config.max_sorted_runs
624    }
625
626    /// Compact memtable to a new sorted run
627    pub fn compact_memtable(&self) {
628        // Drain memtable
629        let entries: Vec<_> = self
630            .memtable
631            .iter()
632            .map(|e| (e.key().clone(), e.value().clone()))
633            .collect();
634
635        if entries.is_empty() {
636            return;
637        }
638
639        // Clear memtable
640        self.memtable.clear();
641        self.memtable_size
642            .store(0, std::sync::atomic::Ordering::Relaxed);
643
644        // Create new sorted run
645        let run = Arc::new(SortedRun::from_unsorted(entries, 0));
646
647        let mut runs = self.sorted_runs.write();
648        runs.push(run);
649    }
650
651    /// Merge multiple sorted runs (compaction)
652    pub fn merge_runs(&self, levels_to_merge: usize) {
653        let mut runs = self.sorted_runs.write();
654
655        if runs.len() < levels_to_merge {
656            return;
657        }
658
659        // Take the oldest runs to merge
660        let to_merge: Vec<_> = runs.drain(..levels_to_merge).collect();
661
662        // K-way merge
663        let merged = self.k_way_merge(&to_merge);
664
665        // Create new run at next level
666        let new_run = Arc::new(SortedRun::from_sorted(merged, to_merge.len()));
667        runs.insert(0, new_run);
668    }
669
670    /// K-way merge of sorted runs
671    ///
672    /// Complexity: O(N log K) where N = total entries, K = number of runs
673    ///
674    /// Key insight: Use direct indexing into the underlying Vec instead of
675    /// creating new iterators. `iter().nth(n)` is O(n), making the old
676    /// implementation O(N²/K).
677    fn k_way_merge(&self, runs: &[Arc<SortedRun<ArenaKeyHandle, V>>]) -> Vec<(ArenaKeyHandle, V)> {
678        use std::cmp::Reverse;
679        use std::collections::BinaryHeap;
680
681        #[derive(Eq, PartialEq)]
682        struct HeapEntry<V: Clone> {
683            key: ArenaKeyHandle,
684            value: V,
685            run_idx: usize,
686            entry_idx: usize,
687        }
688
689        impl<V: Clone + Eq + PartialEq> Ord for HeapEntry<V> {
690            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
691                // Primary: key order (min-heap via Reverse wrapper)
692                // Secondary: run_idx for stability (lower = older = superseded)
693                match self.key.cmp(&other.key) {
694                    std::cmp::Ordering::Equal => self.run_idx.cmp(&other.run_idx),
695                    other => other,
696                }
697            }
698        }
699
700        impl<V: Clone + Eq + PartialEq> PartialOrd for HeapEntry<V> {
701            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
702                Some(self.cmp(other))
703            }
704        }
705
706        let mut heap: BinaryHeap<Reverse<HeapEntry<V>>> = BinaryHeap::new();
707
708        // Track current position in each run (index into the underlying Vec)
709        let mut run_positions: Vec<usize> = vec![0; runs.len()];
710
711        // Initialize heap with first entry from each run using direct indexing
712        for (run_idx, run) in runs.iter().enumerate() {
713            let entries = run.entries();
714            if !entries.is_empty() {
715                let (key, value) = &entries[0]; // O(1) direct access
716                heap.push(Reverse(HeapEntry {
717                    key: key.clone(),
718                    value: value.clone(),
719                    run_idx,
720                    entry_idx: 0,
721                }));
722            }
723        }
724
725        // Pre-allocate result based on estimated total entries
726        let estimated_size: usize = runs.iter().map(|r| r.len()).sum();
727        let mut result = Vec::with_capacity(estimated_size);
728        let mut last_key: Option<ArenaKeyHandle> = None;
729
730        while let Some(Reverse(entry)) = heap.pop() {
731            // Duplicate suppression: keep only the first occurrence (newest due to run ordering)
732            let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
733            if is_new_key {
734                last_key = Some(entry.key.clone());
735                result.push((entry.key.clone(), entry.value));
736            }
737
738            // Advance position in this run
739            run_positions[entry.run_idx] += 1;
740            let next_idx = run_positions[entry.run_idx];
741
742            // FIX: Direct indexing is O(1), not O(n) like iter().nth()
743            let run_entries = runs[entry.run_idx].entries();
744            if next_idx < run_entries.len() {
745                let (key, value) = &run_entries[next_idx]; // O(1) access
746                heap.push(Reverse(HeapEntry {
747                    key: key.clone(),
748                    value: value.clone(),
749                    run_idx: entry.run_idx,
750                    entry_idx: next_idx,
751                }));
752            }
753        }
754
755        result
756    }
757
758    /// Get table config
759    pub fn config(&self) -> &TableIndexConfig {
760        &self.config
761    }
762
763    /// Get memtable size
764    pub fn memtable_size(&self) -> usize {
765        self.memtable_size
766            .load(std::sync::atomic::Ordering::Relaxed)
767    }
768
769    /// Get number of sorted runs
770    pub fn run_count(&self) -> usize {
771        self.sorted_runs.read().len()
772    }
773}
774
775// ============================================================================
776// Tests
777// ============================================================================
778
779#[cfg(test)]
780mod tests {
781    use super::*;
782
783    #[test]
784    fn test_index_policy_from_str() {
785        assert_eq!(
786            IndexPolicy::from_str("write_optimized"),
787            Some(IndexPolicy::WriteOptimized)
788        );
789        assert_eq!(
790            IndexPolicy::from_str("balanced"),
791            Some(IndexPolicy::Balanced)
792        );
793        assert_eq!(
794            IndexPolicy::from_str("scan-optimized"),
795            Some(IndexPolicy::ScanOptimized)
796        );
797        assert_eq!(
798            IndexPolicy::from_str("append_only"),
799            Some(IndexPolicy::AppendOnly)
800        );
801        assert_eq!(IndexPolicy::from_str("invalid"), None);
802    }
803
804    #[test]
805    fn test_registry_default_policy() {
806        let registry = TableIndexRegistry::new();
807
808        // Unconfigured table gets default policy
809        assert_eq!(registry.get_policy("unknown"), IndexPolicy::Balanced);
810
811        // Configure a table
812        registry.configure_table(TableIndexConfig::new("users", IndexPolicy::WriteOptimized));
813        assert_eq!(registry.get_policy("users"), IndexPolicy::WriteOptimized);
814
815        // Other tables still get default
816        assert_eq!(registry.get_policy("orders"), IndexPolicy::Balanced);
817    }
818
819    #[test]
820    fn test_registry_change_default() {
821        let registry = TableIndexRegistry::new();
822
823        registry.set_default_policy(IndexPolicy::ScanOptimized);
824        assert_eq!(registry.get_policy("any_table"), IndexPolicy::ScanOptimized);
825    }
826
827    #[test]
828    fn test_sorted_run() {
829        let entries = vec![
830            (ArenaKeyHandle::new(b"c"), 3),
831            (ArenaKeyHandle::new(b"a"), 1),
832            (ArenaKeyHandle::new(b"b"), 2),
833        ];
834
835        let run = SortedRun::from_unsorted(entries, 0);
836
837        assert_eq!(run.len(), 3);
838        assert_eq!(run.get(&ArenaKeyHandle::new(b"a")), Some(&1));
839        assert_eq!(run.get(&ArenaKeyHandle::new(b"b")), Some(&2));
840        assert_eq!(run.get(&ArenaKeyHandle::new(b"c")), Some(&3));
841        assert_eq!(run.get(&ArenaKeyHandle::new(b"d")), None);
842    }
843
844    #[test]
845    fn test_balanced_table_index() {
846        let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
847        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
848
849        index.insert(ArenaKeyHandle::new(b"key1"), 1);
850        index.insert(ArenaKeyHandle::new(b"key2"), 2);
851
852        assert_eq!(index.get(&ArenaKeyHandle::new(b"key1")), Some(1));
853        assert_eq!(index.get(&ArenaKeyHandle::new(b"key2")), Some(2));
854        assert_eq!(index.get(&ArenaKeyHandle::new(b"key3")), None);
855    }
856
857    #[test]
858    fn test_balanced_compaction() {
859        let config = TableIndexConfig::new("test", IndexPolicy::Balanced).with_target_run_size(100); // Small size to trigger compaction
860
861        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
862
863        for i in 0..10 {
864            let key = format!("key{:03}", i);
865            index.insert(ArenaKeyHandle::new(key.as_bytes()), i as i32);
866        }
867
868        // Compact memtable
869        index.compact_memtable();
870
871        assert_eq!(index.run_count(), 1);
872        assert_eq!(index.memtable_size(), 0);
873
874        // Values should still be accessible
875        assert_eq!(index.get(&ArenaKeyHandle::new(b"key005")), Some(5));
876    }
877
878    #[test]
879    fn test_k_way_merge_scaling() {
880        // Verify O(N log K) complexity by checking that merge time scales linearly
881        // with N (not quadratically as the old iter().nth() implementation would)
882        use std::time::Instant;
883
884        let sizes = [100, 500, 1000];
885        let mut times_ns: Vec<u128> = Vec::new();
886
887        for size in sizes {
888            // Create 5 runs with `size` entries each
889            let runs: Vec<Arc<SortedRun<ArenaKeyHandle, i32>>> = (0..5)
890                .map(|run_id| {
891                    let entries: Vec<(ArenaKeyHandle, i32)> = (0..size)
892                        .map(|i| {
893                            let key = format!("key_{:08}_{}", i * 5 + run_id, run_id);
894                            (ArenaKeyHandle::new(key.as_bytes()), (i * 5 + run_id) as i32)
895                        })
896                        .collect();
897                    Arc::new(SortedRun::from_sorted(entries, run_id))
898                })
899                .collect();
900
901            let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
902            let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
903
904            let start = Instant::now();
905            let merged = index.k_way_merge(&runs);
906            let elapsed = start.elapsed();
907
908            times_ns.push(elapsed.as_nanos());
909
910            // Verify merge produced correct output
911            let total_entries = size * 5;
912            assert_eq!(
913                merged.len(),
914                total_entries,
915                "Merge should produce all unique entries"
916            );
917        }
918
919        // For O(N log K) scaling, time should roughly double when N doubles
920        // (since log K is constant). For O(N²), time would quadruple.
921        // We check that the ratio is closer to linear than quadratic.
922        if times_ns.len() >= 2 && times_ns[0] > 0 {
923            let ratio_1_to_2 = times_ns[1] as f64 / times_ns[0] as f64;
924            let ratio_2_to_3 = times_ns[2] as f64 / times_ns[1] as f64;
925
926            // For linear scaling with 5x size increase, expect ~5x time increase
927            // For quadratic, expect ~25x. We assert it's closer to linear.
928            assert!(
929                ratio_1_to_2 < 15.0,
930                "Merge scaling should be sub-quadratic: ratio={:.1}x for 5x size",
931                ratio_1_to_2
932            );
933            assert!(
934                ratio_2_to_3 < 10.0,
935                "Merge scaling should be sub-quadratic: ratio={:.1}x for 2x size",
936                ratio_2_to_3
937            );
938        }
939    }
940
941    #[test]
942    fn test_sorted_run_metadata_pruning() {
943        // Test that min_key and max_key are correctly computed
944        let entries = vec![
945            (ArenaKeyHandle::new(b"apple"), 1),
946            (ArenaKeyHandle::new(b"banana"), 2),
947            (ArenaKeyHandle::new(b"cherry"), 3),
948        ];
949        let run = SortedRun::from_sorted(entries, 0);
950
951        // Verify min/max are set correctly
952        assert_eq!(
953            run.min_key().map(|k| k.as_bytes()),
954            Some(b"apple".as_slice())
955        );
956        assert_eq!(
957            run.max_key().map(|k| k.as_bytes()),
958            Some(b"cherry".as_slice())
959        );
960
961        // Test overlaps_prefix pruning
962        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"banana"))); // In range
963        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"apple"))); // At start
964        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"cherry"))); // At end
965
966        // Prefix BEFORE range should not overlap (max_key < prefix)
967        assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"date"))); // After range
968        assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"zebra"))); // Way after
969
970        // Test overlaps_range pruning
971        assert!(run.overlaps_range(
972            &ArenaKeyHandle::new(b"banana"),
973            &ArenaKeyHandle::new(b"cherry")
974        ));
975        assert!(!run.overlaps_range(&ArenaKeyHandle::new(b"date"), &ArenaKeyHandle::new(b"fig"))); // Entirely after
976        assert!(!run.overlaps_range(&ArenaKeyHandle::new(b"aaa"), &ArenaKeyHandle::new(b"aab"))); // Entirely before
977    }
978
979    #[test]
980    fn test_scan_prefix() {
981        let config = TableIndexConfig::new("test", IndexPolicy::Balanced).with_target_run_size(50); // Small to trigger compaction
982        let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
983
984        // Insert entries with different prefixes
985        let prefixes = ["user:1:", "user:2:", "order:1:", "order:2:"];
986        for (i, prefix) in prefixes.iter().enumerate() {
987            for j in 0..5 {
988                let key = format!("{}{}", prefix, j);
989                index.insert(ArenaKeyHandle::new(key.as_bytes()), (i * 10 + j) as i32);
990            }
991        }
992
993        // Compact to create sorted runs
994        index.compact_memtable();
995
996        // Add more entries to memtable
997        index.insert(ArenaKeyHandle::new(b"user:1:99"), 199);
998        index.insert(ArenaKeyHandle::new(b"order:1:99"), 299);
999
1000        // Scan for user:1: prefix
1001        let results = index.scan_prefix(&ArenaKeyHandle::new(b"user:1:"));
1002        assert_eq!(results.len(), 6); // 5 from run + 1 from memtable
1003
1004        // Verify all results have the correct prefix
1005        for (key, _value) in &results {
1006            assert!(
1007                key.as_bytes().starts_with(b"user:1:"),
1008                "Key {:?} should start with user:1:",
1009                String::from_utf8_lossy(key.as_bytes())
1010            );
1011        }
1012
1013        // Verify results are sorted
1014        for window in results.windows(2) {
1015            assert!(
1016                window[0].0 <= window[1].0,
1017                "Results should be sorted by key"
1018            );
1019        }
1020
1021        // Scan for order: prefix
1022        let results = index.scan_prefix(&ArenaKeyHandle::new(b"order:"));
1023        assert_eq!(results.len(), 11); // 10 from run + 1 from memtable
1024    }
1025
1026    #[test]
1027    fn test_empty_sorted_run_metadata() {
1028        // Empty run should have None for min/max
1029        let entries: Vec<(ArenaKeyHandle, i32)> = vec![];
1030        let run = SortedRun::from_sorted(entries, 0);
1031
1032        assert!(run.min_key().is_none());
1033        assert!(run.max_key().is_none());
1034        assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"anything"))); // Conservative: true
1035        assert!(run.overlaps_range(&ArenaKeyHandle::new(b"a"), &ArenaKeyHandle::new(b"z"))); // Conservative: true
1036    }
1037}