rustlite_storage/
memtable.rs

1//! Memtable - In-memory sorted write buffer
2//!
3//! The Memtable is an in-memory data structure that holds recent writes
4//! before they are flushed to disk as SSTables. It uses a BTreeMap for
5//! sorted key order, which enables efficient range scans and ordered iteration.
6
7use std::collections::BTreeMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10/// Entry value in the memtable - can be a value or a tombstone (deletion marker)
11#[derive(Debug, Clone, PartialEq)]
12pub enum MemtableEntry {
13    /// A live value
14    Value(Vec<u8>),
15    /// A tombstone marking deletion
16    Tombstone,
17}
18
19impl MemtableEntry {
20    /// Returns the size of this entry in bytes
21    pub fn size(&self) -> usize {
22        match self {
23            MemtableEntry::Value(v) => v.len() + 1, // +1 for type tag
24            MemtableEntry::Tombstone => 1,
25        }
26    }
27}
28
29/// Memtable - an in-memory sorted write buffer
30///
31/// Provides O(log n) insert, lookup, and delete operations.
32/// When the memtable reaches a size threshold, it should be flushed
33/// to disk as an SSTable.
34#[derive(Debug)]
35pub struct Memtable {
36    /// The underlying sorted map
37    data: BTreeMap<Vec<u8>, MemtableEntry>,
38    /// Approximate size in bytes (for flush threshold checking)
39    size_bytes: AtomicU64,
40    /// Sequence number for MVCC (future use)
41    sequence: AtomicU64,
42}
43
44impl Memtable {
45    /// Creates a new empty Memtable
46    pub fn new() -> Self {
47        Self {
48            data: BTreeMap::new(),
49            size_bytes: AtomicU64::new(0),
50            sequence: AtomicU64::new(0),
51        }
52    }
53
54    /// Creates a new Memtable with a starting sequence number
55    pub fn with_sequence(sequence: u64) -> Self {
56        Self {
57            data: BTreeMap::new(),
58            size_bytes: AtomicU64::new(0),
59            sequence: AtomicU64::new(sequence),
60        }
61    }
62
63    /// Inserts or updates a key-value pair
64    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
65        let key_size = key.len() as u64;
66        let value_size = value.len() as u64 + 1; // +1 for entry type
67
68        // Remove old entry size if exists
69        if let Some(old) = self.data.get(&key) {
70            let old_size = old.size() as u64;
71            self.size_bytes
72                .fetch_sub(key_size + old_size, Ordering::Relaxed);
73        }
74
75        self.data.insert(key.clone(), MemtableEntry::Value(value));
76        self.size_bytes
77            .fetch_add(key_size + value_size, Ordering::Relaxed);
78        self.sequence.fetch_add(1, Ordering::Relaxed);
79    }
80
81    /// Retrieves a value by key
82    ///
83    /// Returns:
84    /// - `Some(Some(value))` if the key exists with a value
85    /// - `Some(None)` if the key was deleted (tombstone)
86    /// - `None` if the key is not in the memtable
87    pub fn get(&self, key: &[u8]) -> Option<Option<&[u8]>> {
88        self.data.get(key).map(|entry| match entry {
89            MemtableEntry::Value(v) => Some(v.as_slice()),
90            MemtableEntry::Tombstone => None,
91        })
92    }
93
94    /// Marks a key as deleted with a tombstone
95    pub fn delete(&mut self, key: Vec<u8>) {
96        let key_size = key.len() as u64;
97
98        // Remove old entry size if exists
99        if let Some(old) = self.data.get(&key) {
100            let old_size = old.size() as u64;
101            self.size_bytes
102                .fetch_sub(key_size + old_size, Ordering::Relaxed);
103        }
104
105        self.data.insert(key.clone(), MemtableEntry::Tombstone);
106        self.size_bytes.fetch_add(key_size + 1, Ordering::Relaxed); // +1 for tombstone
107        self.sequence.fetch_add(1, Ordering::Relaxed);
108    }
109
110    /// Returns the approximate size of the memtable in bytes
111    pub fn size_bytes(&self) -> u64 {
112        self.size_bytes.load(Ordering::Relaxed)
113    }
114
115    /// Returns the number of entries in the memtable
116    pub fn len(&self) -> usize {
117        self.data.len()
118    }
119
120    /// Returns true if the memtable is empty
121    pub fn is_empty(&self) -> bool {
122        self.data.is_empty()
123    }
124
125    /// Returns the current sequence number
126    pub fn sequence(&self) -> u64 {
127        self.sequence.load(Ordering::Relaxed)
128    }
129
130    /// Returns an iterator over all entries in sorted order
131    pub fn iter(&self) -> impl Iterator<Item = (&Vec<u8>, &MemtableEntry)> {
132        self.data.iter()
133    }
134
135    /// Returns an iterator over a range of keys
136    pub fn range<R>(&self, range: R) -> impl Iterator<Item = (&Vec<u8>, &MemtableEntry)>
137    where
138        R: std::ops::RangeBounds<Vec<u8>>,
139    {
140        self.data.range(range)
141    }
142
143    /// Clears the memtable
144    pub fn clear(&mut self) {
145        self.data.clear();
146        self.size_bytes.store(0, Ordering::Relaxed);
147    }
148
149    /// Consumes the memtable and returns all entries sorted by key
150    pub fn drain(self) -> impl Iterator<Item = (Vec<u8>, MemtableEntry)> {
151        self.data.into_iter()
152    }
153}
154
155impl Default for Memtable {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use super::*;
164
165    #[test]
166    fn test_memtable_new() {
167        let mt = Memtable::new();
168        assert!(mt.is_empty());
169        assert_eq!(mt.len(), 0);
170        assert_eq!(mt.size_bytes(), 0);
171    }
172
173    #[test]
174    fn test_memtable_put_get() {
175        let mut mt = Memtable::new();
176
177        mt.put(b"key1".to_vec(), b"value1".to_vec());
178        mt.put(b"key2".to_vec(), b"value2".to_vec());
179
180        assert_eq!(mt.len(), 2);
181        assert_eq!(mt.get(b"key1"), Some(Some(b"value1".as_slice())));
182        assert_eq!(mt.get(b"key2"), Some(Some(b"value2".as_slice())));
183        assert_eq!(mt.get(b"key3"), None);
184    }
185
186    #[test]
187    fn test_memtable_update() {
188        let mut mt = Memtable::new();
189
190        mt.put(b"key".to_vec(), b"value1".to_vec());
191        assert_eq!(mt.get(b"key"), Some(Some(b"value1".as_slice())));
192
193        mt.put(b"key".to_vec(), b"value2".to_vec());
194        assert_eq!(mt.get(b"key"), Some(Some(b"value2".as_slice())));
195        assert_eq!(mt.len(), 1);
196    }
197
198    #[test]
199    fn test_memtable_delete() {
200        let mut mt = Memtable::new();
201
202        mt.put(b"key".to_vec(), b"value".to_vec());
203        assert_eq!(mt.get(b"key"), Some(Some(b"value".as_slice())));
204
205        mt.delete(b"key".to_vec());
206        // Key exists but is a tombstone
207        assert_eq!(mt.get(b"key"), Some(None));
208        assert_eq!(mt.len(), 1);
209    }
210
211    #[test]
212    fn test_memtable_size_tracking() {
213        let mut mt = Memtable::new();
214
215        let initial_size = mt.size_bytes();
216        mt.put(b"key".to_vec(), b"value".to_vec());
217
218        // Size should have increased
219        assert!(mt.size_bytes() > initial_size);
220    }
221
222    #[test]
223    fn test_memtable_iter_sorted() {
224        let mut mt = Memtable::new();
225
226        // Insert in random order
227        mt.put(b"c".to_vec(), b"3".to_vec());
228        mt.put(b"a".to_vec(), b"1".to_vec());
229        mt.put(b"b".to_vec(), b"2".to_vec());
230
231        // Iteration should be sorted
232        let keys: Vec<_> = mt.iter().map(|(k, _)| k.clone()).collect();
233        assert_eq!(keys, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
234    }
235
236    #[test]
237    fn test_memtable_sequence() {
238        let mut mt = Memtable::with_sequence(100);
239        assert_eq!(mt.sequence(), 100);
240
241        mt.put(b"key".to_vec(), b"value".to_vec());
242        assert_eq!(mt.sequence(), 101);
243
244        mt.delete(b"key".to_vec());
245        assert_eq!(mt.sequence(), 102);
246    }
247
248    #[test]
249    fn test_memtable_clear() {
250        let mut mt = Memtable::new();
251
252        mt.put(b"key1".to_vec(), b"value1".to_vec());
253        mt.put(b"key2".to_vec(), b"value2".to_vec());
254
255        assert_eq!(mt.len(), 2);
256
257        mt.clear();
258
259        assert!(mt.is_empty());
260        assert_eq!(mt.size_bytes(), 0);
261    }
262}