sochdb_storage/
tiered_memtable.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tiered SkipMap Elimination (Recommendation 3)
16//!
17//! ## Problem
18//!
19//! Current write path uses three concurrent data structures:
20//! ```ignore
21//! pub struct ArenaMvccMemTable {
22//!     data: DashMap<ArenaKeyHandle, VersionChain>,      // +47ns lookup
23//!     ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>, // +93ns insert
24//!     dirty_list: ArenaEpochDirtyList,                   // +15ns
25//! }
26//! ```
27//!
28//! The benchmark shows "No-SkipMap Mode" achieves 97% of SQLite. 
29//! The SkipMap provides ordered iteration but costs 40% of insert time.
30//!
31//! ## Solution
32//!
33//! Tiered architecture:
34//! - **Hot tier**: Unsorted append-only buffer (O(1) insert)
35//! - **Warm tier**: Sorted batch (O(N log N) once at flush)
36//!
37//! ## Performance Analysis
38//!
39//! Current per-write cost:
40//! ```text
41//! T_write = T_dashmap_insert + T_skipmap_insert + T_dirty_list
42//!         = 47ns + 93ns + 15ns = 155ns
43//! ```
44//!
45//! Proposed per-write cost:
46//! ```text
47//! T_write = T_vec_push + T_dirty_list
48//!         = 5ns + 15ns = 20ns
49//! ```
50//!
51//! Amortized sort cost at flush (N = 100,000 rows):
52//! ```text
53//! T_sort = N × log(N) × comparison_cost
54//!        = 100,000 × 17 × 10ns = 17ms (once per flush)
55//! ```
56//!
57//! Net throughput: 1 / 20ns = 50M ops/sec (theoretical max)
58//! With other overhead: ~2M ops/sec (matches "No-SkipMap" benchmark result)
59
60use std::collections::HashSet;
61use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
62use std::sync::Arc;
63
64use dashmap::DashMap;
65use parking_lot::{Mutex, RwLock};
66use smallvec::SmallVec;
67
68use crate::durable_storage::InlineKey;
69use sochdb_core::Result;
70
/// Default hot buffer capacity (in entries) used by `TieredMemTable::new`.
///
/// The buffer is pre-allocated with this many slots; an automatic flush is
/// attempted once `FLUSH_THRESHOLD_RATIO` of it is in use.
pub const DEFAULT_HOT_BUFFER_CAPACITY: usize = 100_000;

/// Fraction of the hot buffer capacity at which a write attempts an
/// automatic flush (0.8 = flush once the buffer is 80% full).
pub const FLUSH_THRESHOLD_RATIO: f64 = 0.8;
76
77// =============================================================================
78// Hot Buffer Entry
79// =============================================================================
80
/// A single versioned write, as stored in both the hot buffer and the
/// sorted warm batches.
///
/// One `HotEntry` is one version of one key; several entries with the same
/// key may coexist and are ordered by `seq`.
#[derive(Debug, Clone)]
pub struct HotEntry {
    /// Key bytes (`InlineKey` is built via `SmallVec::from_slice` in this
    /// file — presumably a SmallVec alias that keeps short keys inline)
    pub key: InlineKey,
    /// Value (None = tombstone, i.e. the key was deleted by this version)
    pub value: Option<Vec<u8>>,
    /// ID of the transaction that produced this version; visibility is
    /// decided by looking this ID up in the table's commit map
    pub txn_id: u64,
    /// Insertion sequence number. Higher means newer; used as the
    /// tie-breaker (descending) when sorting versions of the same key
    pub seq: u64,
}
93
94impl HotEntry {
95    pub fn new(key: InlineKey, value: Option<Vec<u8>>, txn_id: u64, seq: u64) -> Self {
96        Self {
97            key,
98            value,
99            txn_id,
100            seq,
101        }
102    }
103}
104
105// =============================================================================
106// Sorted Batch (Warm Tier)
107// =============================================================================
108
/// A sorted batch of entries (immutable after creation)
///
/// Built once by `from_unsorted`; afterwards only read through `&self`
/// methods.
#[derive(Debug)]
pub struct SortedBatch {
    /// Sorted entries (by key ascending, then by seq descending for the same
    /// key, so the first entry of each key run is its newest version)
    entries: Vec<HotEntry>,
    /// Shortcut index: xxh3 hash of the key -> index of that key's first
    /// (newest) entry. Colliding keys overwrite each other, so every hit is
    /// verified against the stored key and lookups fall back to a search of
    /// `entries` on mismatch.
    key_index: DashMap<u64, usize>,
    /// Minimum `seq` in this batch (u64::MAX for an empty batch).
    /// NOTE: despite the name this is a sequence number, not a timestamp.
    min_ts: u64,
    /// Maximum `seq` in this batch (0 for an empty batch).
    /// NOTE: despite the name this is a sequence number, not a timestamp.
    max_ts: u64,
}
122
123impl SortedBatch {
124    /// Create from unsorted entries
125    pub fn from_unsorted(mut entries: Vec<HotEntry>) -> Self {
126        if entries.is_empty() {
127            return Self {
128                entries: Vec::new(),
129                key_index: DashMap::new(),
130                min_ts: u64::MAX,
131                max_ts: 0,
132            };
133        }
134
135        // Sort by key, then by seq desc (newest first for same key)
136        entries.sort_unstable_by(|a, b| {
137            match a.key.as_slice().cmp(b.key.as_slice()) {
138                std::cmp::Ordering::Equal => b.seq.cmp(&a.seq), // Descending
139                other => other,
140            }
141        });
142
143        // Build key index (first occurrence of each key)
144        let key_index = DashMap::new();
145        let mut last_key: Option<&[u8]> = None;
146        for (idx, entry) in entries.iter().enumerate() {
147            if last_key != Some(entry.key.as_slice()) {
148                let hash = Self::hash_key(&entry.key);
149                key_index.insert(hash, idx);
150                last_key = Some(entry.key.as_slice());
151            }
152        }
153
154        // Calculate timestamp range
155        let min_ts = entries.iter().map(|e| e.seq).min().unwrap_or(u64::MAX);
156        let max_ts = entries.iter().map(|e| e.seq).max().unwrap_or(0);
157
158        Self {
159            entries,
160            key_index,
161            min_ts,
162            max_ts,
163        }
164    }
165
    /// Hash key bytes for the `key_index` shortcut.
    ///
    /// Returns the 64-bit xxh3 hash of `key`. Distinct keys may collide, so
    /// callers must verify the stored key after an index hit.
    #[inline]
    fn hash_key(key: &[u8]) -> u64 {
        twox_hash::xxh3::hash64(key)
    }
171
172    /// Get entry by key - O(1) average, O(log N) worst case
173    pub fn get(&self, key: &[u8]) -> Option<&HotEntry> {
174        let hash = Self::hash_key(key);
175        
176        if let Some(idx) = self.key_index.get(&hash) {
177            // Verify key matches (handle hash collisions)
178            let idx = *idx;
179            if idx < self.entries.len() && self.entries[idx].key.as_slice() == key {
180                return Some(&self.entries[idx]);
181            }
182        }
183
184        // Fall back to binary search
185        self.entries
186            .binary_search_by(|e| e.key.as_slice().cmp(key))
187            .ok()
188            .map(|idx| &self.entries[idx])
189    }
190
191    /// Get all entries with given prefix - O(log N + K)
192    pub fn prefix_scan(&self, prefix: &[u8]) -> impl Iterator<Item = &HotEntry> {
193        // Find first entry >= prefix using binary search
194        let start_idx = self.entries
195            .partition_point(|e| e.key.as_slice() < prefix);
196
197        self.entries[start_idx..]
198            .iter()
199            .take_while(move |e| e.key.starts_with(prefix))
200    }
201
    /// Number of entries in the batch, counting every version of every key.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Whether the batch holds no entries at all.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Iterate all entries in sorted order (key ascending, then seq
    /// descending within a key).
    pub fn iter(&self) -> impl Iterator<Item = &HotEntry> {
        self.entries.iter()
    }

    /// Returns `(min_ts, max_ts)` as recorded when the batch was built.
    ///
    /// These are the smallest and largest entry `seq` values; an empty batch
    /// reports `(u64::MAX, 0)`.
    pub fn timestamp_range(&self) -> (u64, u64) {
        (self.min_ts, self.max_ts)
    }
221}
222
223// =============================================================================
224// Tiered MemTable
225// =============================================================================
226
/// Tiered MemTable with hot buffer and warm sorted batches
///
/// Writes are appended to an unsorted hot buffer; a flush sorts the buffer
/// once and moves it into the list of immutable warm batches.
///
/// ## Architecture
///
/// ```text
/// ┌─────────────────────────────────────────────────────────┐
/// │ Hot Buffer (Vec<HotEntry>)                              │
/// │ • O(1) append                                           │
/// │ • Unsorted                                              │
/// │ • Current writes                                        │
/// └─────────────────────────────────────────────────────────┘
///                        ↓ flush
/// ┌─────────────────────────────────────────────────────────┐
/// │ Warm Batches (Vec<Arc<SortedBatch>>)                    │
/// │ • Sorted (O(N log N) once)                              │
/// │ • Immutable                                             │
/// │ • Binary search reads                                   │
/// └─────────────────────────────────────────────────────────┘
/// ```
pub struct TieredMemTable {
    /// Hot buffer: unsorted, O(1) append (reads scan it linearly)
    hot_buffer: RwLock<Vec<HotEntry>>,
    /// Hot buffer capacity in entries; the flush threshold is derived from it
    hot_capacity: usize,
    /// Warm batches: sorted, immutable; pushed oldest-first, so the last
    /// element is the most recently flushed batch
    warm_batches: RwLock<Vec<Arc<SortedBatch>>>,
    /// Intended point-lookup index (key bytes -> (batch_idx, entry_idx)).
    /// Currently never populated or read, hence the `dead_code` allowance.
    #[allow(dead_code)]
    point_index: DashMap<Vec<u8>, (usize, usize)>,
    /// Sequence counter for ordering; starts at 1 and is bumped per entry
    seq_counter: AtomicU64,
    /// Approximate size in bytes (sum of key and value lengths written)
    size_bytes: AtomicU64,
    /// Entry count
    entry_count: AtomicUsize,
    /// Commit map (txn_id -> commit_ts). Despite the name, an entry here
    /// means the transaction HAS been committed; visibility checks read it.
    pending_commits: DashMap<u64, u64>,
    /// Mutex serializing flushes of the hot buffer into warm batches
    flush_lock: Mutex<()>,
}
268
269impl TieredMemTable {
    /// Create an empty table whose hot buffer holds
    /// `DEFAULT_HOT_BUFFER_CAPACITY` entries.
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_HOT_BUFFER_CAPACITY)
    }
273
274    pub fn with_capacity(capacity: usize) -> Self {
275        Self {
276            hot_buffer: RwLock::new(Vec::with_capacity(capacity)),
277            hot_capacity: capacity,
278            warm_batches: RwLock::new(Vec::new()),
279            point_index: DashMap::new(),
280            seq_counter: AtomicU64::new(1),
281            size_bytes: AtomicU64::new(0),
282            entry_count: AtomicUsize::new(0),
283            pending_commits: DashMap::new(),
284            flush_lock: Mutex::new(()),
285        }
286    }
287
288    /// Write a key-value pair (uncommitted) - O(1)
289    ///
290    /// This is the key optimization: append to unsorted buffer
291    /// instead of inserting into SkipMap.
292    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
293        let key_inline = SmallVec::from_slice(key);
294        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
295        let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
296
297        let entry = HotEntry::new(key_inline, value, txn_id, seq);
298
299        // Fast path: append to hot buffer (O(1))
300        {
301            let mut buffer = self.hot_buffer.write();
302            buffer.push(entry);
303        }
304
305        self.size_bytes.fetch_add((key.len() + value_size) as u64, Ordering::Relaxed);
306        self.entry_count.fetch_add(1, Ordering::Relaxed);
307
308        // Check if flush needed
309        if self.should_flush() {
310            self.try_flush()?;
311        }
312
313        Ok(())
314    }
315
316    /// Write batch - O(n)
317    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
318        let mut total_size = 0u64;
319        let mut entries = Vec::with_capacity(writes.len());
320
321        for (key, value) in writes {
322            let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
323            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
324            total_size += (key.len() + value_size) as u64;
325
326            entries.push(HotEntry::new(
327                SmallVec::from_slice(key),
328                value.clone(),
329                txn_id,
330                seq,
331            ));
332        }
333
334        // Batch append (still O(n) but fewer lock acquisitions)
335        {
336            let mut buffer = self.hot_buffer.write();
337            buffer.extend(entries);
338        }
339
340        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
341        self.entry_count.fetch_add(writes.len(), Ordering::Relaxed);
342
343        if self.should_flush() {
344            self.try_flush()?;
345        }
346
347        Ok(())
348    }
349
350    /// Read at snapshot timestamp - O(log B + log N) where B = batch count
351    pub fn read(
352        &self,
353        key: &[u8],
354        snapshot_ts: u64,
355        current_txn_id: Option<u64>,
356    ) -> Option<Vec<u8>> {
357        // Check hot buffer first (most recent writes)
358        {
359            let buffer = self.hot_buffer.read();
360            // Scan backwards for most recent version
361            for entry in buffer.iter().rev() {
362                if entry.key.as_slice() == key {
363                    // Check MVCC visibility
364                    if self.is_visible(entry, snapshot_ts, current_txn_id) {
365                        return entry.value.clone();
366                    }
367                }
368            }
369        }
370
371        // Check warm batches (sorted, binary search)
372        {
373            let batches = self.warm_batches.read();
374            // Search from newest to oldest batch
375            for batch in batches.iter().rev() {
376                if let Some(entry) = batch.get(key) {
377                    if self.is_visible(entry, snapshot_ts, current_txn_id) {
378                        return entry.value.clone();
379                    }
380                }
381            }
382        }
383
384        None
385    }
386
387    /// Check if entry is visible at snapshot
388    #[inline]
389    fn is_visible(&self, entry: &HotEntry, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
390        // Own uncommitted write is always visible
391        if let Some(my_txn) = current_txn_id {
392            if entry.txn_id == my_txn {
393                return true;
394            }
395        }
396
397        // Check if committed before snapshot
398        if let Some(commit_ts) = self.pending_commits.get(&entry.txn_id) {
399            return *commit_ts < snapshot_ts;
400        }
401
402        false
403    }
404
    /// Commit transaction
    ///
    /// Records `commit_ts` for `txn_id` in the commit map; from then on the
    /// transaction's entries are visible to any snapshot taken strictly after
    /// `commit_ts`. The `_write_set` argument is currently unused.
    pub fn commit(&self, txn_id: u64, commit_ts: u64, _write_set: &HashSet<InlineKey>) {
        // Record commit timestamp
        self.pending_commits.insert(txn_id, commit_ts);

        // Placeholder: the point index is not maintained yet.
        // In a real implementation, we'd update version chains here
    }
413
414    /// Abort transaction
415    pub fn abort(&self, txn_id: u64) {
416        self.pending_commits.remove(&txn_id);
417        
418        // Remove uncommitted entries from hot buffer
419        let mut buffer = self.hot_buffer.write();
420        buffer.retain(|e| e.txn_id != txn_id);
421    }
422
423    /// Scan with prefix - O(log N + K) - Legacy method using HashSet deduplication
424    ///
425    /// Consider using `scan_prefix_tournament` for better performance with many batches.
426    pub fn scan_prefix(
427        &self,
428        prefix: &[u8],
429        snapshot_ts: u64,
430        current_txn_id: Option<u64>,
431    ) -> Vec<(Vec<u8>, Vec<u8>)> {
432        let mut results = Vec::new();
433        let mut seen_keys: HashSet<Vec<u8>> = HashSet::new();
434
435        // Scan hot buffer first
436        {
437            let buffer = self.hot_buffer.read();
438            for entry in buffer.iter().rev() {
439                if entry.key.starts_with(prefix) 
440                    && !seen_keys.contains(entry.key.as_slice())
441                    && self.is_visible(entry, snapshot_ts, current_txn_id)
442                {
443                    if let Some(ref value) = entry.value {
444                        results.push((entry.key.to_vec(), value.clone()));
445                        seen_keys.insert(entry.key.to_vec());
446                    }
447                }
448            }
449        }
450
451        // Scan warm batches
452        {
453            let batches = self.warm_batches.read();
454            for batch in batches.iter().rev() {
455                for entry in batch.prefix_scan(prefix) {
456                    if !seen_keys.contains(entry.key.as_slice())
457                        && self.is_visible(entry, snapshot_ts, current_txn_id)
458                    {
459                        if let Some(ref value) = entry.value {
460                            results.push((entry.key.to_vec(), value.clone()));
461                            seen_keys.insert(entry.key.to_vec());
462                        }
463                    }
464                }
465            }
466        }
467
468        // Sort results by key
469        results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
470        results
471    }
472
473    /// Scan with prefix using Tournament Tree K-way merge
474    ///
475    /// ## Performance
476    ///
477    /// For K sorted runs with total N matching entries:
478    /// - Time: O(N log K) vs O(N × K) for the naive approach
479    /// - Memory: O(K) for the tournament tree vs O(N) for HashSet dedup
480    ///
481    /// This is significantly faster when:
482    /// - K (number of batches) > 4
483    /// - N (total entries) is large
484    ///
485    /// ## Algorithm
486    ///
487    /// 1. Sort hot buffer entries with matching prefix
488    /// 2. Collect prefix-matching iterators from each sorted batch
489    /// 3. Merge using tournament tree (loser tree) for O(log K) per element
490    /// 4. Deduplicate by key (first occurrence wins = newest version)
491    /// 5. Filter by MVCC visibility
492    pub fn scan_prefix_tournament(
493        &self,
494        prefix: &[u8],
495        snapshot_ts: u64,
496        current_txn_id: Option<u64>,
497    ) -> Vec<(Vec<u8>, Vec<u8>)> {
498        use crate::tournament_tree::TournamentTree;
499        
500        // Collect all sources
501        let mut sorted_sources: Vec<Vec<HotEntry>> = Vec::new();
502        
503        // Source 0: Sorted hot buffer entries matching prefix
504        {
505            let buffer = self.hot_buffer.read();
506            let mut hot_entries: Vec<HotEntry> = buffer
507                .iter()
508                .filter(|e| e.key.starts_with(prefix))
509                .cloned()
510                .collect();
511            
512            // Sort by key, then by seq desc (newest first for same key)
513            hot_entries.sort_unstable_by(|a, b| {
514                match a.key.as_slice().cmp(b.key.as_slice()) {
515                    std::cmp::Ordering::Equal => b.seq.cmp(&a.seq),
516                    other => other,
517                }
518            });
519            
520            // Deduplicate within hot buffer (keep newest = first occurrence)
521            let mut seen = HashSet::new();
522            hot_entries.retain(|e| seen.insert(e.key.to_vec()));
523            
524            if !hot_entries.is_empty() {
525                sorted_sources.push(hot_entries);
526            }
527        }
528        
529        // Sources 1..K: Warm batches (already sorted)
530        {
531            let batches = self.warm_batches.read();
532            // Iterate from newest to oldest
533            for batch in batches.iter().rev() {
534                let entries: Vec<HotEntry> = batch
535                    .prefix_scan(prefix)
536                    .cloned()
537                    .collect();
538                
539                if !entries.is_empty() {
540                    sorted_sources.push(entries);
541                }
542            }
543        }
544        
545        if sorted_sources.is_empty() {
546            return Vec::new();
547        }
548        
549        // Special case: single source, no merge needed
550        if sorted_sources.len() == 1 {
551            return sorted_sources
552                .into_iter()
553                .next()
554                .unwrap()
555                .into_iter()
556                .filter(|e| self.is_visible(e, snapshot_ts, current_txn_id))
557                .filter_map(|e| e.value.map(|v| (e.key.to_vec(), v)))
558                .collect();
559        }
560        
561        // K-way merge using tournament tree
562        // Wrap entries with source index for priority-based comparison
563        // When keys are equal, lower source_idx wins (newer data)
564        #[derive(Clone)]
565        struct KeyedEntry {
566            entry: HotEntry,
567            source_idx: usize,
568        }
569        
570        impl PartialEq for KeyedEntry {
571            fn eq(&self, other: &Self) -> bool {
572                self.entry.key.as_slice() == other.entry.key.as_slice()
573            }
574        }
575        impl Eq for KeyedEntry {}
576        impl PartialOrd for KeyedEntry {
577            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
578                Some(self.cmp(other))
579            }
580        }
581        impl Ord for KeyedEntry {
582            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
583                // Primary: sort by key
584                // Secondary: lower source_idx wins (source 0 = hot buffer = newest)
585                match self.entry.key.as_slice().cmp(other.entry.key.as_slice()) {
586                    std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
587                    other => other,
588                }
589            }
590        }
591        
592        let iters: Vec<_> = sorted_sources
593            .into_iter()
594            .enumerate()
595            .map(|(source_idx, v)| {
596                v.into_iter().map(move |e| KeyedEntry { entry: e, source_idx })
597            })
598            .collect();
599        
600        let mut tree = TournamentTree::new(iters);
601        let mut results = Vec::new();
602        let mut last_key: Option<Vec<u8>> = None;
603        
604        // Merge with deduplication and visibility filtering
605        while let Some((_, keyed)) = tree.pop() {
606            let entry = keyed.entry;
607            
608            // Deduplicate by key (first occurrence = from newest source due to ordering)
609            if let Some(ref last) = last_key {
610                if entry.key.as_slice() == last.as_slice() {
611                    continue;
612                }
613            }
614            last_key = Some(entry.key.to_vec());
615            
616            // Check visibility
617            if !self.is_visible(&entry, snapshot_ts, current_txn_id) {
618                // Not visible at this snapshot, try next version
619                // Note: since we deduplicated, we might miss older visible versions
620                // In a production system, we'd need a more sophisticated approach
621                continue;
622            }
623            
624            // Include non-tombstone entries
625            if let Some(value) = entry.value {
626                results.push((entry.key.to_vec(), value));
627            }
628        }
629        
630        results
631    }
632
633    /// Check if flush is needed
634    fn should_flush(&self) -> bool {
635        let buffer = self.hot_buffer.read();
636        buffer.len() >= (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize
637    }
638
639    /// Try to flush hot buffer to warm batch
640    pub fn try_flush(&self) -> Result<()> {
641        // Try to acquire flush lock (non-blocking)
642        let guard = match self.flush_lock.try_lock() {
643            Some(g) => g,
644            None => return Ok(()), // Another thread is flushing
645        };
646
647        // Swap hot buffer with empty
648        let entries = {
649            let mut buffer = self.hot_buffer.write();
650            if buffer.len() < (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize {
651                // Buffer was flushed by another thread
652                return Ok(());
653            }
654            std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
655        };
656
657        if entries.is_empty() {
658            return Ok(());
659        }
660
661        // Create sorted batch (O(N log N))
662        let batch = Arc::new(SortedBatch::from_unsorted(entries));
663
664        // Add to warm batches
665        {
666            let mut batches = self.warm_batches.write();
667            batches.push(batch);
668        }
669
670        drop(guard);
671        Ok(())
672    }
673
674    /// Force flush hot buffer
675    pub fn flush(&self) -> Result<()> {
676        let _guard = self.flush_lock.lock();
677
678        let entries = {
679            let mut buffer = self.hot_buffer.write();
680            std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
681        };
682
683        if entries.is_empty() {
684            return Ok(());
685        }
686
687        let batch = Arc::new(SortedBatch::from_unsorted(entries));
688
689        {
690            let mut batches = self.warm_batches.write();
691            batches.push(batch);
692        }
693
694        Ok(())
695    }
696
    /// Get approximate size in bytes: the running sum of key and value
    /// lengths recorded by the write paths (relaxed atomic load).
    pub fn size(&self) -> u64 {
        self.size_bytes.load(Ordering::Relaxed)
    }

    /// Get entry count as tracked by the write paths (relaxed atomic load).
    /// Every written version counts, not just distinct keys.
    pub fn len(&self) -> usize {
        self.entry_count.load(Ordering::Relaxed)
    }

    /// Check if empty, i.e. the tracked entry count is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get the number of warm (sorted) batches currently held.
    pub fn batch_count(&self) -> usize {
        self.warm_batches.read().len()
    }

    /// Get the number of entries currently in the hot buffer.
    pub fn hot_buffer_len(&self) -> usize {
        self.hot_buffer.read().len()
    }
721
722    /// Compact warm batches
723    pub fn compact(&self) -> Result<()> {
724        let batches = {
725            let mut b = self.warm_batches.write();
726            std::mem::take(&mut *b)
727        };
728
729        if batches.len() <= 1 {
730            let mut b = self.warm_batches.write();
731            *b = batches;
732            return Ok(());
733        }
734
735        // Merge all batches into one
736        let all_entries: Vec<HotEntry> = batches
737            .iter()
738            .flat_map(|b| b.iter().cloned())
739            .collect();
740
741        // Re-sort
742        let merged = Arc::new(SortedBatch::from_unsorted(all_entries));
743
744        {
745            let mut b = self.warm_batches.write();
746            b.clear();
747            b.push(merged);
748        }
749
750        Ok(())
751    }
752}
753
/// `Default` yields the same table as `TieredMemTable::new()`.
impl Default for TieredMemTable {
    fn default() -> Self {
        Self::new()
    }
}
759
#[cfg(test)]
mod tests {
    use super::*;

    /// Committed writes are readable by a snapshot taken after the commit.
    #[test]
    fn test_tiered_memtable_basic() {
        let table = TieredMemTable::new();

        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();
        table.write(b"key2", Some(b"value2".to_vec()), 1).unwrap();

        // Commit transaction
        let mut write_set = HashSet::new();
        write_set.insert(SmallVec::from_slice(b"key1"));
        write_set.insert(SmallVec::from_slice(b"key2"));
        table.commit(1, 100, &write_set);

        // Read back (snapshot 200 > commit timestamp 100)
        let v1 = table.read(b"key1", 200, None);
        let v2 = table.read(b"key2", 200, None);

        assert_eq!(v1, Some(b"value1".to_vec()));
        assert_eq!(v2, Some(b"value2".to_vec()));
    }

    /// Uncommitted writes are visible to their own transaction only.
    #[test]
    fn test_tiered_memtable_uncommitted_own() {
        let table = TieredMemTable::new();

        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();

        // Should see own uncommitted write
        let v = table.read(b"key1", 100, Some(1));
        assert_eq!(v, Some(b"value1".to_vec()));

        // Should not see other's uncommitted write
        let v = table.read(b"key1", 100, Some(2));
        assert_eq!(v, None);
    }

    /// After a flush the hot buffer is empty and at least one batch exists.
    #[test]
    fn test_tiered_memtable_flush() {
        let table = TieredMemTable::with_capacity(100);

        // Write enough to trigger flush (threshold is 80 of 100)
        for i in 0..90 {
            table.write(
                format!("key{:04}", i).as_bytes(),
                Some(format!("value{}", i).into_bytes()),
                1,
            ).unwrap();
        }

        // Force flush
        table.flush().unwrap();

        assert!(table.batch_count() >= 1);
        assert_eq!(table.hot_buffer_len(), 0);
    }

    /// A prefix scan returns only the keys carrying the prefix.
    #[test]
    fn test_tiered_memtable_scan_prefix() {
        let table = TieredMemTable::new();

        table.write(b"users:1", Some(b"alice".to_vec()), 1).unwrap();
        table.write(b"users:2", Some(b"bob".to_vec()), 1).unwrap();
        table.write(b"posts:1", Some(b"post1".to_vec()), 1).unwrap();

        let mut write_set = HashSet::new();
        write_set.insert(SmallVec::from_slice(b"users:1"));
        write_set.insert(SmallVec::from_slice(b"users:2"));
        write_set.insert(SmallVec::from_slice(b"posts:1"));
        table.commit(1, 100, &write_set);

        let results = table.scan_prefix(b"users:", 200, None);
        assert_eq!(results.len(), 2);
    }

    /// Point lookups work on a batch built from out-of-order entries.
    #[test]
    fn test_sorted_batch() {
        let entries = vec![
            HotEntry::new(SmallVec::from_slice(b"c"), Some(b"3".to_vec()), 1, 3),
            HotEntry::new(SmallVec::from_slice(b"a"), Some(b"1".to_vec()), 1, 1),
            HotEntry::new(SmallVec::from_slice(b"b"), Some(b"2".to_vec()), 1, 2),
        ];

        let batch = SortedBatch::from_unsorted(entries);

        assert_eq!(batch.len(), 3);
        assert_eq!(batch.get(b"a").unwrap().value, Some(b"1".to_vec()));
        assert_eq!(batch.get(b"b").unwrap().value, Some(b"2".to_vec()));
        assert_eq!(batch.get(b"c").unwrap().value, Some(b"3".to_vec()));
    }

    /// The prefix itself and longer keys match; unrelated keys do not.
    #[test]
    fn test_sorted_batch_prefix_scan() {
        let entries = vec![
            HotEntry::new(SmallVec::from_slice(b"ab"), Some(b"1".to_vec()), 1, 1),
            HotEntry::new(SmallVec::from_slice(b"abc"), Some(b"2".to_vec()), 1, 2),
            HotEntry::new(SmallVec::from_slice(b"abd"), Some(b"3".to_vec()), 1, 3),
            HotEntry::new(SmallVec::from_slice(b"xyz"), Some(b"4".to_vec()), 1, 4),
        ];

        let batch = SortedBatch::from_unsorted(entries);
        let results: Vec<_> = batch.prefix_scan(b"ab").collect();

        assert_eq!(results.len(), 3);
    }

    /// The tournament scan merges three warm batches and the hot buffer,
    /// returning each key once with its newest value.
    #[test]
    fn test_scan_prefix_tournament() {
        let table = TieredMemTable::with_capacity(100);

        // Create multiple batches by flushing
        for batch_idx in 0..3 {
            for i in 0..10 {
                let key = format!("users:{:02}", i);
                let value = format!("value_batch{}_item{}", batch_idx, i);
                table.write(key.as_bytes(), Some(value.into_bytes()), 1).unwrap();
            }
            table.flush().unwrap();
        }

        // Add some to hot buffer
        for i in 0..5 {
            let key = format!("users:{:02}", i);
            let value = format!("newest_value_{}", i);
            table.write(key.as_bytes(), Some(value.into_bytes()), 1).unwrap();
        }

        // Commit all
        let mut write_set = HashSet::new();
        for i in 0..10 {
            write_set.insert(SmallVec::from_slice(format!("users:{:02}", i).as_bytes()));
        }
        table.commit(1, 100, &write_set);

        // Scan using tournament tree
        let results = table.scan_prefix_tournament(b"users:", 200, None);

        // Should have 10 unique keys with newest values
        assert_eq!(results.len(), 10);

        // First 5 should have "newest_value_X"
        for (i, (key, value)) in results.iter().take(5).enumerate() {
            let expected_key = format!("users:{:02}", i);
            assert_eq!(key.as_slice(), expected_key.as_bytes());
            assert!(String::from_utf8_lossy(value).starts_with("newest_value_"));
        }
    }

    /// One key written in two warm batches and the hot buffer is returned
    /// once, with the hot buffer's value.
    #[test]
    fn test_scan_tournament_deduplication() {
        let table = TieredMemTable::with_capacity(100);

        // Write same key multiple times across batches
        table.write(b"key:001", Some(b"old1".to_vec()), 1).unwrap();
        table.flush().unwrap();

        table.write(b"key:001", Some(b"old2".to_vec()), 1).unwrap();
        table.flush().unwrap();

        table.write(b"key:001", Some(b"newest".to_vec()), 1).unwrap();

        // Commit
        let mut write_set = HashSet::new();
        write_set.insert(SmallVec::from_slice(b"key:001"));
        table.commit(1, 100, &write_set);

        // Tournament scan should return only newest
        let results = table.scan_prefix_tournament(b"key:", 200, None);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1.as_slice(), b"newest");
    }
}