Skip to main content

sochdb_storage/
deferred_index.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Deferred Sorted Index (Recommendation 2: LSM-Style Batch Compaction)
19//!
20//! This module implements a deferred sorting strategy for the ordered index:
21//! - Writes: O(1) append to unsorted buffer (vs O(log N) SkipMap insert)
//! - Reads: buffered keys are sorted and merged into sorted storage on demand, when a scan (or `len()`) needs them
23//!
24//! ## Performance Analysis
25//!
26//! For N writes followed by a scan:
27//! - SkipMap: N × O(log N) = O(N log N) during writes
28//! - Deferred: N × O(1) + O(N log N) once = O(N log N) total, but with better constants
29//!
//! The design rests on three observations:
//! 1. Most write-heavy workloads do not scan immediately after writing.
//! 2. Rust's `sort_unstable` (a pattern-defeating quicksort derivative) costs roughly
//!    15-20 ns per element, versus roughly 134 ns per element for a SkipMap insert
//!    (figures depend on hardware and key size).
//! 3. Sorting a contiguous buffer is far more cache-friendly than skip-list pointer chasing.
35//!
36//! ## Architecture
37//!
38//! ```text
39//! Write Path (Hot):
40//! ┌─────────────┐     O(1)      ┌─────────────────┐
41//! │   Write()   │ ─────────────→│  Append-only    │
42//! │             │               │  Vec<Key>       │
43//! └─────────────┘               └─────────────────┘
44//!
45//! Read Path (Cold, Lazy):
//! ┌─────────────────┐     O(N log N)    ┌─────────────────┐
//! │  Unsorted Vec   │ ─────────────────→│   Sorted View   │
//! │  (hot buffer)   │   on next scan    │  (sorted_vec)   │
49//! └─────────────────┘                   └─────────────────┘
50//! ```
51
52use parking_lot::RwLock;
53use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54
/// Configuration for [`DeferredSortedIndex`].
#[derive(Clone, Debug)]
pub struct DeferredIndexConfig {
    /// Hot-buffer size at which the index flags itself as needing compaction.
    ///
    /// Reaching this threshold does NOT compact on the write path: `insert`
    /// only raises a flag. The buffered keys are sorted and merged on the next
    /// read that needs ordering (`range`, `range_from`, `len`) or on an
    /// explicit `compact()`. The hot buffer is therefore not hard-bounded by
    /// this value.
    ///
    /// Default: 10,000 entries. For 10-byte keys that is ~100 KB of key bytes,
    /// plus one `Vec` header (24 bytes on 64-bit targets) and one heap
    /// allocation per key.
    pub max_unsorted_entries: usize,
    /// Intended switch for the deferred strategy.
    ///
    /// NOTE: `DeferredSortedIndex` does not currently read this field. Inserts
    /// always go through the hot buffer whatever its value.
    pub enabled: bool,
}

impl Default for DeferredIndexConfig {
    /// Defaults: `max_unsorted_entries = 10_000`, `enabled = true`.
    fn default() -> Self {
        Self {
            max_unsorted_entries: 10_000,
            enabled: true,
        }
    }
}
73
74/// Deferred Sorted Index with LSM-style compaction
75///
76/// ## Scan Optimization (80/20 Fix)
77///
78/// The original SkipMap-based scan was 3.3x slower than SQLite because:
79/// - SkipMap iteration = pointer chasing across memory
80/// - Each `entry.key().clone()` = heap allocation
81/// - Poor cache locality (random memory access pattern)
82///
83/// The fix: Use a sorted `Vec<Vec<u8>>` for the cold storage.
84/// - Sequential memory access = L1/L2 cache hits
85/// - Binary search for range start = O(log N)
86/// - Iteration = simple pointer increment
87///
88/// Benchmarked improvement: +50-80% scan throughput
89pub struct DeferredSortedIndex {
90    /// Configuration
91    config: DeferredIndexConfig,
92    /// Cold storage: sorted Vec for cache-friendly range scans
93    /// RwLock allows concurrent reads during scans
94    sorted_vec: RwLock<Vec<Vec<u8>>>,
95    /// Hot buffer: unsorted append-only for fast writes
96    hot_buffer: RwLock<Vec<Vec<u8>>>,
97    /// Flag indicating hot buffer needs compaction
98    needs_compaction: AtomicBool,
99    /// Statistics: total inserts
100    total_inserts: AtomicU64,
101    /// Statistics: total compactions
102    total_compactions: AtomicU64,
103}
104
105impl DeferredSortedIndex {
106    /// Create a new deferred index with default config
107    pub fn new() -> Self {
108        Self::with_config(DeferredIndexConfig::default())
109    }
110
111    /// Create with custom configuration
112    pub fn with_config(config: DeferredIndexConfig) -> Self {
113        Self {
114            config,
115            sorted_vec: RwLock::new(Vec::new()),
116            hot_buffer: RwLock::new(Vec::with_capacity(1000)),
117            needs_compaction: AtomicBool::new(false),
118            total_inserts: AtomicU64::new(0),
119            total_compactions: AtomicU64::new(0),
120        }
121    }
122
123    /// Insert a key into the index
124    ///
125    /// O(1) append to hot buffer (fast path)
126    #[inline]
127    pub fn insert(&self, key: Vec<u8>) {
128        self.total_inserts.fetch_add(1, Ordering::Relaxed);
129
130        // Fast path: append to hot buffer (O(1) amortized)
131        {
132            let mut buffer = self.hot_buffer.write();
133            buffer.push(key);
134
135            // Check if compaction is needed
136            if buffer.len() >= self.config.max_unsorted_entries {
137                self.needs_compaction.store(true, Ordering::Release);
138            }
139        }
140    }
141
142    /// Insert a key reference (avoids one clone if key is already owned)
143    #[inline]
144    pub fn insert_ref(&self, key: &[u8]) {
145        self.insert(key.to_vec());
146    }
147
148    /// Compact hot buffer into sorted storage
149    ///
150    /// Merges hot buffer with existing sorted vec using k-way merge.
151    /// This is O(N + M) where N = hot buffer size, M = existing sorted size.
152    pub fn compact(&self) {
153        let entries_to_merge = {
154            let mut buffer = self.hot_buffer.write();
155            if buffer.is_empty() {
156                return;
157            }
158            std::mem::take(&mut *buffer)
159        };
160
161        // Sort the hot buffer entries
162        let mut new_entries = entries_to_merge;
163        new_entries.sort_unstable();
164        new_entries.dedup();
165
166        // Merge with existing sorted vec
167        let mut sorted = self.sorted_vec.write();
168
169        if sorted.is_empty() {
170            // Fast path: just replace
171            *sorted = new_entries;
172        } else {
173            // Merge two sorted vecs (O(N + M))
174            let old_sorted = std::mem::take(&mut *sorted);
175            *sorted = Self::merge_sorted_vecs(old_sorted, new_entries);
176        }
177
178        self.needs_compaction.store(false, Ordering::Release);
179        self.total_compactions.fetch_add(1, Ordering::Relaxed);
180    }
181
182    /// Merge two sorted vecs into one, removing duplicates
183    /// O(N + M) time, O(N + M) space
184    fn merge_sorted_vecs(a: Vec<Vec<u8>>, b: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
185        let mut result = Vec::with_capacity(a.len() + b.len());
186        let mut i = 0;
187        let mut j = 0;
188
189        while i < a.len() && j < b.len() {
190            match a[i].cmp(&b[j]) {
191                std::cmp::Ordering::Less => {
192                    result.push(a[i].clone());
193                    i += 1;
194                }
195                std::cmp::Ordering::Greater => {
196                    result.push(b[j].clone());
197                    j += 1;
198                }
199                std::cmp::Ordering::Equal => {
200                    // Duplicate: take from 'a', skip 'b'
201                    result.push(a[i].clone());
202                    i += 1;
203                    j += 1;
204                }
205            }
206        }
207
208        // Append remaining
209        while i < a.len() {
210            result.push(a[i].clone());
211            i += 1;
212        }
213        while j < b.len() {
214            result.push(b[j].clone());
215            j += 1;
216        }
217
218        result
219    }
220
221    /// Ensure index is compacted before scan operations
222    #[inline]
223    fn ensure_compacted(&self) {
224        if self.needs_compaction.load(Ordering::Acquire) || !self.hot_buffer.read().is_empty() {
225            self.compact();
226        }
227    }
228
229    /// Iterate over all keys starting from `start`
230    ///
231    /// Uses binary search + sequential iteration for cache-friendly access.
232    pub fn range_from<'a>(&'a self, start: &[u8]) -> impl Iterator<Item = Vec<u8>> + 'a {
233        self.ensure_compacted();
234
235        let sorted = self.sorted_vec.read();
236        // Binary search for start position
237        let start_idx = sorted.partition_point(|k| k.as_slice() < start);
238
239        // Return iterator over the range
240        // Note: We need to clone because we can't return references to RwLockReadGuard
241        let result: Vec<Vec<u8>> = sorted[start_idx..].to_vec();
242        result.into_iter()
243    }
244
245    /// Iterate over keys in range [start, end)
246    ///
247    /// Binary search for bounds, then sequential iteration.
248    pub fn range<'a>(&'a self, start: &[u8], end: &[u8]) -> impl Iterator<Item = Vec<u8>> + 'a {
249        self.ensure_compacted();
250
251        let sorted = self.sorted_vec.read();
252        // Binary search for start and end positions
253        let start_idx = sorted.partition_point(|k| k.as_slice() < start);
254        let end_idx = sorted.partition_point(|k| k.as_slice() < end);
255
256        // Return cloned slice (necessary due to lifetime constraints)
257        let result: Vec<Vec<u8>> = sorted[start_idx..end_idx].to_vec();
258        result.into_iter()
259    }
260
261    /// Check if a key exists in the index
262    pub fn contains(&self, key: &[u8]) -> bool {
263        // Check hot buffer first
264        {
265            let buffer = self.hot_buffer.read();
266            if buffer.iter().any(|k| k.as_slice() == key) {
267                return true;
268            }
269        }
270
271        // Binary search in sorted vec
272        let sorted = self.sorted_vec.read();
273        sorted.binary_search_by(|k| k.as_slice().cmp(key)).is_ok()
274    }
275
276    /// Get statistics
277    pub fn stats(&self) -> DeferredIndexStats {
278        let buffer_len = self.hot_buffer.read().len();
279        let sorted_len = self.sorted_vec.read().len();
280        DeferredIndexStats {
281            sorted_entries: sorted_len,
282            hot_buffer_entries: buffer_len,
283            total_inserts: self.total_inserts.load(Ordering::Relaxed),
284            total_compactions: self.total_compactions.load(Ordering::Relaxed),
285        }
286    }
287
288    /// Clear the index
289    pub fn clear(&self) {
290        self.sorted_vec.write().clear();
291        self.hot_buffer.write().clear();
292        self.needs_compaction.store(false, Ordering::Release);
293    }
294
295    /// Get the total number of unique keys (requires compaction)
296    pub fn len(&self) -> usize {
297        self.ensure_compacted();
298        self.sorted_vec.read().len()
299    }
300
301    /// Check if index is empty
302    pub fn is_empty(&self) -> bool {
303        self.sorted_vec.read().is_empty() && self.hot_buffer.read().is_empty()
304    }
305}
306
307impl Default for DeferredSortedIndex {
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
/// Point-in-time counters describing a deferred sorted index.
///
/// All fields are plain copies taken when the snapshot was built; they do not
/// change afterwards.
#[derive(Debug, Clone)]
pub struct DeferredIndexStats {
    /// Keys currently held in sorted storage.
    pub sorted_entries: usize,
    /// Keys waiting in the hot buffer for the next compaction.
    pub hot_buffer_entries: usize,
    /// Cumulative number of inserts performed.
    pub total_inserts: u64,
    /// Cumulative number of compactions performed.
    pub total_compactions: u64,
}

impl DeferredIndexStats {
    /// Average number of inserts per compaction.
    ///
    /// Yields `0.0` before the first compaction instead of dividing by zero.
    pub fn compaction_ratio(&self) -> f64 {
        match self.total_compactions {
            0 => 0.0,
            compactions => self.total_inserts as f64 / compactions as f64,
        }
    }
}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// Convert string literals into owned byte keys for comparisons.
    fn owned_keys(items: &[&str]) -> Vec<Vec<u8>> {
        items.iter().map(|s| s.as_bytes().to_vec()).collect()
    }

    #[test]
    fn test_basic_insert_and_scan() {
        let index = DeferredSortedIndex::new();

        // Insert some keys
        index.insert(b"key3".to_vec());
        index.insert(b"key1".to_vec());
        index.insert(b"key2".to_vec());

        // Scan should return sorted order
        let keys: Vec<_> = index.range_from(b"").collect();
        assert_eq!(
            keys,
            vec![b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec()]
        );
    }

    #[test]
    fn test_deferred_compaction() {
        let config = DeferredIndexConfig {
            max_unsorted_entries: 5,
            enabled: true,
        };
        let index = DeferredSortedIndex::with_config(config);

        // Insert entries below threshold
        for i in 0..4 {
            index.insert(format!("key{}", i).into_bytes());
        }

        // Should not have compacted yet
        assert!(!index.needs_compaction.load(Ordering::Relaxed));
        assert_eq!(index.sorted_vec.read().len(), 0);

        // Insert one more to trigger compaction flag
        index.insert(b"key4".to_vec());
        assert!(index.needs_compaction.load(Ordering::Relaxed));

        // Scan should trigger compaction
        let keys: Vec<_> = index.range_from(b"").collect();
        assert_eq!(keys.len(), 5);
        assert!(!index.needs_compaction.load(Ordering::Relaxed));
    }

    #[test]
    fn test_dedup_on_compaction() {
        let index = DeferredSortedIndex::new();

        // Insert duplicates
        index.insert(b"key1".to_vec());
        index.insert(b"key1".to_vec());
        index.insert(b"key2".to_vec());
        index.insert(b"key1".to_vec());

        // After compaction, should have unique keys
        index.compact();
        let stats = index.stats();
        assert_eq!(stats.sorted_entries, 2);
    }

    #[test]
    fn test_range_scan() {
        let index = DeferredSortedIndex::new();

        for i in 0..10 {
            index.insert(format!("key{:02}", i).into_bytes());
        }

        // Range scan
        let keys: Vec<_> = index.range(b"key03", b"key07").collect();
        assert_eq!(keys.len(), 4); // key03, key04, key05, key06
    }

    #[test]
    fn test_range_bounds() {
        let index = DeferredSortedIndex::new();
        for i in 0..10 {
            index.insert(format!("key{:02}", i).into_bytes());
        }

        // `start` is inclusive, `end` is exclusive.
        let keys: Vec<_> = index.range(b"key08", b"zzz").collect();
        assert_eq!(keys, owned_keys(&["key08", "key09"]));

        // start == end selects nothing.
        assert_eq!(index.range(b"key05", b"key05").count(), 0);

        // Starting past the last key selects nothing.
        assert_eq!(index.range_from(b"zzz").count(), 0);
    }

    #[test]
    fn test_disabled_mode() {
        let config = DeferredIndexConfig {
            enabled: false,
            ..Default::default()
        };
        let index = DeferredSortedIndex::with_config(config);

        // `enabled` is not consulted by the index: inserts still go to the hot
        // buffer (no bypass) and are moved into sorted storage by compact().
        index.insert(b"key1".to_vec());
        index.compact();
        assert_eq!(index.sorted_vec.read().len(), 1);
    }

    #[test]
    fn test_concurrent_inserts() {
        use std::sync::Arc;
        use std::thread;

        let index = Arc::new(DeferredSortedIndex::new());
        let mut handles = vec![];

        for t in 0..4 {
            let idx = index.clone();
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    idx.insert(format!("t{}-key{:03}", t, i).into_bytes());
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // Should have all unique keys
        index.compact();
        assert_eq!(index.sorted_vec.read().len(), 400);
    }

    #[test]
    fn test_stats() {
        let index = DeferredSortedIndex::new();

        for i in 0..100 {
            index.insert(format!("key{}", i).into_bytes());
        }

        let stats = index.stats();
        assert_eq!(stats.total_inserts, 100);
        assert_eq!(stats.hot_buffer_entries, 100);
        assert_eq!(stats.sorted_entries, 0);

        index.compact();

        let stats = index.stats();
        assert_eq!(stats.total_compactions, 1);
        assert_eq!(stats.hot_buffer_entries, 0);
        assert_eq!(stats.sorted_entries, 100);
    }

    #[test]
    fn test_merge_across_compactions() {
        let index = DeferredSortedIndex::new();

        for key in ["b", "d", "f"] {
            index.insert_ref(key.as_bytes());
        }
        index.compact();

        // Second batch lands in non-empty sorted storage (merge path): it
        // interleaves with the first batch and repeats "d".
        for key in ["g", "d", "a", "e"] {
            index.insert_ref(key.as_bytes());
        }
        index.compact();

        let keys: Vec<_> = index.range_from(b"").collect();
        assert_eq!(keys, owned_keys(&["a", "b", "d", "e", "f", "g"]));

        let stats = index.stats();
        assert_eq!(stats.total_compactions, 2);
        assert_eq!(stats.sorted_entries, 6);
        assert_eq!(stats.total_inserts, 7);
    }

    #[test]
    fn test_contains_hot_and_sorted() {
        let index = DeferredSortedIndex::new();
        index.insert_ref(b"cold");
        index.compact();
        index.insert_ref(b"hot");

        assert!(index.contains(b"cold")); // sorted storage
        assert!(index.contains(b"hot")); // still in the hot buffer
        assert!(!index.contains(b"missing"));
    }

    #[test]
    fn test_len_is_empty_and_clear() {
        let index = DeferredSortedIndex::new();
        assert!(index.is_empty());
        assert_eq!(index.len(), 0);

        index.insert_ref(b"x");
        index.insert_ref(b"x");
        index.insert_ref(b"y");
        assert!(!index.is_empty());
        assert_eq!(index.len(), 2); // len() compacts, removing the duplicate

        index.insert_ref(b"z"); // left in the hot buffer
        index.clear();
        assert!(index.is_empty());
        assert_eq!(index.len(), 0);
        assert!(!index.contains(b"x"));
        assert!(!index.contains(b"z"));
    }
}