Skip to main content

sochdb_storage/
tiered_memtable.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Tiered SkipMap Elimination (Recommendation 3)
19//!
20//! ## Problem
21//!
22//! Current write path uses three concurrent data structures:
23//! ```ignore
24//! pub struct ArenaMvccMemTable {
25//!     data: DashMap<ArenaKeyHandle, VersionChain>,      // +47ns lookup
26//!     ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>, // +93ns insert
27//!     dirty_list: ArenaEpochDirtyList,                   // +15ns
28//! }
29//! ```
30//!
31//! The benchmark shows "No-SkipMap Mode" achieves 97% of SQLite.
32//! The SkipMap provides ordered iteration but costs 40% of insert time.
33//!
34//! ## Solution
35//!
36//! Tiered architecture:
37//! - **Hot tier**: Unsorted append-only buffer (O(1) insert)
38//! - **Warm tier**: Sorted batch (O(N log N) once at flush)
39//!
40//! ## Performance Analysis
41//!
42//! Current per-write cost:
43//! ```text
44//! T_write = T_dashmap_insert + T_skipmap_insert + T_dirty_list
45//!         = 47ns + 93ns + 15ns = 155ns
46//! ```
47//!
48//! Proposed per-write cost:
49//! ```text
50//! T_write = T_vec_push + T_dirty_list
51//!         = 5ns + 15ns = 20ns
52//! ```
53//!
54//! Amortized sort cost at flush (N = 100,000 rows):
55//! ```text
56//! T_sort = N × log(N) × comparison_cost
57//!        = 100,000 × 17 × 10ns = 17ms (once per flush)
58//! ```
59//!
60//! Net throughput: 1 / 20ns = 50M ops/sec (theoretical max)
61//! With other overhead: ~2M ops/sec (matches "No-SkipMap" benchmark result)
62
63use std::collections::HashSet;
64use std::sync::Arc;
65use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
66
67use dashmap::DashMap;
68use parking_lot::{Mutex, RwLock};
69use smallvec::SmallVec;
70
71use crate::durable_storage::InlineKey;
72use sochdb_core::Result;
73
/// Capacity used for the hot buffer when the table is built with `new()`.
///
/// The automatic flush threshold is derived from the table's capacity, so
/// this value indirectly sets how many writes accumulate between flushes.
pub const DEFAULT_HOT_BUFFER_CAPACITY: usize = 100_000;

/// Fraction of the hot-buffer capacity at which a write attempts an
/// automatic flush (0.8 = flush once the buffer is 80% full).
pub const FLUSH_THRESHOLD_RATIO: f64 = 0.8;
79
80// =============================================================================
81// Hot Buffer Entry
82// =============================================================================
83
84/// Entry in the hot buffer (unsorted)
85#[derive(Debug, Clone)]
86pub struct HotEntry {
87    /// Key bytes (using SmallVec for inline storage of small keys)
88    pub key: InlineKey,
89    /// Value (None = tombstone)
90    pub value: Option<Vec<u8>>,
91    /// Transaction ID
92    pub txn_id: u64,
93    /// Insertion sequence number (for stable sorting)
94    pub seq: u64,
95}
96
97impl HotEntry {
98    pub fn new(key: InlineKey, value: Option<Vec<u8>>, txn_id: u64, seq: u64) -> Self {
99        Self {
100            key,
101            value,
102            txn_id,
103            seq,
104        }
105    }
106}
107
108// =============================================================================
109// Sorted Batch (Warm Tier)
110// =============================================================================
111
112/// A sorted batch of entries (immutable after creation)
113#[derive(Debug)]
114pub struct SortedBatch {
115    /// Sorted entries (by key, then by seq desc for same key)
116    entries: Vec<HotEntry>,
117    /// Index for binary search: key_hash -> first occurrence index
118    /// Uses hash for O(1) average lookup, falls back to binary search
119    key_index: DashMap<u64, usize>,
120    /// Minimum timestamp in this batch
121    min_ts: u64,
122    /// Maximum timestamp in this batch
123    max_ts: u64,
124}
125
126impl SortedBatch {
127    /// Create from unsorted entries
128    pub fn from_unsorted(mut entries: Vec<HotEntry>) -> Self {
129        if entries.is_empty() {
130            return Self {
131                entries: Vec::new(),
132                key_index: DashMap::new(),
133                min_ts: u64::MAX,
134                max_ts: 0,
135            };
136        }
137
138        // Sort by key, then by seq desc (newest first for same key)
139        entries.sort_unstable_by(|a, b| {
140            match a.key.as_slice().cmp(b.key.as_slice()) {
141                std::cmp::Ordering::Equal => b.seq.cmp(&a.seq), // Descending
142                other => other,
143            }
144        });
145
146        // Build key index (first occurrence of each key)
147        let key_index = DashMap::new();
148        let mut last_key: Option<&[u8]> = None;
149        for (idx, entry) in entries.iter().enumerate() {
150            if last_key != Some(entry.key.as_slice()) {
151                let hash = Self::hash_key(&entry.key);
152                key_index.insert(hash, idx);
153                last_key = Some(entry.key.as_slice());
154            }
155        }
156
157        // Calculate timestamp range
158        let min_ts = entries.iter().map(|e| e.seq).min().unwrap_or(u64::MAX);
159        let max_ts = entries.iter().map(|e| e.seq).max().unwrap_or(0);
160
161        Self {
162            entries,
163            key_index,
164            min_ts,
165            max_ts,
166        }
167    }
168
169    /// Hash key for index lookup
170    #[inline]
171    fn hash_key(key: &[u8]) -> u64 {
172        twox_hash::xxh3::hash64(key)
173    }
174
175    /// Get entry by key - O(1) average, O(log N) worst case
176    pub fn get(&self, key: &[u8]) -> Option<&HotEntry> {
177        let hash = Self::hash_key(key);
178
179        if let Some(idx) = self.key_index.get(&hash) {
180            // Verify key matches (handle hash collisions)
181            let idx = *idx;
182            if idx < self.entries.len() && self.entries[idx].key.as_slice() == key {
183                return Some(&self.entries[idx]);
184            }
185        }
186
187        // Fall back to binary search
188        self.entries
189            .binary_search_by(|e| e.key.as_slice().cmp(key))
190            .ok()
191            .map(|idx| &self.entries[idx])
192    }
193
194    /// Get all entries with given prefix - O(log N + K)
195    pub fn prefix_scan(&self, prefix: &[u8]) -> impl Iterator<Item = &HotEntry> {
196        // Find first entry >= prefix using binary search
197        let start_idx = self.entries.partition_point(|e| e.key.as_slice() < prefix);
198
199        self.entries[start_idx..]
200            .iter()
201            .take_while(move |e| e.key.starts_with(prefix))
202    }
203
204    /// Get entry count
205    pub fn len(&self) -> usize {
206        self.entries.len()
207    }
208
209    /// Check if empty
210    pub fn is_empty(&self) -> bool {
211        self.entries.is_empty()
212    }
213
214    /// Iterate all entries in sorted order
215    pub fn iter(&self) -> impl Iterator<Item = &HotEntry> {
216        self.entries.iter()
217    }
218
219    /// Get timestamp range
220    pub fn timestamp_range(&self) -> (u64, u64) {
221        (self.min_ts, self.max_ts)
222    }
223}
224
225// =============================================================================
226// Tiered MemTable
227// =============================================================================
228
229/// Tiered MemTable with hot buffer and warm sorted batches
230///
231/// ## Architecture
232///
233/// ```text
234/// ┌─────────────────────────────────────────────────────────┐
235/// │ Hot Buffer (Vec<HotEntry>)                              │
236/// │ • O(1) append                                           │
237/// │ • Unsorted                                              │
238/// │ • Current writes                                        │
239/// └─────────────────────────────────────────────────────────┘
240///                        ↓ flush
241/// ┌─────────────────────────────────────────────────────────┐
242/// │ Warm Batches (Vec<Arc<SortedBatch>>)                    │
243/// │ • Sorted (O(N log N) once)                              │
244/// │ • Immutable                                             │
245/// │ • Binary search reads                                   │
246/// └─────────────────────────────────────────────────────────┘
247/// ```
248pub struct TieredMemTable {
249    /// Hot buffer: unsorted, O(1) append
250    hot_buffer: RwLock<Vec<HotEntry>>,
251    /// Hot buffer capacity
252    hot_capacity: usize,
253    /// Warm batches: sorted, immutable
254    warm_batches: RwLock<Vec<Arc<SortedBatch>>>,
255    /// Hash index for point lookups (key_hash -> batch_idx, entry_idx)
256    /// Avoids scanning all batches for point lookups
257    #[allow(dead_code)]
258    point_index: DashMap<Vec<u8>, (usize, usize)>,
259    /// Sequence counter for ordering
260    seq_counter: AtomicU64,
261    /// Approximate size in bytes
262    size_bytes: AtomicU64,
263    /// Entry count
264    entry_count: AtomicUsize,
265    /// Pending commits (txn_id -> commit_ts)
266    pending_commits: DashMap<u64, u64>,
267    /// Read-write lock for flush coordination
268    flush_lock: Mutex<()>,
269}
270
271impl TieredMemTable {
272    pub fn new() -> Self {
273        Self::with_capacity(DEFAULT_HOT_BUFFER_CAPACITY)
274    }
275
276    pub fn with_capacity(capacity: usize) -> Self {
277        Self {
278            hot_buffer: RwLock::new(Vec::with_capacity(capacity)),
279            hot_capacity: capacity,
280            warm_batches: RwLock::new(Vec::new()),
281            point_index: DashMap::new(),
282            seq_counter: AtomicU64::new(1),
283            size_bytes: AtomicU64::new(0),
284            entry_count: AtomicUsize::new(0),
285            pending_commits: DashMap::new(),
286            flush_lock: Mutex::new(()),
287        }
288    }
289
290    /// Write a key-value pair (uncommitted) - O(1)
291    ///
292    /// This is the key optimization: append to unsorted buffer
293    /// instead of inserting into SkipMap.
294    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
295        let key_inline = SmallVec::from_slice(key);
296        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
297        let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
298
299        let entry = HotEntry::new(key_inline, value, txn_id, seq);
300
301        // Fast path: append to hot buffer (O(1))
302        {
303            let mut buffer = self.hot_buffer.write();
304            buffer.push(entry);
305        }
306
307        self.size_bytes
308            .fetch_add((key.len() + value_size) as u64, Ordering::Relaxed);
309        self.entry_count.fetch_add(1, Ordering::Relaxed);
310
311        // Check if flush needed
312        if self.should_flush() {
313            self.try_flush()?;
314        }
315
316        Ok(())
317    }
318
319    /// Write batch - O(n)
320    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
321        let mut total_size = 0u64;
322        let mut entries = Vec::with_capacity(writes.len());
323
324        for (key, value) in writes {
325            let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
326            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
327            total_size += (key.len() + value_size) as u64;
328
329            entries.push(HotEntry::new(
330                SmallVec::from_slice(key),
331                value.clone(),
332                txn_id,
333                seq,
334            ));
335        }
336
337        // Batch append (still O(n) but fewer lock acquisitions)
338        {
339            let mut buffer = self.hot_buffer.write();
340            buffer.extend(entries);
341        }
342
343        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
344        self.entry_count.fetch_add(writes.len(), Ordering::Relaxed);
345
346        if self.should_flush() {
347            self.try_flush()?;
348        }
349
350        Ok(())
351    }
352
353    /// Read at snapshot timestamp - O(log B + log N) where B = batch count
354    pub fn read(
355        &self,
356        key: &[u8],
357        snapshot_ts: u64,
358        current_txn_id: Option<u64>,
359    ) -> Option<Vec<u8>> {
360        // Check hot buffer first (most recent writes)
361        {
362            let buffer = self.hot_buffer.read();
363            // Scan backwards for most recent version
364            for entry in buffer.iter().rev() {
365                if entry.key.as_slice() == key {
366                    // Check MVCC visibility
367                    if self.is_visible(entry, snapshot_ts, current_txn_id) {
368                        return entry.value.clone();
369                    }
370                }
371            }
372        }
373
374        // Check warm batches (sorted, binary search)
375        {
376            let batches = self.warm_batches.read();
377            // Search from newest to oldest batch
378            for batch in batches.iter().rev() {
379                if let Some(entry) = batch.get(key) {
380                    if self.is_visible(entry, snapshot_ts, current_txn_id) {
381                        return entry.value.clone();
382                    }
383                }
384            }
385        }
386
387        None
388    }
389
390    /// Check if entry is visible at snapshot
391    #[inline]
392    fn is_visible(&self, entry: &HotEntry, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
393        // Own uncommitted write is always visible
394        if let Some(my_txn) = current_txn_id {
395            if entry.txn_id == my_txn {
396                return true;
397            }
398        }
399
400        // Check if committed before snapshot
401        if let Some(commit_ts) = self.pending_commits.get(&entry.txn_id) {
402            return *commit_ts < snapshot_ts;
403        }
404
405        false
406    }
407
408    /// Commit transaction
409    pub fn commit(&self, txn_id: u64, commit_ts: u64, _write_set: &HashSet<InlineKey>) {
410        // Record commit timestamp
411        self.pending_commits.insert(txn_id, commit_ts);
412
413        // Update point index for faster lookups
414        // In a real implementation, we'd update version chains here
415    }
416
417    /// Abort transaction
418    pub fn abort(&self, txn_id: u64) {
419        self.pending_commits.remove(&txn_id);
420
421        // Remove uncommitted entries from hot buffer
422        let mut buffer = self.hot_buffer.write();
423        buffer.retain(|e| e.txn_id != txn_id);
424    }
425
426    /// Scan with prefix - O(log N + K) - Legacy method using HashSet deduplication
427    ///
428    /// Consider using `scan_prefix_tournament` for better performance with many batches.
429    pub fn scan_prefix(
430        &self,
431        prefix: &[u8],
432        snapshot_ts: u64,
433        current_txn_id: Option<u64>,
434    ) -> Vec<(Vec<u8>, Vec<u8>)> {
435        let mut results = Vec::new();
436        let mut seen_keys: HashSet<Vec<u8>> = HashSet::new();
437
438        // Scan hot buffer first
439        {
440            let buffer = self.hot_buffer.read();
441            for entry in buffer.iter().rev() {
442                if entry.key.starts_with(prefix)
443                    && !seen_keys.contains(entry.key.as_slice())
444                    && self.is_visible(entry, snapshot_ts, current_txn_id)
445                {
446                    if let Some(ref value) = entry.value {
447                        results.push((entry.key.to_vec(), value.clone()));
448                        seen_keys.insert(entry.key.to_vec());
449                    }
450                }
451            }
452        }
453
454        // Scan warm batches
455        {
456            let batches = self.warm_batches.read();
457            for batch in batches.iter().rev() {
458                for entry in batch.prefix_scan(prefix) {
459                    if !seen_keys.contains(entry.key.as_slice())
460                        && self.is_visible(entry, snapshot_ts, current_txn_id)
461                    {
462                        if let Some(ref value) = entry.value {
463                            results.push((entry.key.to_vec(), value.clone()));
464                            seen_keys.insert(entry.key.to_vec());
465                        }
466                    }
467                }
468            }
469        }
470
471        // Sort results by key
472        results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
473        results
474    }
475
476    /// Scan with prefix using Tournament Tree K-way merge
477    ///
478    /// ## Performance
479    ///
480    /// For K sorted runs with total N matching entries:
481    /// - Time: O(N log K) vs O(N × K) for the naive approach
482    /// - Memory: O(K) for the tournament tree vs O(N) for HashSet dedup
483    ///
484    /// This is significantly faster when:
485    /// - K (number of batches) > 4
486    /// - N (total entries) is large
487    ///
488    /// ## Algorithm
489    ///
490    /// 1. Sort hot buffer entries with matching prefix
491    /// 2. Collect prefix-matching iterators from each sorted batch
492    /// 3. Merge using tournament tree (loser tree) for O(log K) per element
493    /// 4. Deduplicate by key (first occurrence wins = newest version)
494    /// 5. Filter by MVCC visibility
495    pub fn scan_prefix_tournament(
496        &self,
497        prefix: &[u8],
498        snapshot_ts: u64,
499        current_txn_id: Option<u64>,
500    ) -> Vec<(Vec<u8>, Vec<u8>)> {
501        use crate::tournament_tree::TournamentTree;
502
503        // Collect all sources
504        let mut sorted_sources: Vec<Vec<HotEntry>> = Vec::new();
505
506        // Source 0: Sorted hot buffer entries matching prefix
507        {
508            let buffer = self.hot_buffer.read();
509            let mut hot_entries: Vec<HotEntry> = buffer
510                .iter()
511                .filter(|e| e.key.starts_with(prefix))
512                .cloned()
513                .collect();
514
515            // Sort by key, then by seq desc (newest first for same key)
516            hot_entries.sort_unstable_by(|a, b| match a.key.as_slice().cmp(b.key.as_slice()) {
517                std::cmp::Ordering::Equal => b.seq.cmp(&a.seq),
518                other => other,
519            });
520
521            // Deduplicate within hot buffer (keep newest = first occurrence)
522            let mut seen = HashSet::new();
523            hot_entries.retain(|e| seen.insert(e.key.to_vec()));
524
525            if !hot_entries.is_empty() {
526                sorted_sources.push(hot_entries);
527            }
528        }
529
530        // Sources 1..K: Warm batches (already sorted)
531        {
532            let batches = self.warm_batches.read();
533            // Iterate from newest to oldest
534            for batch in batches.iter().rev() {
535                let entries: Vec<HotEntry> = batch.prefix_scan(prefix).cloned().collect();
536
537                if !entries.is_empty() {
538                    sorted_sources.push(entries);
539                }
540            }
541        }
542
543        if sorted_sources.is_empty() {
544            return Vec::new();
545        }
546
547        // Special case: single source, no merge needed
548        if sorted_sources.len() == 1 {
549            return sorted_sources
550                .into_iter()
551                .next()
552                .unwrap()
553                .into_iter()
554                .filter(|e| self.is_visible(e, snapshot_ts, current_txn_id))
555                .filter_map(|e| e.value.map(|v| (e.key.to_vec(), v)))
556                .collect();
557        }
558
559        // K-way merge using tournament tree
560        // Wrap entries with source index for priority-based comparison
561        // When keys are equal, lower source_idx wins (newer data)
562        #[derive(Clone)]
563        struct KeyedEntry {
564            entry: HotEntry,
565            source_idx: usize,
566        }
567
568        impl PartialEq for KeyedEntry {
569            fn eq(&self, other: &Self) -> bool {
570                self.entry.key.as_slice() == other.entry.key.as_slice()
571            }
572        }
573        impl Eq for KeyedEntry {}
574        impl PartialOrd for KeyedEntry {
575            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
576                Some(self.cmp(other))
577            }
578        }
579        impl Ord for KeyedEntry {
580            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
581                // Primary: sort by key
582                // Secondary: lower source_idx wins (source 0 = hot buffer = newest)
583                match self.entry.key.as_slice().cmp(other.entry.key.as_slice()) {
584                    std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
585                    other => other,
586                }
587            }
588        }
589
590        let iters: Vec<_> = sorted_sources
591            .into_iter()
592            .enumerate()
593            .map(|(source_idx, v)| {
594                v.into_iter().map(move |e| KeyedEntry {
595                    entry: e,
596                    source_idx,
597                })
598            })
599            .collect();
600
601        let mut tree = TournamentTree::new(iters);
602        let mut results = Vec::new();
603        let mut last_key: Option<Vec<u8>> = None;
604
605        // Merge with deduplication and visibility filtering
606        while let Some((_, keyed)) = tree.pop() {
607            let entry = keyed.entry;
608
609            // Deduplicate by key (first occurrence = from newest source due to ordering)
610            if let Some(ref last) = last_key {
611                if entry.key.as_slice() == last.as_slice() {
612                    continue;
613                }
614            }
615            last_key = Some(entry.key.to_vec());
616
617            // Check visibility
618            if !self.is_visible(&entry, snapshot_ts, current_txn_id) {
619                // Not visible at this snapshot, try next version
620                // Note: since we deduplicated, we might miss older visible versions
621                // In a production system, we'd need a more sophisticated approach
622                continue;
623            }
624
625            // Include non-tombstone entries
626            if let Some(value) = entry.value {
627                results.push((entry.key.to_vec(), value));
628            }
629        }
630
631        results
632    }
633
634    /// Check if flush is needed
635    fn should_flush(&self) -> bool {
636        let buffer = self.hot_buffer.read();
637        buffer.len() >= (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize
638    }
639
640    /// Try to flush hot buffer to warm batch
641    pub fn try_flush(&self) -> Result<()> {
642        // Try to acquire flush lock (non-blocking)
643        let guard = match self.flush_lock.try_lock() {
644            Some(g) => g,
645            None => return Ok(()), // Another thread is flushing
646        };
647
648        // Swap hot buffer with empty
649        let entries = {
650            let mut buffer = self.hot_buffer.write();
651            if buffer.len() < (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize {
652                // Buffer was flushed by another thread
653                return Ok(());
654            }
655            std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
656        };
657
658        if entries.is_empty() {
659            return Ok(());
660        }
661
662        // Create sorted batch (O(N log N))
663        let batch = Arc::new(SortedBatch::from_unsorted(entries));
664
665        // Add to warm batches
666        {
667            let mut batches = self.warm_batches.write();
668            batches.push(batch);
669        }
670
671        drop(guard);
672        Ok(())
673    }
674
675    /// Force flush hot buffer
676    pub fn flush(&self) -> Result<()> {
677        let _guard = self.flush_lock.lock();
678
679        let entries = {
680            let mut buffer = self.hot_buffer.write();
681            std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
682        };
683
684        if entries.is_empty() {
685            return Ok(());
686        }
687
688        let batch = Arc::new(SortedBatch::from_unsorted(entries));
689
690        {
691            let mut batches = self.warm_batches.write();
692            batches.push(batch);
693        }
694
695        Ok(())
696    }
697
698    /// Get approximate size in bytes
699    pub fn size(&self) -> u64 {
700        self.size_bytes.load(Ordering::Relaxed)
701    }
702
703    /// Get entry count
704    pub fn len(&self) -> usize {
705        self.entry_count.load(Ordering::Relaxed)
706    }
707
708    /// Check if empty
709    pub fn is_empty(&self) -> bool {
710        self.len() == 0
711    }
712
713    /// Get batch count
714    pub fn batch_count(&self) -> usize {
715        self.warm_batches.read().len()
716    }
717
718    /// Get hot buffer length
719    pub fn hot_buffer_len(&self) -> usize {
720        self.hot_buffer.read().len()
721    }
722
723    /// Compact warm batches
724    pub fn compact(&self) -> Result<()> {
725        let batches = {
726            let mut b = self.warm_batches.write();
727            std::mem::take(&mut *b)
728        };
729
730        if batches.len() <= 1 {
731            let mut b = self.warm_batches.write();
732            *b = batches;
733            return Ok(());
734        }
735
736        // Merge all batches into one
737        let all_entries: Vec<HotEntry> = batches.iter().flat_map(|b| b.iter().cloned()).collect();
738
739        // Re-sort
740        let merged = Arc::new(SortedBatch::from_unsorted(all_entries));
741
742        {
743            let mut b = self.warm_batches.write();
744            b.clear();
745            b.push(merged);
746        }
747
748        Ok(())
749    }
750}
751
752impl Default for TieredMemTable {
753    fn default() -> Self {
754        Self::new()
755    }
756}
757
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the write-set argument expected by `commit` from raw keys.
    fn write_set_of(keys: &[&[u8]]) -> HashSet<InlineKey> {
        keys.iter().copied().map(InlineKey::from_slice).collect()
    }

    /// Shorthand for an entry of transaction 1 carrying `Some(value)`.
    fn entry(key: &[u8], value: &[u8], seq: u64) -> HotEntry {
        HotEntry::new(SmallVec::from_slice(key), Some(value.to_vec()), 1, seq)
    }

    #[test]
    fn test_tiered_memtable_basic() {
        let table = TieredMemTable::new();

        for (key, value) in [(b"key1", b"value1"), (b"key2", b"value2")] {
            table.write(key, Some(value.to_vec()), 1).unwrap();
        }
        table.commit(1, 100, &write_set_of(&[b"key1", b"key2"]));

        // Committed at 100, so a snapshot at 200 sees both values.
        assert_eq!(table.read(b"key1", 200, None), Some(b"value1".to_vec()));
        assert_eq!(table.read(b"key2", 200, None), Some(b"value2".to_vec()));
    }

    #[test]
    fn test_tiered_memtable_uncommitted_own() {
        let table = TieredMemTable::new();
        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();

        // The writing transaction sees its own uncommitted data...
        assert_eq!(table.read(b"key1", 100, Some(1)), Some(b"value1".to_vec()));
        // ...another transaction does not.
        assert_eq!(table.read(b"key1", 100, Some(2)), None);
    }

    #[test]
    fn test_tiered_memtable_flush() {
        let table = TieredMemTable::with_capacity(100);

        // 90 writes cross the 80% threshold of a 100-entry buffer.
        for i in 0..90 {
            let key = format!("key{:04}", i);
            let value = format!("value{}", i);
            table
                .write(key.as_bytes(), Some(value.into_bytes()), 1)
                .unwrap();
        }

        table.flush().unwrap();

        assert!(table.batch_count() >= 1);
        assert_eq!(table.hot_buffer_len(), 0);
    }

    #[test]
    fn test_tiered_memtable_scan_prefix() {
        let table = TieredMemTable::new();

        let rows: [(&[u8], &[u8]); 3] = [
            (b"users:1", b"alice"),
            (b"users:2", b"bob"),
            (b"posts:1", b"post1"),
        ];
        for (key, value) in rows {
            table.write(key, Some(value.to_vec()), 1).unwrap();
        }
        table.commit(1, 100, &write_set_of(&[b"users:1", b"users:2", b"posts:1"]));

        let results = table.scan_prefix(b"users:", 200, None);
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_sorted_batch() {
        let batch = SortedBatch::from_unsorted(vec![
            entry(b"c", b"3", 3),
            entry(b"a", b"1", 1),
            entry(b"b", b"2", 2),
        ]);

        assert_eq!(batch.len(), 3);
        assert_eq!(batch.get(b"a").unwrap().value, Some(b"1".to_vec()));
        assert_eq!(batch.get(b"b").unwrap().value, Some(b"2".to_vec()));
        assert_eq!(batch.get(b"c").unwrap().value, Some(b"3".to_vec()));
    }

    #[test]
    fn test_sorted_batch_prefix_scan() {
        let batch = SortedBatch::from_unsorted(vec![
            entry(b"ab", b"1", 1),
            entry(b"abc", b"2", 2),
            entry(b"abd", b"3", 3),
            entry(b"xyz", b"4", 4),
        ]);

        let matched = batch.prefix_scan(b"ab").count();
        assert_eq!(matched, 3);
    }

    #[test]
    fn test_scan_prefix_tournament() {
        let table = TieredMemTable::with_capacity(100);

        // Three warm batches, each holding the same ten keys.
        for batch_idx in 0..3 {
            for i in 0..10 {
                let key = format!("users:{:02}", i);
                let value = format!("value_batch{}_item{}", batch_idx, i);
                table
                    .write(key.as_bytes(), Some(value.into_bytes()), 1)
                    .unwrap();
            }
            table.flush().unwrap();
        }

        // Newer values for the first five keys stay in the hot buffer.
        for i in 0..5 {
            let key = format!("users:{:02}", i);
            let value = format!("newest_value_{}", i);
            table
                .write(key.as_bytes(), Some(value.into_bytes()), 1)
                .unwrap();
        }

        let write_set: HashSet<InlineKey> = (0..10)
            .map(|i| InlineKey::from_slice(format!("users:{:02}", i).as_bytes()))
            .collect();
        table.commit(1, 100, &write_set);

        let results = table.scan_prefix_tournament(b"users:", 200, None);

        // One row per key, and the hot-buffer version wins where present.
        assert_eq!(results.len(), 10);
        for (i, (key, value)) in results.iter().take(5).enumerate() {
            let expected_key = format!("users:{:02}", i);
            assert_eq!(key.as_slice(), expected_key.as_bytes());
            assert!(String::from_utf8_lossy(value).starts_with("newest_value_"));
        }
    }

    #[test]
    fn test_scan_tournament_deduplication() {
        let table = TieredMemTable::with_capacity(100);

        // The same key lands in two warm batches and in the hot buffer.
        for stale in [b"old1", b"old2"] {
            table.write(b"key:001", Some(stale.to_vec()), 1).unwrap();
            table.flush().unwrap();
        }
        table
            .write(b"key:001", Some(b"newest".to_vec()), 1)
            .unwrap();

        table.commit(1, 100, &write_set_of(&[b"key:001"]));

        let results = table.scan_prefix_tournament(b"key:", 200, None);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1.as_slice(), b"newest");
    }
}