rustlite_storage/
memtable.rs

//! Memtable - In-memory sorted write buffer
//!
//! The Memtable is an in-memory data structure that holds recent writes
//! before they are flushed to disk as SSTables. It uses a BTreeMap for
//! sorted key order, which enables efficient range scans and ordered iteration.
6
7use std::collections::BTreeMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9
/// A single slot stored in the memtable: either live bytes or a deletion marker.
///
/// `Eq` is derived alongside `PartialEq` because the only payload is a
/// `Vec<u8>`, whose equality is total.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MemtableEntry {
    /// A live value holding the stored bytes.
    Value(Vec<u8>),
    /// A tombstone marking the key as deleted.
    Tombstone,
}
18
19impl MemtableEntry {
20    /// Returns the size of this entry in bytes
21    pub fn size(&self) -> usize {
22        match self {
23            MemtableEntry::Value(v) => v.len() + 1, // +1 for type tag
24            MemtableEntry::Tombstone => 1,
25        }
26    }
27}
28
29/// Memtable - an in-memory sorted write buffer
30///
31/// Provides O(log n) insert, lookup, and delete operations.
32/// When the memtable reaches a size threshold, it should be flushed
33/// to disk as an SSTable.
34#[derive(Debug)]
35pub struct Memtable {
36    /// The underlying sorted map
37    data: BTreeMap<Vec<u8>, MemtableEntry>,
38    /// Approximate size in bytes (for flush threshold checking)
39    size_bytes: AtomicU64,
40    /// Sequence number for MVCC (future use)
41    sequence: AtomicU64,
42}
43
44impl Memtable {
45    /// Creates a new empty Memtable
46    pub fn new() -> Self {
47        Self {
48            data: BTreeMap::new(),
49            size_bytes: AtomicU64::new(0),
50            sequence: AtomicU64::new(0),
51        }
52    }
53
54    /// Creates a new Memtable with a starting sequence number
55    pub fn with_sequence(sequence: u64) -> Self {
56        Self {
57            data: BTreeMap::new(),
58            size_bytes: AtomicU64::new(0),
59            sequence: AtomicU64::new(sequence),
60        }
61    }
62
63    /// Inserts or updates a key-value pair
64    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
65        let key_size = key.len() as u64;
66        let value_size = value.len() as u64 + 1; // +1 for entry type
67
68        // Remove old entry size if exists
69        if let Some(old) = self.data.get(&key) {
70            let old_size = old.size() as u64;
71            self.size_bytes.fetch_sub(key_size + old_size, Ordering::Relaxed);
72        }
73
74        self.data.insert(key.clone(), MemtableEntry::Value(value));
75        self.size_bytes.fetch_add(key_size + value_size, Ordering::Relaxed);
76        self.sequence.fetch_add(1, Ordering::Relaxed);
77    }
78
79    /// Retrieves a value by key
80    ///
81    /// Returns:
82    /// - `Some(Some(value))` if the key exists with a value
83    /// - `Some(None)` if the key was deleted (tombstone)
84    /// - `None` if the key is not in the memtable
85    pub fn get(&self, key: &[u8]) -> Option<Option<&[u8]>> {
86        self.data.get(key).map(|entry| match entry {
87            MemtableEntry::Value(v) => Some(v.as_slice()),
88            MemtableEntry::Tombstone => None,
89        })
90    }
91
92    /// Marks a key as deleted with a tombstone
93    pub fn delete(&mut self, key: Vec<u8>) {
94        let key_size = key.len() as u64;
95
96        // Remove old entry size if exists
97        if let Some(old) = self.data.get(&key) {
98            let old_size = old.size() as u64;
99            self.size_bytes.fetch_sub(key_size + old_size, Ordering::Relaxed);
100        }
101
102        self.data.insert(key.clone(), MemtableEntry::Tombstone);
103        self.size_bytes.fetch_add(key_size + 1, Ordering::Relaxed); // +1 for tombstone
104        self.sequence.fetch_add(1, Ordering::Relaxed);
105    }
106
107    /// Returns the approximate size of the memtable in bytes
108    pub fn size_bytes(&self) -> u64 {
109        self.size_bytes.load(Ordering::Relaxed)
110    }
111
112    /// Returns the number of entries in the memtable
113    pub fn len(&self) -> usize {
114        self.data.len()
115    }
116
117    /// Returns true if the memtable is empty
118    pub fn is_empty(&self) -> bool {
119        self.data.is_empty()
120    }
121
122    /// Returns the current sequence number
123    pub fn sequence(&self) -> u64 {
124        self.sequence.load(Ordering::Relaxed)
125    }
126
127    /// Returns an iterator over all entries in sorted order
128    pub fn iter(&self) -> impl Iterator<Item = (&Vec<u8>, &MemtableEntry)> {
129        self.data.iter()
130    }
131
132    /// Returns an iterator over a range of keys
133    pub fn range<R>(&self, range: R) -> impl Iterator<Item = (&Vec<u8>, &MemtableEntry)>
134    where
135        R: std::ops::RangeBounds<Vec<u8>>,
136    {
137        self.data.range(range)
138    }
139
140    /// Clears the memtable
141    pub fn clear(&mut self) {
142        self.data.clear();
143        self.size_bytes.store(0, Ordering::Relaxed);
144    }
145
146    /// Consumes the memtable and returns all entries sorted by key
147    pub fn into_iter(self) -> impl Iterator<Item = (Vec<u8>, MemtableEntry)> {
148        self.data.into_iter()
149    }
150}
151
152impl Default for Memtable {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157
/// Unit tests for the memtable.
///
/// Besides the basic put/get/delete round-trips, these pin the exact size
/// accounting on overwrite and delete, and cover `range`, `into_iter`,
/// `Default`, and `MemtableEntry::size`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memtable_new() {
        let mt = Memtable::new();
        assert!(mt.is_empty());
        assert_eq!(mt.len(), 0);
        assert_eq!(mt.size_bytes(), 0);
    }

    #[test]
    fn test_memtable_default() {
        let mt = Memtable::default();
        assert!(mt.is_empty());
        assert_eq!(mt.size_bytes(), 0);
        assert_eq!(mt.sequence(), 0);
    }

    #[test]
    fn test_memtable_entry_size() {
        // One tag byte per entry, plus the payload for live values.
        assert_eq!(MemtableEntry::Tombstone.size(), 1);
        assert_eq!(MemtableEntry::Value(Vec::new()).size(), 1);
        assert_eq!(MemtableEntry::Value(b"hello".to_vec()).size(), 6);
    }

    #[test]
    fn test_memtable_put_get() {
        let mut mt = Memtable::new();

        mt.put(b"key1".to_vec(), b"value1".to_vec());
        mt.put(b"key2".to_vec(), b"value2".to_vec());

        assert_eq!(mt.len(), 2);
        assert_eq!(mt.get(b"key1"), Some(Some(b"value1".as_slice())));
        assert_eq!(mt.get(b"key2"), Some(Some(b"value2".as_slice())));
        assert_eq!(mt.get(b"key3"), None);
    }

    #[test]
    fn test_memtable_update() {
        let mut mt = Memtable::new();

        mt.put(b"key".to_vec(), b"value1".to_vec());
        assert_eq!(mt.get(b"key"), Some(Some(b"value1".as_slice())));

        mt.put(b"key".to_vec(), b"value2".to_vec());
        assert_eq!(mt.get(b"key"), Some(Some(b"value2".as_slice())));
        assert_eq!(mt.len(), 1);
    }

    #[test]
    fn test_memtable_delete() {
        let mut mt = Memtable::new();

        mt.put(b"key".to_vec(), b"value".to_vec());
        assert_eq!(mt.get(b"key"), Some(Some(b"value".as_slice())));

        mt.delete(b"key".to_vec());
        // Key exists but is a tombstone
        assert_eq!(mt.get(b"key"), Some(None));
        assert_eq!(mt.len(), 1);
    }

    #[test]
    fn test_memtable_delete_missing_key() {
        let mut mt = Memtable::new();

        // Deleting an absent key still records a tombstone for it.
        mt.delete(b"ghost".to_vec());

        assert_eq!(mt.get(b"ghost"), Some(None));
        assert_eq!(mt.len(), 1);
        assert_eq!(mt.size_bytes(), 6); // 5 key bytes + 1 tag byte
    }

    #[test]
    fn test_memtable_size_tracking() {
        let mut mt = Memtable::new();

        let initial_size = mt.size_bytes();
        mt.put(b"key".to_vec(), b"value".to_vec());

        // Size should have increased
        assert!(mt.size_bytes() > initial_size);
    }

    #[test]
    fn test_memtable_size_accounting_on_overwrite_and_delete() {
        let mut mt = Memtable::new();

        // 3 key bytes + 5 value bytes + 1 tag byte
        mt.put(b"key".to_vec(), b"value".to_vec());
        assert_eq!(mt.size_bytes(), 9);

        // Overwriting replaces the old contribution rather than adding to it.
        mt.put(b"key".to_vec(), b"v".to_vec());
        assert_eq!(mt.size_bytes(), 5);

        // A tombstone costs the key plus one tag byte.
        mt.delete(b"key".to_vec());
        assert_eq!(mt.size_bytes(), 4);

        // Re-inserting over a tombstone is accounted the same way.
        mt.put(b"key".to_vec(), b"value".to_vec());
        assert_eq!(mt.size_bytes(), 9);
        assert_eq!(mt.len(), 1);
    }

    #[test]
    fn test_memtable_iter_sorted() {
        let mut mt = Memtable::new();

        // Insert in random order
        mt.put(b"c".to_vec(), b"3".to_vec());
        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.put(b"b".to_vec(), b"2".to_vec());

        // Iteration should be sorted
        let keys: Vec<_> = mt.iter().map(|(k, _)| k.clone()).collect();
        assert_eq!(keys, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
    }

    #[test]
    fn test_memtable_range() {
        let mut mt = Memtable::new();

        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.put(b"b".to_vec(), b"2".to_vec());
        mt.put(b"c".to_vec(), b"3".to_vec());
        mt.put(b"d".to_vec(), b"4".to_vec());

        // Half-open range: start inclusive, end exclusive.
        let middle: Vec<_> = mt
            .range(b"b".to_vec()..b"d".to_vec())
            .map(|(k, _)| k.clone())
            .collect();
        assert_eq!(middle, vec![b"b".to_vec(), b"c".to_vec()]);

        // Open-ended range runs to the last key.
        let tail: Vec<_> = mt.range(b"c".to_vec()..).map(|(k, _)| k.clone()).collect();
        assert_eq!(tail, vec![b"c".to_vec(), b"d".to_vec()]);
    }

    #[test]
    fn test_memtable_into_iter() {
        let mut mt = Memtable::new();

        mt.put(b"b".to_vec(), b"2".to_vec());
        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.delete(b"c".to_vec());

        let entries: Vec<_> = mt.into_iter().collect();
        assert_eq!(
            entries,
            vec![
                (b"a".to_vec(), MemtableEntry::Value(b"1".to_vec())),
                (b"b".to_vec(), MemtableEntry::Value(b"2".to_vec())),
                (b"c".to_vec(), MemtableEntry::Tombstone),
            ]
        );
    }

    #[test]
    fn test_memtable_sequence() {
        let mut mt = Memtable::with_sequence(100);
        assert_eq!(mt.sequence(), 100);

        mt.put(b"key".to_vec(), b"value".to_vec());
        assert_eq!(mt.sequence(), 101);

        mt.delete(b"key".to_vec());
        assert_eq!(mt.sequence(), 102);
    }

    #[test]
    fn test_memtable_clear() {
        let mut mt = Memtable::new();

        mt.put(b"key1".to_vec(), b"value1".to_vec());
        mt.put(b"key2".to_vec(), b"value2".to_vec());

        assert_eq!(mt.len(), 2);

        mt.clear();

        assert!(mt.is_empty());
        assert_eq!(mt.size_bytes(), 0);
    }
}