lsm_tree/memtable/
mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)
4
5use crate::key::InternalKey;
6use crate::segment::block::ItemSize;
7use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
8use crossbeam_skiplist::SkipMap;
9use std::ops::RangeBounds;
10use std::sync::atomic::{AtomicU32, AtomicU64};
11
12/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
13///
14/// When the Memtable exceeds some size, it should be flushed to a disk segment.
15#[derive(Default)]
16pub struct Memtable {
17    /// The actual content, stored in a lock-free skiplist.
18    #[doc(hidden)]
19    pub items: SkipMap<InternalKey, UserValue>,
20
21    /// Approximate active memtable size.
22    ///
23    /// If this grows too large, a flush is triggered.
24    pub(crate) approximate_size: AtomicU32,
25
26    /// Highest encountered sequence number.
27    ///
28    /// This is used so that `get_highest_seqno` has O(1) complexity.
29    pub(crate) highest_seqno: AtomicU64,
30}
31
32impl Memtable {
33    /// Clears the memtable.
34    pub fn clear(&mut self) {
35        self.items.clear();
36        self.highest_seqno = AtomicU64::new(0);
37        self.approximate_size
38            .store(0, std::sync::atomic::Ordering::Release);
39    }
40
41    /// Creates an iterator over all items.
42    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
43        self.items.iter().map(|entry| InternalValue {
44            key: entry.key().clone(),
45            value: entry.value().clone(),
46        })
47    }
48
49    /// Creates an iterator over a range of items.
50    pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
51        &'a self,
52        range: R,
53    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
54        self.items.range(range).map(|entry| InternalValue {
55            key: entry.key().clone(),
56            value: entry.value().clone(),
57        })
58    }
59
60    /// Returns the item by key if it exists.
61    ///
62    /// The item with the highest seqno will be returned, if `seqno` is None.
63    #[doc(hidden)]
64    pub fn get(&self, key: &[u8], seqno: Option<SeqNo>) -> Option<InternalValue> {
65        if seqno == Some(0) {
66            return None;
67        }
68
69        // NOTE: This range start deserves some explanation...
70        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
71        // We search for the lowest entry that is greater or equal the user's prefix key
72        // and has the seqno (or lower) we want  (because the seqno is stored in reverse order)
73        //
74        // Example: We search for "abc"
75        //
76        // key -> seqno
77        //
78        // a   -> 7
79        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=None
80        // abc -> 4
81        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
82        // abcdef -> 6
83        // abcdef -> 5
84        //
85        let lower_bound = InternalKey::new(
86            key,
87            match seqno {
88                Some(seqno) => seqno - 1,
89                None => SeqNo::MAX,
90            },
91            ValueType::Value,
92        );
93
94        let mut iter = self
95            .items
96            .range(lower_bound..)
97            .take_while(|entry| &*entry.key().user_key == key);
98
99        iter.next().map(|entry| InternalValue {
100            key: entry.key().clone(),
101            value: entry.value().clone(),
102        })
103    }
104
105    /// Gets approximate size of memtable in bytes.
106    pub fn size(&self) -> u32 {
107        self.approximate_size
108            .load(std::sync::atomic::Ordering::Acquire)
109    }
110
111    /// Counts the amount of items in the memtable.
112    pub fn len(&self) -> usize {
113        self.items.len()
114    }
115
116    /// Returns `true` if the memtable is empty.
117    #[must_use]
118    pub fn is_empty(&self) -> bool {
119        self.items.is_empty()
120    }
121
122    /// Inserts an item into the memtable
123    #[doc(hidden)]
124    pub fn insert(&self, item: InternalValue) -> (u32, u32) {
125        // NOTE: We know values are limited to 32-bit length
126        #[allow(clippy::cast_possible_truncation)]
127        let item_size = item.size() as u32;
128
129        let size_before = self
130            .approximate_size
131            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
132
133        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
134        self.items.insert(key, item.value);
135
136        self.highest_seqno
137            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
138
139        (item_size, size_before + item_size)
140    }
141
142    /// Returns the highest sequence number in the memtable.
143    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
144        if self.is_empty() {
145            None
146        } else {
147            Some(
148                self.highest_seqno
149                    .load(std::sync::atomic::Ordering::Acquire),
150            )
151        }
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::ValueType;
    use test_log::test;

    /// Builds an `abc -> abc` value item with the given sequence number.
    fn abc(seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), seqno, ValueType::Value)
    }

    // Point reads at different snapshot seqnos must only see older versions.
    // NOTE: `unwrap` is fine here, a panic simply fails the test.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let memtable = Memtable::default();

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991",
            0,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", None);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", None);
        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991-2",
            1,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", None);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", None);
        assert_eq!(*b"hello-value-999991-2", &*item.unwrap().value);

        // A snapshot seqno of 0 cannot see any version at all
        let item = memtable.get(b"hello-key-999991", Some(0));
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-99999", Some(1));
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", Some(1));
        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", Some(2));
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", Some(2));
        assert_eq!(*b"hello-value-999991-2", &*item.unwrap().value);
    }

    #[test]
    fn memtable_get() {
        let memtable = Memtable::default();

        let value = abc(0);

        memtable.insert(value.clone());

        assert_eq!(Some(value), memtable.get(b"abc", None));
    }

    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::default();

        // An empty memtable has no highest seqno
        assert_eq!(None, memtable.get_highest_seqno());

        for seqno in 0..=4 {
            memtable.insert(abc(seqno));
        }

        assert_eq!(Some(4), memtable.get_highest_seqno());

        // Reading without a snapshot seqno returns the newest version
        assert_eq!(Some(abc(4)), memtable.get(b"abc", None));
    }

    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::default();

        memtable.insert(InternalValue::from_components(
            b"abc0".to_vec(),
            b"abc".to_vec(),
            0,
            ValueType::Value,
        ));
        memtable.insert(abc(255));

        // "abc" must not be confused with the longer key "abc0" (and vice versa)
        assert_eq!(Some(abc(255)), memtable.get(b"abc", None));

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc0".to_vec(),
                b"abc".to_vec(),
                0,
                ValueType::Value,
            )),
            memtable.get(b"abc0", None)
        );
    }

    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::default();

        memtable.insert(abc(0));
        memtable.insert(abc(99));
        memtable.insert(abc(255));

        assert_eq!(Some(abc(255)), memtable.get(b"abc", None));

        // A snapshot seqno only sees versions with a strictly lower seqno
        assert_eq!(Some(abc(99)), memtable.get(b"abc", Some(100)));

        assert_eq!(Some(abc(0)), memtable.get(b"abc", Some(50)));
    }
}