lsm_tree/memtable/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::key::InternalKey;
6use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
7use crossbeam_skiplist::SkipMap;
8use std::ops::RangeBounds;
9use std::sync::atomic::AtomicU64;
10
11/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
12///
13/// When the Memtable exceeds some size, it should be flushed to a disk segment.
14#[derive(Default)]
15pub struct Memtable {
16    /// The actual content, stored in a lock-free skiplist.
17    #[doc(hidden)]
18    pub items: SkipMap<InternalKey, UserValue>,
19
20    /// Approximate active memtable size.
21    ///
22    /// If this grows too large, a flush is triggered.
23    pub(crate) approximate_size: AtomicU64,
24
25    /// Highest encountered sequence number.
26    ///
27    /// This is used so that `get_highest_seqno` has O(1) complexity.
28    pub(crate) highest_seqno: AtomicU64,
29}
30
31impl Memtable {
32    /// Clears the memtable.
33    pub fn clear(&mut self) {
34        self.items.clear();
35        self.highest_seqno = AtomicU64::new(0);
36        self.approximate_size
37            .store(0, std::sync::atomic::Ordering::Release);
38    }
39
40    /// Creates an iterator over all items.
41    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
42        self.items.iter().map(|entry| InternalValue {
43            key: entry.key().clone(),
44            value: entry.value().clone(),
45        })
46    }
47
48    /// Creates an iterator over a range of items.
49    pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
50        &'a self,
51        range: R,
52    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
53        self.items.range(range).map(|entry| InternalValue {
54            key: entry.key().clone(),
55            value: entry.value().clone(),
56        })
57    }
58
59    /// Returns the item by key if it exists.
60    ///
61    /// The item with the highest seqno will be returned, if `seqno` is None.
62    #[doc(hidden)]
63    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
64        if seqno == 0 {
65            return None;
66        }
67
68        // NOTE: This range start deserves some explanation...
69        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
70        // We search for the lowest entry that is greater or equal the user's prefix key
71        // and has the seqno (or lower) we want  (because the seqno is stored in reverse order)
72        //
73        // Example: We search for "abc"
74        //
75        // key -> seqno
76        //
77        // a   -> 7
78        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=None
79        // abc -> 4
80        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
81        // abcdef -> 6
82        // abcdef -> 5
83        //
84        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
85
86        let mut iter = self
87            .items
88            .range(lower_bound..)
89            .take_while(|entry| &*entry.key().user_key == key);
90
91        iter.next().map(|entry| InternalValue {
92            key: entry.key().clone(),
93            value: entry.value().clone(),
94        })
95    }
96
97    /// Gets approximate size of memtable in bytes.
98    pub fn size(&self) -> u64 {
99        self.approximate_size
100            .load(std::sync::atomic::Ordering::Acquire)
101    }
102
103    /// Counts the number of items in the memtable.
104    pub fn len(&self) -> usize {
105        self.items.len()
106    }
107
108    /// Returns `true` if the memtable is empty.
109    #[must_use]
110    pub fn is_empty(&self) -> bool {
111        self.items.is_empty()
112    }
113
114    /// Inserts an item into the memtable
115    #[doc(hidden)]
116    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
117        // NOTE: We know keys are limited to 16-bit length + values are limited to 32-bit length
118        #[allow(clippy::cast_possible_truncation, clippy::expect_used)]
119        let item_size =
120            (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
121                .try_into()
122                .expect("should fit into u64");
123
124        let size_before = self
125            .approximate_size
126            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
127
128        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
129        self.items.insert(key, item.value);
130
131        self.highest_seqno
132            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
133
134        (item_size, size_before + item_size)
135    }
136
137    /// Returns the highest sequence number in the memtable.
138    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
139        if self.is_empty() {
140            None
141        } else {
142            Some(
143                self.highest_seqno
144                    .load(std::sync::atomic::Ordering::Acquire),
145            )
146        }
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::ValueType;
    use test_log::test;

    /// Builds a `ValueType::Value` item from byte slices to cut down on test boilerplate.
    fn value_item(key: &[u8], value: &[u8], seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(key.to_vec(), value.to_vec(), seqno, ValueType::Value)
    }

    /// Point reads must only see versions whose seqno is lower than the read seqno,
    /// and must never match a key that is merely a prefix of a stored key.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let memtable = Memtable::default();

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991",
            0,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991-2",
            1,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 1);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", 1);
        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 2);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", 2);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
    }

    /// A single inserted item can be read back, but nothing is visible at seqno 0.
    #[test]
    fn memtable_get() {
        let memtable = Memtable::default();

        let value = value_item(b"abc", b"abc", 0);

        memtable.insert(value.clone());

        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
        assert_eq!(None, memtable.get(b"abc", 0));
    }

    /// The newest version wins on reads, and `get_highest_seqno` tracks the maximum seqno.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::default();

        // An empty memtable has no highest seqno.
        assert_eq!(None, memtable.get_highest_seqno());

        for seqno in 0..=4 {
            memtable.insert(value_item(b"abc", b"abc", seqno));
            assert_eq!(Some(seqno), memtable.get_highest_seqno());
        }

        assert_eq!(
            Some(value_item(b"abc", b"abc", 4)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        // Inserting an older seqno afterwards must not lower the tracked maximum.
        memtable.insert(value_item(b"abd", b"abc", 2));
        assert_eq!(Some(4), memtable.get_highest_seqno());
    }

    /// A key that is a prefix of another key must not be confused with it.
    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::default();

        memtable.insert(value_item(b"abc0", b"abc", 0));
        memtable.insert(value_item(b"abc", b"abc", 255));

        assert_eq!(
            Some(value_item(b"abc", b"abc", 255)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(
            Some(value_item(b"abc0", b"abc", 0)),
            memtable.get(b"abc0", SeqNo::MAX)
        );
    }

    /// Reading at an older seqno returns the newest version below that seqno.
    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::default();

        memtable.insert(value_item(b"abc", b"abc", 0));
        memtable.insert(value_item(b"abc", b"abc", 99));
        memtable.insert(value_item(b"abc", b"abc", 255));

        assert_eq!(
            Some(value_item(b"abc", b"abc", 255)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(
            Some(value_item(b"abc", b"abc", 99)),
            memtable.get(b"abc", 100)
        );

        assert_eq!(
            Some(value_item(b"abc", b"abc", 0)),
            memtable.get(b"abc", 50)
        );
    }
}