lsm_tree/memtable/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::key::InternalKey;
6use crate::segment::block::ItemSize;
7use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
8use crossbeam_skiplist::SkipMap;
9use std::ops::RangeBounds;
10use std::sync::atomic::{AtomicU32, AtomicU64};
11
12/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
13///
14/// When the Memtable exceeds some size, it should be flushed to a disk segment.
15#[derive(Default)]
16pub struct Memtable {
17    /// The actual content, stored in a lock-free skiplist.
18    #[doc(hidden)]
19    pub items: SkipMap<InternalKey, UserValue>,
20
21    /// Approximate active memtable size.
22    ///
23    /// If this grows too large, a flush is triggered.
24    pub(crate) approximate_size: AtomicU32,
25
26    /// Highest encountered sequence number.
27    ///
28    /// This is used so that `get_highest_seqno` has O(1) complexity.
29    pub(crate) highest_seqno: AtomicU64,
30}
31
32impl Memtable {
33    /// Clears the memtable.
34    pub fn clear(&mut self) {
35        self.items.clear();
36        self.highest_seqno = AtomicU64::new(0);
37        self.approximate_size
38            .store(0, std::sync::atomic::Ordering::Release);
39    }
40
41    /// Creates an iterator over all items.
42    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
43        self.items.iter().map(|entry| InternalValue {
44            key: entry.key().clone(),
45            value: entry.value().clone(),
46        })
47    }
48
49    /// Creates an iterator over a range of items.
50    pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
51        &'a self,
52        range: R,
53    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
54        self.items.range(range).map(|entry| InternalValue {
55            key: entry.key().clone(),
56            value: entry.value().clone(),
57        })
58    }
59
60    /// Returns the item by key if it exists.
61    ///
62    /// The item with the highest seqno will be returned, if `seqno` is None.
63    #[doc(hidden)]
64    pub fn get<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> Option<InternalValue> {
65        if seqno == Some(0) {
66            return None;
67        }
68
69        let key = key.as_ref();
70
71        // NOTE: This range start deserves some explanation...
72        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
73        // We search for the lowest entry that is greater or equal the user's prefix key
74        // and has the seqno (or lower) we want  (because the seqno is stored in reverse order)
75        //
76        // Example: We search for "abc"
77        //
78        // key -> seqno
79        //
80        // a   -> 7
81        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=None
82        // abc -> 4
83        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
84        // abcdef -> 6
85        // abcdef -> 5
86        //
87        let lower_bound = InternalKey::new(
88            key,
89            match seqno {
90                Some(seqno) => seqno - 1,
91                None => SeqNo::MAX,
92            },
93            ValueType::Value,
94        );
95
96        let mut iter = self
97            .items
98            .range(lower_bound..)
99            .take_while(|entry| &*entry.key().user_key == key);
100
101        iter.next().map(|entry| InternalValue {
102            key: entry.key().clone(),
103            value: entry.value().clone(),
104        })
105    }
106
107    /// Gets approximate size of memtable in bytes.
108    pub fn size(&self) -> u32 {
109        self.approximate_size
110            .load(std::sync::atomic::Ordering::Acquire)
111    }
112
113    /// Counts the amount of items in the memtable.
114    pub fn len(&self) -> usize {
115        self.items.len()
116    }
117
118    /// Returns `true` if the memtable is empty.
119    #[must_use]
120    pub fn is_empty(&self) -> bool {
121        self.items.is_empty()
122    }
123
124    /// Inserts an item into the memtable
125    #[doc(hidden)]
126    pub fn insert(&self, item: InternalValue) -> (u32, u32) {
127        // NOTE: We know values are limited to 32-bit length
128        #[allow(clippy::cast_possible_truncation)]
129        let item_size = item.size() as u32;
130
131        let size_before = self
132            .approximate_size
133            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
134
135        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
136        self.items.insert(key, item.value);
137
138        self.highest_seqno
139            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
140
141        (item_size, size_before + item_size)
142    }
143
144    /// Returns the highest sequence number in the memtable.
145    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
146        if self.is_empty() {
147            None
148        } else {
149            Some(
150                self.highest_seqno
151                    .load(std::sync::atomic::Ordering::Acquire),
152            )
153        }
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160    use crate::value::ValueType;
161    use test_log::test;
162
163    #[test]
164    #[allow(clippy::unwrap_used)]
165    fn memtable_mvcc_point_read() {
166        let memtable = Memtable::default();
167
168        memtable.insert(InternalValue::from_components(
169            *b"hello-key-999991",
170            *b"hello-value-999991",
171            0,
172            ValueType::Value,
173        ));
174
175        let item = memtable.get("hello-key-99999", None);
176        assert_eq!(None, item);
177
178        let item = memtable.get("hello-key-999991", None);
179        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);
180
181        memtable.insert(InternalValue::from_components(
182            *b"hello-key-999991",
183            *b"hello-value-999991-2",
184            1,
185            ValueType::Value,
186        ));
187
188        let item = memtable.get("hello-key-99999", None);
189        assert_eq!(None, item);
190
191        let item = memtable.get("hello-key-999991", None);
192        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
193
194        let item = memtable.get("hello-key-99999", Some(1));
195        assert_eq!(None, item);
196
197        let item = memtable.get("hello-key-999991", Some(1));
198        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);
199
200        let item = memtable.get("hello-key-99999", Some(2));
201        assert_eq!(None, item);
202
203        let item = memtable.get("hello-key-999991", Some(2));
204        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
205    }
206
207    #[test]
208    fn memtable_get() {
209        let memtable = Memtable::default();
210
211        let value =
212            InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);
213
214        memtable.insert(value.clone());
215
216        assert_eq!(Some(value), memtable.get("abc", None));
217    }
218
219    #[test]
220    fn memtable_get_highest_seqno() {
221        let memtable = Memtable::default();
222
223        memtable.insert(InternalValue::from_components(
224            b"abc".to_vec(),
225            b"abc".to_vec(),
226            0,
227            ValueType::Value,
228        ));
229        memtable.insert(InternalValue::from_components(
230            b"abc".to_vec(),
231            b"abc".to_vec(),
232            1,
233            ValueType::Value,
234        ));
235        memtable.insert(InternalValue::from_components(
236            b"abc".to_vec(),
237            b"abc".to_vec(),
238            2,
239            ValueType::Value,
240        ));
241        memtable.insert(InternalValue::from_components(
242            b"abc".to_vec(),
243            b"abc".to_vec(),
244            3,
245            ValueType::Value,
246        ));
247        memtable.insert(InternalValue::from_components(
248            b"abc".to_vec(),
249            b"abc".to_vec(),
250            4,
251            ValueType::Value,
252        ));
253
254        assert_eq!(
255            Some(InternalValue::from_components(
256                b"abc".to_vec(),
257                b"abc".to_vec(),
258                4,
259                ValueType::Value,
260            )),
261            memtable.get("abc", None)
262        );
263    }
264
265    #[test]
266    fn memtable_get_prefix() {
267        let memtable = Memtable::default();
268
269        memtable.insert(InternalValue::from_components(
270            b"abc0".to_vec(),
271            b"abc".to_vec(),
272            0,
273            ValueType::Value,
274        ));
275        memtable.insert(InternalValue::from_components(
276            b"abc".to_vec(),
277            b"abc".to_vec(),
278            255,
279            ValueType::Value,
280        ));
281
282        assert_eq!(
283            Some(InternalValue::from_components(
284                b"abc".to_vec(),
285                b"abc".to_vec(),
286                255,
287                ValueType::Value,
288            )),
289            memtable.get("abc", None)
290        );
291
292        assert_eq!(
293            Some(InternalValue::from_components(
294                b"abc0".to_vec(),
295                b"abc".to_vec(),
296                0,
297                ValueType::Value,
298            )),
299            memtable.get("abc0", None)
300        );
301    }
302
303    #[test]
304    fn memtable_get_old_version() {
305        let memtable = Memtable::default();
306
307        memtable.insert(InternalValue::from_components(
308            b"abc".to_vec(),
309            b"abc".to_vec(),
310            0,
311            ValueType::Value,
312        ));
313        memtable.insert(InternalValue::from_components(
314            b"abc".to_vec(),
315            b"abc".to_vec(),
316            99,
317            ValueType::Value,
318        ));
319        memtable.insert(InternalValue::from_components(
320            b"abc".to_vec(),
321            b"abc".to_vec(),
322            255,
323            ValueType::Value,
324        ));
325
326        assert_eq!(
327            Some(InternalValue::from_components(
328                b"abc".to_vec(),
329                b"abc".to_vec(),
330                255,
331                ValueType::Value,
332            )),
333            memtable.get("abc", None)
334        );
335
336        assert_eq!(
337            Some(InternalValue::from_components(
338                b"abc".to_vec(),
339                b"abc".to_vec(),
340                99,
341                ValueType::Value,
342            )),
343            memtable.get("abc", Some(100))
344        );
345
346        assert_eq!(
347            Some(InternalValue::from_components(
348                b"abc".to_vec(),
349                b"abc".to_vec(),
350                0,
351                ValueType::Value,
352            )),
353            memtable.get("abc", Some(50))
354        );
355    }
356}