lsm_tree/memtable/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::key::InternalKey;
6use crate::{
7    value::{InternalValue, SeqNo, UserValue},
8    ValueType,
9};
10use crossbeam_skiplist::SkipMap;
11use std::ops::RangeBounds;
12use std::sync::atomic::AtomicU64;
13
14/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
15///
16/// When the Memtable exceeds some size, it should be flushed to a table.
17#[derive(Default)]
18pub struct Memtable {
19    /// The actual content, stored in a lock-free skiplist.
20    #[doc(hidden)]
21    pub items: SkipMap<InternalKey, UserValue>,
22
23    /// Approximate active memtable size.
24    ///
25    /// If this grows too large, a flush is triggered.
26    pub(crate) approximate_size: AtomicU64,
27
28    /// Highest encountered sequence number.
29    ///
30    /// This is used so that `get_highest_seqno` has O(1) complexity.
31    pub(crate) highest_seqno: AtomicU64,
32}
33
34impl Memtable {
35    /// Clears the memtable.
36    pub fn clear(&mut self) {
37        self.items.clear();
38        self.highest_seqno = AtomicU64::new(0);
39        self.approximate_size
40            .store(0, std::sync::atomic::Ordering::Release);
41    }
42
43    /// Creates an iterator over all items.
44    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
45        self.items.iter().map(|entry| InternalValue {
46            key: entry.key().clone(),
47            value: entry.value().clone(),
48        })
49    }
50
51    /// Creates an iterator over a range of items.
52    pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
53        &'a self,
54        range: R,
55    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
56        self.items.range(range).map(|entry| InternalValue {
57            key: entry.key().clone(),
58            value: entry.value().clone(),
59        })
60    }
61
62    /// Returns the item by key if it exists.
63    ///
64    /// The item with the highest seqno will be returned, if `seqno` is None.
65    #[doc(hidden)]
66    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
67        if seqno == 0 {
68            return None;
69        }
70
71        // NOTE: This range start deserves some explanation...
72        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
73        // We search for the lowest entry that is greater or equal the user's prefix key
74        // and has the seqno (or lower) we want  (because the seqno is stored in reverse order)
75        //
76        // Example: We search for "abc"
77        //
78        // key -> seqno
79        //
80        // a   -> 7
81        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=None
82        // abc -> 4
83        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
84        // abcdef -> 6
85        // abcdef -> 5
86        //
87        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
88
89        let mut iter = self
90            .items
91            .range(lower_bound..)
92            .take_while(|entry| &*entry.key().user_key == key);
93
94        iter.next().map(|entry| InternalValue {
95            key: entry.key().clone(),
96            value: entry.value().clone(),
97        })
98    }
99
100    /// Gets approximate size of memtable in bytes.
101    pub fn size(&self) -> u64 {
102        self.approximate_size
103            .load(std::sync::atomic::Ordering::Acquire)
104    }
105
106    /// Counts the number of items in the memtable.
107    pub fn len(&self) -> usize {
108        self.items.len()
109    }
110
111    /// Returns `true` if the memtable is empty.
112    #[must_use]
113    pub fn is_empty(&self) -> bool {
114        self.items.is_empty()
115    }
116
117    /// Inserts an item into the memtable
118    #[doc(hidden)]
119    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
120        #[expect(
121            clippy::expect_used,
122            reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
123        )]
124        let item_size =
125            (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
126                .try_into()
127                .expect("should fit into u64");
128
129        let size_before = self
130            .approximate_size
131            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
132
133        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
134        self.items.insert(key, item.value);
135
136        self.highest_seqno
137            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
138
139        (item_size, size_before + item_size)
140    }
141
142    /// Returns the highest sequence number in the memtable.
143    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
144        if self.is_empty() {
145            None
146        } else {
147            Some(
148                self.highest_seqno
149                    .load(std::sync::atomic::Ordering::Acquire),
150            )
151        }
152    }
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158    use crate::ValueType;
159    use test_log::test;
160
161    #[test]
162    #[expect(clippy::unwrap_used)]
163    fn memtable_mvcc_point_read() {
164        let memtable = Memtable::default();
165
166        memtable.insert(InternalValue::from_components(
167            *b"hello-key-999991",
168            *b"hello-value-999991",
169            0,
170            ValueType::Value,
171        ));
172
173        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
174        assert_eq!(None, item);
175
176        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
177        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);
178
179        memtable.insert(InternalValue::from_components(
180            *b"hello-key-999991",
181            *b"hello-value-999991-2",
182            1,
183            ValueType::Value,
184        ));
185
186        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
187        assert_eq!(None, item);
188
189        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
190        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
191
192        let item = memtable.get(b"hello-key-99999", 1);
193        assert_eq!(None, item);
194
195        let item = memtable.get(b"hello-key-999991", 1);
196        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);
197
198        let item = memtable.get(b"hello-key-99999", 2);
199        assert_eq!(None, item);
200
201        let item = memtable.get(b"hello-key-999991", 2);
202        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
203    }
204
205    #[test]
206    fn memtable_get() {
207        let memtable = Memtable::default();
208
209        let value =
210            InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);
211
212        memtable.insert(value.clone());
213
214        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
215    }
216
217    #[test]
218    fn memtable_get_highest_seqno() {
219        let memtable = Memtable::default();
220
221        memtable.insert(InternalValue::from_components(
222            b"abc".to_vec(),
223            b"abc".to_vec(),
224            0,
225            ValueType::Value,
226        ));
227        memtable.insert(InternalValue::from_components(
228            b"abc".to_vec(),
229            b"abc".to_vec(),
230            1,
231            ValueType::Value,
232        ));
233        memtable.insert(InternalValue::from_components(
234            b"abc".to_vec(),
235            b"abc".to_vec(),
236            2,
237            ValueType::Value,
238        ));
239        memtable.insert(InternalValue::from_components(
240            b"abc".to_vec(),
241            b"abc".to_vec(),
242            3,
243            ValueType::Value,
244        ));
245        memtable.insert(InternalValue::from_components(
246            b"abc".to_vec(),
247            b"abc".to_vec(),
248            4,
249            ValueType::Value,
250        ));
251
252        assert_eq!(
253            Some(InternalValue::from_components(
254                b"abc".to_vec(),
255                b"abc".to_vec(),
256                4,
257                ValueType::Value,
258            )),
259            memtable.get(b"abc", SeqNo::MAX)
260        );
261    }
262
263    #[test]
264    fn memtable_get_prefix() {
265        let memtable = Memtable::default();
266
267        memtable.insert(InternalValue::from_components(
268            b"abc0".to_vec(),
269            b"abc".to_vec(),
270            0,
271            ValueType::Value,
272        ));
273        memtable.insert(InternalValue::from_components(
274            b"abc".to_vec(),
275            b"abc".to_vec(),
276            255,
277            ValueType::Value,
278        ));
279
280        assert_eq!(
281            Some(InternalValue::from_components(
282                b"abc".to_vec(),
283                b"abc".to_vec(),
284                255,
285                ValueType::Value,
286            )),
287            memtable.get(b"abc", SeqNo::MAX)
288        );
289
290        assert_eq!(
291            Some(InternalValue::from_components(
292                b"abc0".to_vec(),
293                b"abc".to_vec(),
294                0,
295                ValueType::Value,
296            )),
297            memtable.get(b"abc0", SeqNo::MAX)
298        );
299    }
300
301    #[test]
302    fn memtable_get_old_version() {
303        let memtable = Memtable::default();
304
305        memtable.insert(InternalValue::from_components(
306            b"abc".to_vec(),
307            b"abc".to_vec(),
308            0,
309            ValueType::Value,
310        ));
311        memtable.insert(InternalValue::from_components(
312            b"abc".to_vec(),
313            b"abc".to_vec(),
314            99,
315            ValueType::Value,
316        ));
317        memtable.insert(InternalValue::from_components(
318            b"abc".to_vec(),
319            b"abc".to_vec(),
320            255,
321            ValueType::Value,
322        ));
323
324        assert_eq!(
325            Some(InternalValue::from_components(
326                b"abc".to_vec(),
327                b"abc".to_vec(),
328                255,
329                ValueType::Value,
330            )),
331            memtable.get(b"abc", SeqNo::MAX)
332        );
333
334        assert_eq!(
335            Some(InternalValue::from_components(
336                b"abc".to_vec(),
337                b"abc".to_vec(),
338                99,
339                ValueType::Value,
340            )),
341            memtable.get(b"abc", 100)
342        );
343
344        assert_eq!(
345            Some(InternalValue::from_components(
346                b"abc".to_vec(),
347                b"abc".to_vec(),
348                0,
349                ValueType::Value,
350            )),
351            memtable.get(b"abc", 50)
352        );
353    }
354}