Skip to main content

sochdb_storage/
tiered_memtable.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Tiered SkipMap Elimination (Recommendation 3)
19//!
20//! ## Problem
21//!
22//! Current write path uses three concurrent data structures:
23//! ```ignore
24//! pub struct ArenaMvccMemTable {
25//!     data: DashMap<ArenaKeyHandle, VersionChain>,      // +47ns lookup
26//!     ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>, // +93ns insert
27//!     dirty_list: ArenaEpochDirtyList,                   // +15ns
28//! }
29//! ```
30//!
31//! The benchmark shows "No-SkipMap Mode" achieves 97% of SQLite. 
32//! The SkipMap provides ordered iteration but costs 40% of insert time.
33//!
34//! ## Solution
35//!
36//! Tiered architecture:
37//! - **Hot tier**: Unsorted append-only buffer (O(1) insert)
38//! - **Warm tier**: Sorted batch (O(N log N) once at flush)
39//!
40//! ## Performance Analysis
41//!
42//! Current per-write cost:
43//! ```text
44//! T_write = T_dashmap_insert + T_skipmap_insert + T_dirty_list
45//!         = 47ns + 93ns + 15ns = 155ns
46//! ```
47//!
48//! Proposed per-write cost:
49//! ```text
50//! T_write = T_vec_push + T_dirty_list
51//!         = 5ns + 15ns = 20ns
52//! ```
53//!
54//! Amortized sort cost at flush (N = 100,000 rows):
55//! ```text
56//! T_sort = N × log(N) × comparison_cost
57//!        = 100,000 × 17 × 10ns = 17ms (once per flush)
58//! ```
59//!
60//! Net throughput: 1 / 20ns = 50M ops/sec (theoretical max)
61//! With other overhead: ~2M ops/sec (matches "No-SkipMap" benchmark result)
62
63use std::collections::HashSet;
64use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
65use std::sync::Arc;
66
67use dashmap::DashMap;
68use parking_lot::{Mutex, RwLock};
69use smallvec::SmallVec;
70
71use crate::durable_storage::InlineKey;
72use sochdb_core::Result;
73
/// Default hot buffer capacity, counted in entries.
///
/// `TieredMemTable::new` passes this value to `TieredMemTable::with_capacity`.
pub const DEFAULT_HOT_BUFFER_CAPACITY: usize = 100_000;

/// Fraction of the hot buffer capacity at which a write triggers an automatic flush.
///
/// The flush threshold is `capacity * FLUSH_THRESHOLD_RATIO`, truncated to `usize`.
pub const FLUSH_THRESHOLD_RATIO: f64 = 0.8;
79
80// =============================================================================
81// Hot Buffer Entry
82// =============================================================================
83
/// One versioned write: the element type of the unsorted hot buffer and,
/// after a flush, of the sorted batches.
#[derive(Debug, Clone)]
pub struct HotEntry {
    /// Key bytes (using SmallVec for inline storage of small keys)
    pub key: InlineKey,
    /// Value (None = tombstone)
    pub value: Option<Vec<u8>>,
    /// ID of the transaction that wrote this entry
    pub txn_id: u64,
    /// Insertion sequence number; a larger value means a newer write.
    /// Sorting uses it (descending) to order versions of the same key.
    pub seq: u64,
}
96
97impl HotEntry {
98    pub fn new(key: InlineKey, value: Option<Vec<u8>>, txn_id: u64, seq: u64) -> Self {
99        Self {
100            key,
101            value,
102            txn_id,
103            seq,
104        }
105    }
106}
107
108// =============================================================================
109// Sorted Batch (Warm Tier)
110// =============================================================================
111
/// A sorted batch of entries (immutable after creation)
#[derive(Debug)]
pub struct SortedBatch {
    /// Sorted entries: key ascending, then `seq` descending, so the newest
    /// version of a key is the first entry of that key's run.
    entries: Vec<HotEntry>,
    /// Point-lookup index: hash of key -> index of that key's first entry.
    /// Distinct keys whose hashes collide share one slot (the later insert
    /// overwrites the earlier), so a lookup must verify the key bytes and
    /// fall back to a search of `entries` on a mismatch.
    key_index: DashMap<u64, usize>,
    /// Smallest `seq` in this batch (`u64::MAX` for an empty batch).
    /// NOTE(review): named "ts" but populated from `HotEntry::seq`, not from a commit timestamp.
    min_ts: u64,
    /// Largest `seq` in this batch (`0` for an empty batch).
    max_ts: u64,
}
125
126impl SortedBatch {
127    /// Create from unsorted entries
128    pub fn from_unsorted(mut entries: Vec<HotEntry>) -> Self {
129        if entries.is_empty() {
130            return Self {
131                entries: Vec::new(),
132                key_index: DashMap::new(),
133                min_ts: u64::MAX,
134                max_ts: 0,
135            };
136        }
137
138        // Sort by key, then by seq desc (newest first for same key)
139        entries.sort_unstable_by(|a, b| {
140            match a.key.as_slice().cmp(b.key.as_slice()) {
141                std::cmp::Ordering::Equal => b.seq.cmp(&a.seq), // Descending
142                other => other,
143            }
144        });
145
146        // Build key index (first occurrence of each key)
147        let key_index = DashMap::new();
148        let mut last_key: Option<&[u8]> = None;
149        for (idx, entry) in entries.iter().enumerate() {
150            if last_key != Some(entry.key.as_slice()) {
151                let hash = Self::hash_key(&entry.key);
152                key_index.insert(hash, idx);
153                last_key = Some(entry.key.as_slice());
154            }
155        }
156
157        // Calculate timestamp range
158        let min_ts = entries.iter().map(|e| e.seq).min().unwrap_or(u64::MAX);
159        let max_ts = entries.iter().map(|e| e.seq).max().unwrap_or(0);
160
161        Self {
162            entries,
163            key_index,
164            min_ts,
165            max_ts,
166        }
167    }
168
169    /// Hash key for index lookup
170    #[inline]
171    fn hash_key(key: &[u8]) -> u64 {
172        twox_hash::xxh3::hash64(key)
173    }
174
175    /// Get entry by key - O(1) average, O(log N) worst case
176    pub fn get(&self, key: &[u8]) -> Option<&HotEntry> {
177        let hash = Self::hash_key(key);
178        
179        if let Some(idx) = self.key_index.get(&hash) {
180            // Verify key matches (handle hash collisions)
181            let idx = *idx;
182            if idx < self.entries.len() && self.entries[idx].key.as_slice() == key {
183                return Some(&self.entries[idx]);
184            }
185        }
186
187        // Fall back to binary search
188        self.entries
189            .binary_search_by(|e| e.key.as_slice().cmp(key))
190            .ok()
191            .map(|idx| &self.entries[idx])
192    }
193
194    /// Get all entries with given prefix - O(log N + K)
195    pub fn prefix_scan(&self, prefix: &[u8]) -> impl Iterator<Item = &HotEntry> {
196        // Find first entry >= prefix using binary search
197        let start_idx = self.entries
198            .partition_point(|e| e.key.as_slice() < prefix);
199
200        self.entries[start_idx..]
201            .iter()
202            .take_while(move |e| e.key.starts_with(prefix))
203    }
204
205    /// Get entry count
206    pub fn len(&self) -> usize {
207        self.entries.len()
208    }
209
210    /// Check if empty
211    pub fn is_empty(&self) -> bool {
212        self.entries.is_empty()
213    }
214
215    /// Iterate all entries in sorted order
216    pub fn iter(&self) -> impl Iterator<Item = &HotEntry> {
217        self.entries.iter()
218    }
219
220    /// Get timestamp range
221    pub fn timestamp_range(&self) -> (u64, u64) {
222        (self.min_ts, self.max_ts)
223    }
224}
225
226// =============================================================================
227// Tiered MemTable
228// =============================================================================
229
/// Tiered MemTable with hot buffer and warm sorted batches
///
/// ## Architecture
///
/// ```text
/// ┌─────────────────────────────────────────────────────────┐
/// │ Hot Buffer (Vec<HotEntry>)                              │
/// │ • O(1) append                                           │
/// │ • Unsorted                                              │
/// │ • Current writes                                        │
/// └─────────────────────────────────────────────────────────┘
///                        ↓ flush
/// ┌─────────────────────────────────────────────────────────┐
/// │ Warm Batches (Vec<Arc<SortedBatch>>)                    │
/// │ • Sorted (O(N log N) once)                              │
/// │ • Immutable                                             │
/// │ • Binary search reads                                   │
/// └─────────────────────────────────────────────────────────┘
/// ```
pub struct TieredMemTable {
    /// Hot buffer: unsorted, O(1) append
    hot_buffer: RwLock<Vec<HotEntry>>,
    /// Hot buffer capacity in entries; the flush threshold is derived from it
    hot_capacity: usize,
    /// Warm batches: sorted, immutable. Flushes append, so the newest batch is last.
    warm_batches: RwLock<Vec<Arc<SortedBatch>>>,
    /// Reserved index for point lookups (key bytes -> (batch_idx, entry_idx)).
    /// It is only constructed in the code visible here and never read or
    /// updated, hence the `dead_code` allowance.
    #[allow(dead_code)]
    point_index: DashMap<Vec<u8>, (usize, usize)>,
    /// Sequence counter for ordering (starts at 1)
    seq_counter: AtomicU64,
    /// Approximate size in bytes (sum of key and value lengths of written entries)
    size_bytes: AtomicU64,
    /// Entry count
    entry_count: AtomicUsize,
    /// Commit records (txn_id -> commit_ts). Despite the name, an entry here
    /// means the transaction HAS committed; visibility checks treat a
    /// transaction without a record as uncommitted.
    pending_commits: DashMap<u64, u64>,
    /// Mutex that serializes flushes of the hot buffer
    flush_lock: Mutex<()>,
}
271
272impl TieredMemTable {
273    pub fn new() -> Self {
274        Self::with_capacity(DEFAULT_HOT_BUFFER_CAPACITY)
275    }
276
277    pub fn with_capacity(capacity: usize) -> Self {
278        Self {
279            hot_buffer: RwLock::new(Vec::with_capacity(capacity)),
280            hot_capacity: capacity,
281            warm_batches: RwLock::new(Vec::new()),
282            point_index: DashMap::new(),
283            seq_counter: AtomicU64::new(1),
284            size_bytes: AtomicU64::new(0),
285            entry_count: AtomicUsize::new(0),
286            pending_commits: DashMap::new(),
287            flush_lock: Mutex::new(()),
288        }
289    }
290
291    /// Write a key-value pair (uncommitted) - O(1)
292    ///
293    /// This is the key optimization: append to unsorted buffer
294    /// instead of inserting into SkipMap.
295    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
296        let key_inline = SmallVec::from_slice(key);
297        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
298        let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
299
300        let entry = HotEntry::new(key_inline, value, txn_id, seq);
301
302        // Fast path: append to hot buffer (O(1))
303        {
304            let mut buffer = self.hot_buffer.write();
305            buffer.push(entry);
306        }
307
308        self.size_bytes.fetch_add((key.len() + value_size) as u64, Ordering::Relaxed);
309        self.entry_count.fetch_add(1, Ordering::Relaxed);
310
311        // Check if flush needed
312        if self.should_flush() {
313            self.try_flush()?;
314        }
315
316        Ok(())
317    }
318
319    /// Write batch - O(n)
320    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
321        let mut total_size = 0u64;
322        let mut entries = Vec::with_capacity(writes.len());
323
324        for (key, value) in writes {
325            let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
326            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
327            total_size += (key.len() + value_size) as u64;
328
329            entries.push(HotEntry::new(
330                SmallVec::from_slice(key),
331                value.clone(),
332                txn_id,
333                seq,
334            ));
335        }
336
337        // Batch append (still O(n) but fewer lock acquisitions)
338        {
339            let mut buffer = self.hot_buffer.write();
340            buffer.extend(entries);
341        }
342
343        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
344        self.entry_count.fetch_add(writes.len(), Ordering::Relaxed);
345
346        if self.should_flush() {
347            self.try_flush()?;
348        }
349
350        Ok(())
351    }
352
353    /// Read at snapshot timestamp - O(log B + log N) where B = batch count
354    pub fn read(
355        &self,
356        key: &[u8],
357        snapshot_ts: u64,
358        current_txn_id: Option<u64>,
359    ) -> Option<Vec<u8>> {
360        // Check hot buffer first (most recent writes)
361        {
362            let buffer = self.hot_buffer.read();
363            // Scan backwards for most recent version
364            for entry in buffer.iter().rev() {
365                if entry.key.as_slice() == key {
366                    // Check MVCC visibility
367                    if self.is_visible(entry, snapshot_ts, current_txn_id) {
368                        return entry.value.clone();
369                    }
370                }
371            }
372        }
373
374        // Check warm batches (sorted, binary search)
375        {
376            let batches = self.warm_batches.read();
377            // Search from newest to oldest batch
378            for batch in batches.iter().rev() {
379                if let Some(entry) = batch.get(key) {
380                    if self.is_visible(entry, snapshot_ts, current_txn_id) {
381                        return entry.value.clone();
382                    }
383                }
384            }
385        }
386
387        None
388    }
389
390    /// Check if entry is visible at snapshot
391    #[inline]
392    fn is_visible(&self, entry: &HotEntry, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
393        // Own uncommitted write is always visible
394        if let Some(my_txn) = current_txn_id {
395            if entry.txn_id == my_txn {
396                return true;
397            }
398        }
399
400        // Check if committed before snapshot
401        if let Some(commit_ts) = self.pending_commits.get(&entry.txn_id) {
402            return *commit_ts < snapshot_ts;
403        }
404
405        false
406    }
407
408    /// Commit transaction
409    pub fn commit(&self, txn_id: u64, commit_ts: u64, _write_set: &HashSet<InlineKey>) {
410        // Record commit timestamp
411        self.pending_commits.insert(txn_id, commit_ts);
412
413        // Update point index for faster lookups
414        // In a real implementation, we'd update version chains here
415    }
416
417    /// Abort transaction
418    pub fn abort(&self, txn_id: u64) {
419        self.pending_commits.remove(&txn_id);
420        
421        // Remove uncommitted entries from hot buffer
422        let mut buffer = self.hot_buffer.write();
423        buffer.retain(|e| e.txn_id != txn_id);
424    }
425
426    /// Scan with prefix - O(log N + K) - Legacy method using HashSet deduplication
427    ///
428    /// Consider using `scan_prefix_tournament` for better performance with many batches.
429    pub fn scan_prefix(
430        &self,
431        prefix: &[u8],
432        snapshot_ts: u64,
433        current_txn_id: Option<u64>,
434    ) -> Vec<(Vec<u8>, Vec<u8>)> {
435        let mut results = Vec::new();
436        let mut seen_keys: HashSet<Vec<u8>> = HashSet::new();
437
438        // Scan hot buffer first
439        {
440            let buffer = self.hot_buffer.read();
441            for entry in buffer.iter().rev() {
442                if entry.key.starts_with(prefix) 
443                    && !seen_keys.contains(entry.key.as_slice())
444                    && self.is_visible(entry, snapshot_ts, current_txn_id)
445                {
446                    if let Some(ref value) = entry.value {
447                        results.push((entry.key.to_vec(), value.clone()));
448                        seen_keys.insert(entry.key.to_vec());
449                    }
450                }
451            }
452        }
453
454        // Scan warm batches
455        {
456            let batches = self.warm_batches.read();
457            for batch in batches.iter().rev() {
458                for entry in batch.prefix_scan(prefix) {
459                    if !seen_keys.contains(entry.key.as_slice())
460                        && self.is_visible(entry, snapshot_ts, current_txn_id)
461                    {
462                        if let Some(ref value) = entry.value {
463                            results.push((entry.key.to_vec(), value.clone()));
464                            seen_keys.insert(entry.key.to_vec());
465                        }
466                    }
467                }
468            }
469        }
470
471        // Sort results by key
472        results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
473        results
474    }
475
476    /// Scan with prefix using Tournament Tree K-way merge
477    ///
478    /// ## Performance
479    ///
480    /// For K sorted runs with total N matching entries:
481    /// - Time: O(N log K) vs O(N × K) for the naive approach
482    /// - Memory: O(K) for the tournament tree vs O(N) for HashSet dedup
483    ///
484    /// This is significantly faster when:
485    /// - K (number of batches) > 4
486    /// - N (total entries) is large
487    ///
488    /// ## Algorithm
489    ///
490    /// 1. Sort hot buffer entries with matching prefix
491    /// 2. Collect prefix-matching iterators from each sorted batch
492    /// 3. Merge using tournament tree (loser tree) for O(log K) per element
493    /// 4. Deduplicate by key (first occurrence wins = newest version)
494    /// 5. Filter by MVCC visibility
495    pub fn scan_prefix_tournament(
496        &self,
497        prefix: &[u8],
498        snapshot_ts: u64,
499        current_txn_id: Option<u64>,
500    ) -> Vec<(Vec<u8>, Vec<u8>)> {
501        use crate::tournament_tree::TournamentTree;
502        
503        // Collect all sources
504        let mut sorted_sources: Vec<Vec<HotEntry>> = Vec::new();
505        
506        // Source 0: Sorted hot buffer entries matching prefix
507        {
508            let buffer = self.hot_buffer.read();
509            let mut hot_entries: Vec<HotEntry> = buffer
510                .iter()
511                .filter(|e| e.key.starts_with(prefix))
512                .cloned()
513                .collect();
514            
515            // Sort by key, then by seq desc (newest first for same key)
516            hot_entries.sort_unstable_by(|a, b| {
517                match a.key.as_slice().cmp(b.key.as_slice()) {
518                    std::cmp::Ordering::Equal => b.seq.cmp(&a.seq),
519                    other => other,
520                }
521            });
522            
523            // Deduplicate within hot buffer (keep newest = first occurrence)
524            let mut seen = HashSet::new();
525            hot_entries.retain(|e| seen.insert(e.key.to_vec()));
526            
527            if !hot_entries.is_empty() {
528                sorted_sources.push(hot_entries);
529            }
530        }
531        
532        // Sources 1..K: Warm batches (already sorted)
533        {
534            let batches = self.warm_batches.read();
535            // Iterate from newest to oldest
536            for batch in batches.iter().rev() {
537                let entries: Vec<HotEntry> = batch
538                    .prefix_scan(prefix)
539                    .cloned()
540                    .collect();
541                
542                if !entries.is_empty() {
543                    sorted_sources.push(entries);
544                }
545            }
546        }
547        
548        if sorted_sources.is_empty() {
549            return Vec::new();
550        }
551        
552        // Special case: single source, no merge needed
553        if sorted_sources.len() == 1 {
554            return sorted_sources
555                .into_iter()
556                .next()
557                .unwrap()
558                .into_iter()
559                .filter(|e| self.is_visible(e, snapshot_ts, current_txn_id))
560                .filter_map(|e| e.value.map(|v| (e.key.to_vec(), v)))
561                .collect();
562        }
563        
564        // K-way merge using tournament tree
565        // Wrap entries with source index for priority-based comparison
566        // When keys are equal, lower source_idx wins (newer data)
567        #[derive(Clone)]
568        struct KeyedEntry {
569            entry: HotEntry,
570            source_idx: usize,
571        }
572        
573        impl PartialEq for KeyedEntry {
574            fn eq(&self, other: &Self) -> bool {
575                self.entry.key.as_slice() == other.entry.key.as_slice()
576            }
577        }
578        impl Eq for KeyedEntry {}
579        impl PartialOrd for KeyedEntry {
580            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
581                Some(self.cmp(other))
582            }
583        }
584        impl Ord for KeyedEntry {
585            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
586                // Primary: sort by key
587                // Secondary: lower source_idx wins (source 0 = hot buffer = newest)
588                match self.entry.key.as_slice().cmp(other.entry.key.as_slice()) {
589                    std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
590                    other => other,
591                }
592            }
593        }
594        
595        let iters: Vec<_> = sorted_sources
596            .into_iter()
597            .enumerate()
598            .map(|(source_idx, v)| {
599                v.into_iter().map(move |e| KeyedEntry { entry: e, source_idx })
600            })
601            .collect();
602        
603        let mut tree = TournamentTree::new(iters);
604        let mut results = Vec::new();
605        let mut last_key: Option<Vec<u8>> = None;
606        
607        // Merge with deduplication and visibility filtering
608        while let Some((_, keyed)) = tree.pop() {
609            let entry = keyed.entry;
610            
611            // Deduplicate by key (first occurrence = from newest source due to ordering)
612            if let Some(ref last) = last_key {
613                if entry.key.as_slice() == last.as_slice() {
614                    continue;
615                }
616            }
617            last_key = Some(entry.key.to_vec());
618            
619            // Check visibility
620            if !self.is_visible(&entry, snapshot_ts, current_txn_id) {
621                // Not visible at this snapshot, try next version
622                // Note: since we deduplicated, we might miss older visible versions
623                // In a production system, we'd need a more sophisticated approach
624                continue;
625            }
626            
627            // Include non-tombstone entries
628            if let Some(value) = entry.value {
629                results.push((entry.key.to_vec(), value));
630            }
631        }
632        
633        results
634    }
635
636    /// Check if flush is needed
637    fn should_flush(&self) -> bool {
638        let buffer = self.hot_buffer.read();
639        buffer.len() >= (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize
640    }
641
642    /// Try to flush hot buffer to warm batch
643    pub fn try_flush(&self) -> Result<()> {
644        // Try to acquire flush lock (non-blocking)
645        let guard = match self.flush_lock.try_lock() {
646            Some(g) => g,
647            None => return Ok(()), // Another thread is flushing
648        };
649
650        // Swap hot buffer with empty
651        let entries = {
652            let mut buffer = self.hot_buffer.write();
653            if buffer.len() < (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize {
654                // Buffer was flushed by another thread
655                return Ok(());
656            }
657            std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
658        };
659
660        if entries.is_empty() {
661            return Ok(());
662        }
663
664        // Create sorted batch (O(N log N))
665        let batch = Arc::new(SortedBatch::from_unsorted(entries));
666
667        // Add to warm batches
668        {
669            let mut batches = self.warm_batches.write();
670            batches.push(batch);
671        }
672
673        drop(guard);
674        Ok(())
675    }
676
677    /// Force flush hot buffer
678    pub fn flush(&self) -> Result<()> {
679        let _guard = self.flush_lock.lock();
680
681        let entries = {
682            let mut buffer = self.hot_buffer.write();
683            std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
684        };
685
686        if entries.is_empty() {
687            return Ok(());
688        }
689
690        let batch = Arc::new(SortedBatch::from_unsorted(entries));
691
692        {
693            let mut batches = self.warm_batches.write();
694            batches.push(batch);
695        }
696
697        Ok(())
698    }
699
700    /// Get approximate size in bytes
701    pub fn size(&self) -> u64 {
702        self.size_bytes.load(Ordering::Relaxed)
703    }
704
705    /// Get entry count
706    pub fn len(&self) -> usize {
707        self.entry_count.load(Ordering::Relaxed)
708    }
709
710    /// Check if empty
711    pub fn is_empty(&self) -> bool {
712        self.len() == 0
713    }
714
715    /// Get batch count
716    pub fn batch_count(&self) -> usize {
717        self.warm_batches.read().len()
718    }
719
720    /// Get hot buffer length
721    pub fn hot_buffer_len(&self) -> usize {
722        self.hot_buffer.read().len()
723    }
724
725    /// Compact warm batches
726    pub fn compact(&self) -> Result<()> {
727        let batches = {
728            let mut b = self.warm_batches.write();
729            std::mem::take(&mut *b)
730        };
731
732        if batches.len() <= 1 {
733            let mut b = self.warm_batches.write();
734            *b = batches;
735            return Ok(());
736        }
737
738        // Merge all batches into one
739        let all_entries: Vec<HotEntry> = batches
740            .iter()
741            .flat_map(|b| b.iter().cloned())
742            .collect();
743
744        // Re-sort
745        let merged = Arc::new(SortedBatch::from_unsorted(all_entries));
746
747        {
748            let mut b = self.warm_batches.write();
749            b.clear();
750            b.push(merged);
751        }
752
753        Ok(())
754    }
755}
756
757impl Default for TieredMemTable {
758    fn default() -> Self {
759        Self::new()
760    }
761}
762
#[cfg(test)]
mod tests {
    use super::*;

    /// Committed writes are readable by snapshots taken after the commit and
    /// hidden from snapshots taken before it.
    #[test]
    fn test_tiered_memtable_basic() {
        let table = TieredMemTable::new();

        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();
        table.write(b"key2", Some(b"value2".to_vec()), 1).unwrap();
        assert_eq!(table.len(), 2);

        // Commit transaction
        let mut write_set = HashSet::new();
        write_set.insert(SmallVec::from_slice(b"key1"));
        write_set.insert(SmallVec::from_slice(b"key2"));
        table.commit(1, 100, &write_set);

        // Read back
        let v1 = table.read(b"key1", 200, None);
        let v2 = table.read(b"key2", 200, None);

        assert_eq!(v1, Some(b"value1".to_vec()));
        assert_eq!(v2, Some(b"value2".to_vec()));

        // A snapshot older than the commit must not see the write.
        assert_eq!(table.read(b"key1", 50, None), None);
        // Unknown keys are simply absent.
        assert_eq!(table.read(b"missing", 200, None), None);
    }

    /// A transaction sees its own uncommitted writes; others do not.
    #[test]
    fn test_tiered_memtable_uncommitted_own() {
        let table = TieredMemTable::new();

        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();

        // Should see own uncommitted write
        let v = table.read(b"key1", 100, Some(1));
        assert_eq!(v, Some(b"value1".to_vec()));

        // Should not see other's uncommitted write
        let v = table.read(b"key1", 100, Some(2));
        assert_eq!(v, None);
    }

    /// Flushing moves everything into warm batches and keeps it readable.
    #[test]
    fn test_tiered_memtable_flush() {
        let table = TieredMemTable::with_capacity(100);

        // Write enough to trigger flush
        for i in 0..90 {
            table.write(
                format!("key{:04}", i).as_bytes(),
                Some(format!("value{}", i).into_bytes()),
                1,
            ).unwrap();
        }

        // Force flush
        table.flush().unwrap();

        assert!(table.batch_count() >= 1);
        assert_eq!(table.hot_buffer_len(), 0);
        assert_eq!(table.len(), 90);

        // Every entry now lives in the warm tier and must still be readable
        // (by the writing transaction, since nothing was committed).
        assert_eq!(table.read(b"key0000", 0, Some(1)), Some(b"value0".to_vec()));
        assert_eq!(table.read(b"key0089", 0, Some(1)), Some(b"value89".to_vec()));
    }

    /// Prefix scans return only matching keys, sorted by key.
    #[test]
    fn test_tiered_memtable_scan_prefix() {
        let table = TieredMemTable::new();

        table.write(b"users:1", Some(b"alice".to_vec()), 1).unwrap();
        table.write(b"users:2", Some(b"bob".to_vec()), 1).unwrap();
        table.write(b"posts:1", Some(b"post1".to_vec()), 1).unwrap();

        let mut write_set = HashSet::new();
        write_set.insert(SmallVec::from_slice(b"users:1"));
        write_set.insert(SmallVec::from_slice(b"users:2"));
        write_set.insert(SmallVec::from_slice(b"posts:1"));
        table.commit(1, 100, &write_set);

        let results = table.scan_prefix(b"users:", 200, None);
        assert_eq!(
            results,
            vec![
                (b"users:1".to_vec(), b"alice".to_vec()),
                (b"users:2".to_vec(), b"bob".to_vec()),
            ]
        );
    }

    /// Point lookups work on a batch built from unsorted input.
    #[test]
    fn test_sorted_batch() {
        let entries = vec![
            HotEntry::new(SmallVec::from_slice(b"c"), Some(b"3".to_vec()), 1, 3),
            HotEntry::new(SmallVec::from_slice(b"a"), Some(b"1".to_vec()), 1, 1),
            HotEntry::new(SmallVec::from_slice(b"b"), Some(b"2".to_vec()), 1, 2),
        ];

        let batch = SortedBatch::from_unsorted(entries);

        assert_eq!(batch.len(), 3);
        assert_eq!(batch.get(b"a").unwrap().value, Some(b"1".to_vec()));
        assert_eq!(batch.get(b"b").unwrap().value, Some(b"2".to_vec()));
        assert_eq!(batch.get(b"c").unwrap().value, Some(b"3".to_vec()));
        assert!(batch.get(b"d").is_none());
        assert_eq!(batch.timestamp_range(), (1, 3));
    }

    /// A batch prefix scan yields the matching keys in sorted order.
    #[test]
    fn test_sorted_batch_prefix_scan() {
        let entries = vec![
            HotEntry::new(SmallVec::from_slice(b"ab"), Some(b"1".to_vec()), 1, 1),
            HotEntry::new(SmallVec::from_slice(b"abc"), Some(b"2".to_vec()), 1, 2),
            HotEntry::new(SmallVec::from_slice(b"abd"), Some(b"3".to_vec()), 1, 3),
            HotEntry::new(SmallVec::from_slice(b"xyz"), Some(b"4".to_vec()), 1, 4),
        ];

        let batch = SortedBatch::from_unsorted(entries);
        let results: Vec<_> = batch.prefix_scan(b"ab").collect();

        assert_eq!(results.len(), 3);
        let keys: Vec<&[u8]> = results.iter().map(|e| e.key.as_slice()).collect();
        assert_eq!(keys, vec![&b"ab"[..], &b"abc"[..], &b"abd"[..]]);
    }

    /// The tournament scan merges hot buffer and warm batches, newest wins.
    #[test]
    fn test_scan_prefix_tournament() {
        let table = TieredMemTable::with_capacity(100);

        // Create multiple batches by flushing
        for batch_idx in 0..3 {
            for i in 0..10 {
                let key = format!("users:{:02}", i);
                let value = format!("value_batch{}_item{}", batch_idx, i);
                table.write(key.as_bytes(), Some(value.into_bytes()), 1).unwrap();
            }
            table.flush().unwrap();
        }

        // Add some to hot buffer
        for i in 0..5 {
            let key = format!("users:{:02}", i);
            let value = format!("newest_value_{}", i);
            table.write(key.as_bytes(), Some(value.into_bytes()), 1).unwrap();
        }

        // Commit all
        let mut write_set = HashSet::new();
        for i in 0..10 {
            write_set.insert(SmallVec::from_slice(format!("users:{:02}", i).as_bytes()));
        }
        table.commit(1, 100, &write_set);

        // Scan using tournament tree
        let results = table.scan_prefix_tournament(b"users:", 200, None);

        // Should have 10 unique keys with newest values
        assert_eq!(results.len(), 10);

        for (i, (key, value)) in results.iter().enumerate() {
            // Keys come back in sorted order.
            let expected_key = format!("users:{:02}", i);
            assert_eq!(key.as_slice(), expected_key.as_bytes());

            // First 5 were overwritten in the hot buffer; the rest must come
            // from the most recently flushed batch.
            let value = String::from_utf8_lossy(value);
            if i < 5 {
                assert!(value.starts_with("newest_value_"));
            } else {
                assert!(value.starts_with("value_batch2_"));
            }
        }
    }

    /// Several versions of one key across tiers collapse to the newest.
    #[test]
    fn test_scan_tournament_deduplication() {
        let table = TieredMemTable::with_capacity(100);

        // Write same key multiple times across batches
        table.write(b"key:001", Some(b"old1".to_vec()), 1).unwrap();
        table.flush().unwrap();

        table.write(b"key:001", Some(b"old2".to_vec()), 1).unwrap();
        table.flush().unwrap();

        table.write(b"key:001", Some(b"newest".to_vec()), 1).unwrap();

        // Commit
        let mut write_set = HashSet::new();
        write_set.insert(SmallVec::from_slice(b"key:001"));
        table.commit(1, 100, &write_set);

        // Tournament scan should return only newest
        let results = table.scan_prefix_tournament(b"key:", 200, None);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1.as_slice(), b"newest");

        // Both scan implementations must agree.
        assert_eq!(results, table.scan_prefix(b"key:", 200, None));
    }
}
937}