lsm_tree/memtable/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::key::InternalKey;
6use crate::{
7    value::{InternalValue, SeqNo, UserValue},
8    ValueType,
9};
10use crossbeam_skiplist::SkipMap;
11use std::ops::RangeBounds;
12use std::sync::atomic::AtomicU64;
13
14/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
15///
16/// When the Memtable exceeds some size, it should be flushed to a table.
17#[derive(Default)]
18pub struct Memtable {
19    /// The actual content, stored in a lock-free skiplist.
20    #[doc(hidden)]
21    pub items: SkipMap<InternalKey, UserValue>,
22
23    /// Approximate active memtable size.
24    ///
25    /// If this grows too large, a flush is triggered.
26    pub(crate) approximate_size: AtomicU64,
27
28    /// Highest encountered sequence number.
29    ///
30    /// This is used so that `get_highest_seqno` has O(1) complexity.
31    pub(crate) highest_seqno: AtomicU64,
32}
33
34impl Memtable {
35    /// Clears the memtable.
36    pub fn clear(&mut self) {
37        self.items.clear();
38        self.highest_seqno = AtomicU64::new(0);
39        self.approximate_size
40            .store(0, std::sync::atomic::Ordering::Release);
41    }
42
43    /// Creates an iterator over all items.
44    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
45        self.items.iter().map(|entry| InternalValue {
46            key: entry.key().clone(),
47            value: entry.value().clone(),
48        })
49    }
50
51    /// Creates an iterator over a range of items.
52    pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
53        &'a self,
54        range: R,
55    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
56        self.items.range(range).map(|entry| InternalValue {
57            key: entry.key().clone(),
58            value: entry.value().clone(),
59        })
60    }
61
62    /// Returns the item by key if it exists.
63    ///
64    /// The item with the highest seqno will be returned, if `seqno` is None.
65    #[doc(hidden)]
66    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
67        if seqno == 0 {
68            return None;
69        }
70
71        // NOTE: This range start deserves some explanation...
72        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
73        // We search for the lowest entry that is greater or equal the user's prefix key
74        // and has the seqno (or lower) we want  (because the seqno is stored in reverse order)
75        //
76        // Example: We search for "abc"
77        //
78        // key -> seqno
79        //
80        // a   -> 7
81        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=None
82        // abc -> 4
83        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
84        // abcdef -> 6
85        // abcdef -> 5
86        //
87        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
88
89        let mut iter = self
90            .items
91            .range(lower_bound..)
92            .take_while(|entry| &*entry.key().user_key == key);
93
94        iter.next().map(|entry| InternalValue {
95            key: entry.key().clone(),
96            value: entry.value().clone(),
97        })
98    }
99
100    /// Gets approximate size of memtable in bytes.
101    pub fn size(&self) -> u64 {
102        self.approximate_size
103            .load(std::sync::atomic::Ordering::Acquire)
104    }
105
106    /// Counts the number of items in the memtable.
107    pub fn len(&self) -> usize {
108        self.items.len()
109    }
110
111    /// Returns `true` if the memtable is empty.
112    #[must_use]
113    pub fn is_empty(&self) -> bool {
114        self.items.is_empty()
115    }
116
117    /// Inserts an item into the memtable
118    #[doc(hidden)]
119    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
120        // NOTE: We know keys are limited to 16-bit length + values are limited to 32-bit length
121        #[allow(clippy::cast_possible_truncation, clippy::expect_used)]
122        let item_size =
123            (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
124                .try_into()
125                .expect("should fit into u64");
126
127        let size_before = self
128            .approximate_size
129            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
130
131        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
132        self.items.insert(key, item.value);
133
134        self.highest_seqno
135            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
136
137        (item_size, size_before + item_size)
138    }
139
140    /// Returns the highest sequence number in the memtable.
141    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
142        if self.is_empty() {
143            None
144        } else {
145            Some(
146                self.highest_seqno
147                    .load(std::sync::atomic::Ordering::Acquire),
148            )
149        }
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156    use crate::ValueType;
157    use test_log::test;
158
159    #[test]
160    #[allow(clippy::unwrap_used)]
161    fn memtable_mvcc_point_read() {
162        let memtable = Memtable::default();
163
164        memtable.insert(InternalValue::from_components(
165            *b"hello-key-999991",
166            *b"hello-value-999991",
167            0,
168            ValueType::Value,
169        ));
170
171        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
172        assert_eq!(None, item);
173
174        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
175        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);
176
177        memtable.insert(InternalValue::from_components(
178            *b"hello-key-999991",
179            *b"hello-value-999991-2",
180            1,
181            ValueType::Value,
182        ));
183
184        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
185        assert_eq!(None, item);
186
187        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
188        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
189
190        let item = memtable.get(b"hello-key-99999", 1);
191        assert_eq!(None, item);
192
193        let item = memtable.get(b"hello-key-999991", 1);
194        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);
195
196        let item = memtable.get(b"hello-key-99999", 2);
197        assert_eq!(None, item);
198
199        let item = memtable.get(b"hello-key-999991", 2);
200        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
201    }
202
203    #[test]
204    fn memtable_get() {
205        let memtable = Memtable::default();
206
207        let value =
208            InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);
209
210        memtable.insert(value.clone());
211
212        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
213    }
214
215    #[test]
216    fn memtable_get_highest_seqno() {
217        let memtable = Memtable::default();
218
219        memtable.insert(InternalValue::from_components(
220            b"abc".to_vec(),
221            b"abc".to_vec(),
222            0,
223            ValueType::Value,
224        ));
225        memtable.insert(InternalValue::from_components(
226            b"abc".to_vec(),
227            b"abc".to_vec(),
228            1,
229            ValueType::Value,
230        ));
231        memtable.insert(InternalValue::from_components(
232            b"abc".to_vec(),
233            b"abc".to_vec(),
234            2,
235            ValueType::Value,
236        ));
237        memtable.insert(InternalValue::from_components(
238            b"abc".to_vec(),
239            b"abc".to_vec(),
240            3,
241            ValueType::Value,
242        ));
243        memtable.insert(InternalValue::from_components(
244            b"abc".to_vec(),
245            b"abc".to_vec(),
246            4,
247            ValueType::Value,
248        ));
249
250        assert_eq!(
251            Some(InternalValue::from_components(
252                b"abc".to_vec(),
253                b"abc".to_vec(),
254                4,
255                ValueType::Value,
256            )),
257            memtable.get(b"abc", SeqNo::MAX)
258        );
259    }
260
261    #[test]
262    fn memtable_get_prefix() {
263        let memtable = Memtable::default();
264
265        memtable.insert(InternalValue::from_components(
266            b"abc0".to_vec(),
267            b"abc".to_vec(),
268            0,
269            ValueType::Value,
270        ));
271        memtable.insert(InternalValue::from_components(
272            b"abc".to_vec(),
273            b"abc".to_vec(),
274            255,
275            ValueType::Value,
276        ));
277
278        assert_eq!(
279            Some(InternalValue::from_components(
280                b"abc".to_vec(),
281                b"abc".to_vec(),
282                255,
283                ValueType::Value,
284            )),
285            memtable.get(b"abc", SeqNo::MAX)
286        );
287
288        assert_eq!(
289            Some(InternalValue::from_components(
290                b"abc0".to_vec(),
291                b"abc".to_vec(),
292                0,
293                ValueType::Value,
294            )),
295            memtable.get(b"abc0", SeqNo::MAX)
296        );
297    }
298
299    #[test]
300    fn memtable_get_old_version() {
301        let memtable = Memtable::default();
302
303        memtable.insert(InternalValue::from_components(
304            b"abc".to_vec(),
305            b"abc".to_vec(),
306            0,
307            ValueType::Value,
308        ));
309        memtable.insert(InternalValue::from_components(
310            b"abc".to_vec(),
311            b"abc".to_vec(),
312            99,
313            ValueType::Value,
314        ));
315        memtable.insert(InternalValue::from_components(
316            b"abc".to_vec(),
317            b"abc".to_vec(),
318            255,
319            ValueType::Value,
320        ));
321
322        assert_eq!(
323            Some(InternalValue::from_components(
324                b"abc".to_vec(),
325                b"abc".to_vec(),
326                255,
327                ValueType::Value,
328            )),
329            memtable.get(b"abc", SeqNo::MAX)
330        );
331
332        assert_eq!(
333            Some(InternalValue::from_components(
334                b"abc".to_vec(),
335                b"abc".to_vec(),
336                99,
337                ValueType::Value,
338            )),
339            memtable.get(b"abc", 100)
340        );
341
342        assert_eq!(
343            Some(InternalValue::from_components(
344                b"abc".to_vec(),
345                b"abc".to_vec(),
346                0,
347                ValueType::Value,
348            )),
349            memtable.get(b"abc", 50)
350        );
351    }
352}