lsm_tree/memtable/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::key::InternalKey;
6use crate::tree::inner::MemtableId;
7use crate::{
8    value::{InternalValue, SeqNo, UserValue},
9    ValueType,
10};
11use crossbeam_skiplist::SkipMap;
12use std::ops::RangeBounds;
13use std::sync::atomic::AtomicU64;
14
15/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
16///
17/// When the Memtable exceeds some size, it should be flushed to a table.
18pub struct Memtable {
19    #[doc(hidden)]
20    pub id: MemtableId,
21
22    /// The actual content, stored in a lock-free skiplist.
23    #[doc(hidden)]
24    pub items: SkipMap<InternalKey, UserValue>,
25
26    /// Approximate active memtable size.
27    ///
28    /// If this grows too large, a flush is triggered.
29    pub(crate) approximate_size: AtomicU64,
30
31    /// Highest encountered sequence number.
32    ///
33    /// This is used so that `get_highest_seqno` has O(1) complexity.
34    pub(crate) highest_seqno: AtomicU64,
35}
36
37impl Memtable {
38    #[doc(hidden)]
39    #[must_use]
40    pub fn new(id: MemtableId) -> Self {
41        Self {
42            id,
43            items: SkipMap::default(),
44            approximate_size: AtomicU64::default(),
45            highest_seqno: AtomicU64::default(),
46        }
47    }
48
49    /// Creates an iterator over all items.
50    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
51        self.items.iter().map(|entry| InternalValue {
52            key: entry.key().clone(),
53            value: entry.value().clone(),
54        })
55    }
56
57    /// Creates an iterator over a range of items.
58    pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
59        &'a self,
60        range: R,
61    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
62        self.items.range(range).map(|entry| InternalValue {
63            key: entry.key().clone(),
64            value: entry.value().clone(),
65        })
66    }
67
68    /// Returns the item by key if it exists.
69    ///
70    /// The item with the highest seqno will be returned, if `seqno` is None.
71    #[doc(hidden)]
72    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
73        if seqno == 0 {
74            return None;
75        }
76
77        // NOTE: This range start deserves some explanation...
78        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
79        // We search for the lowest entry that is greater or equal the user's prefix key
80        // and has the seqno (or lower) we want  (because the seqno is stored in reverse order)
81        //
82        // Example: We search for "abc"
83        //
84        // key -> seqno
85        //
86        // a   -> 7
87        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=None
88        // abc -> 4
89        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
90        // abcdef -> 6
91        // abcdef -> 5
92        //
93        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
94
95        let mut iter = self
96            .items
97            .range(lower_bound..)
98            .take_while(|entry| &*entry.key().user_key == key);
99
100        iter.next().map(|entry| InternalValue {
101            key: entry.key().clone(),
102            value: entry.value().clone(),
103        })
104    }
105
106    /// Gets approximate size of memtable in bytes.
107    pub fn size(&self) -> u64 {
108        self.approximate_size
109            .load(std::sync::atomic::Ordering::Acquire)
110    }
111
112    /// Counts the number of items in the memtable.
113    pub fn len(&self) -> usize {
114        self.items.len()
115    }
116
117    /// Returns `true` if the memtable is empty.
118    #[must_use]
119    pub fn is_empty(&self) -> bool {
120        self.items.is_empty()
121    }
122
123    /// Inserts an item into the memtable
124    #[doc(hidden)]
125    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
126        #[expect(
127            clippy::expect_used,
128            reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
129        )]
130        let item_size =
131            (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
132                .try_into()
133                .expect("should fit into u64");
134
135        let size_before = self
136            .approximate_size
137            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
138
139        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
140        self.items.insert(key, item.value);
141
142        self.highest_seqno
143            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
144
145        (item_size, size_before + item_size)
146    }
147
148    /// Returns the highest sequence number in the memtable.
149    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
150        if self.is_empty() {
151            None
152        } else {
153            Some(
154                self.highest_seqno
155                    .load(std::sync::atomic::Ordering::Acquire),
156            )
157        }
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use super::*;
164    use crate::ValueType;
165    use test_log::test;
166
167    #[test]
168    #[expect(clippy::unwrap_used)]
169    fn memtable_mvcc_point_read() {
170        let memtable = Memtable::new(0);
171
172        memtable.insert(InternalValue::from_components(
173            *b"hello-key-999991",
174            *b"hello-value-999991",
175            0,
176            ValueType::Value,
177        ));
178
179        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
180        assert_eq!(None, item);
181
182        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
183        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);
184
185        memtable.insert(InternalValue::from_components(
186            *b"hello-key-999991",
187            *b"hello-value-999991-2",
188            1,
189            ValueType::Value,
190        ));
191
192        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
193        assert_eq!(None, item);
194
195        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
196        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
197
198        let item = memtable.get(b"hello-key-99999", 1);
199        assert_eq!(None, item);
200
201        let item = memtable.get(b"hello-key-999991", 1);
202        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);
203
204        let item = memtable.get(b"hello-key-99999", 2);
205        assert_eq!(None, item);
206
207        let item = memtable.get(b"hello-key-999991", 2);
208        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
209    }
210
211    #[test]
212    fn memtable_get() {
213        let memtable = Memtable::new(0);
214
215        let value =
216            InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);
217
218        memtable.insert(value.clone());
219
220        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
221    }
222
223    #[test]
224    fn memtable_get_highest_seqno() {
225        let memtable = Memtable::new(0);
226
227        memtable.insert(InternalValue::from_components(
228            b"abc".to_vec(),
229            b"abc".to_vec(),
230            0,
231            ValueType::Value,
232        ));
233        memtable.insert(InternalValue::from_components(
234            b"abc".to_vec(),
235            b"abc".to_vec(),
236            1,
237            ValueType::Value,
238        ));
239        memtable.insert(InternalValue::from_components(
240            b"abc".to_vec(),
241            b"abc".to_vec(),
242            2,
243            ValueType::Value,
244        ));
245        memtable.insert(InternalValue::from_components(
246            b"abc".to_vec(),
247            b"abc".to_vec(),
248            3,
249            ValueType::Value,
250        ));
251        memtable.insert(InternalValue::from_components(
252            b"abc".to_vec(),
253            b"abc".to_vec(),
254            4,
255            ValueType::Value,
256        ));
257
258        assert_eq!(
259            Some(InternalValue::from_components(
260                b"abc".to_vec(),
261                b"abc".to_vec(),
262                4,
263                ValueType::Value,
264            )),
265            memtable.get(b"abc", SeqNo::MAX)
266        );
267    }
268
269    #[test]
270    fn memtable_get_prefix() {
271        let memtable = Memtable::new(0);
272
273        memtable.insert(InternalValue::from_components(
274            b"abc0".to_vec(),
275            b"abc".to_vec(),
276            0,
277            ValueType::Value,
278        ));
279        memtable.insert(InternalValue::from_components(
280            b"abc".to_vec(),
281            b"abc".to_vec(),
282            255,
283            ValueType::Value,
284        ));
285
286        assert_eq!(
287            Some(InternalValue::from_components(
288                b"abc".to_vec(),
289                b"abc".to_vec(),
290                255,
291                ValueType::Value,
292            )),
293            memtable.get(b"abc", SeqNo::MAX)
294        );
295
296        assert_eq!(
297            Some(InternalValue::from_components(
298                b"abc0".to_vec(),
299                b"abc".to_vec(),
300                0,
301                ValueType::Value,
302            )),
303            memtable.get(b"abc0", SeqNo::MAX)
304        );
305    }
306
307    #[test]
308    fn memtable_get_old_version() {
309        let memtable = Memtable::new(0);
310
311        memtable.insert(InternalValue::from_components(
312            b"abc".to_vec(),
313            b"abc".to_vec(),
314            0,
315            ValueType::Value,
316        ));
317        memtable.insert(InternalValue::from_components(
318            b"abc".to_vec(),
319            b"abc".to_vec(),
320            99,
321            ValueType::Value,
322        ));
323        memtable.insert(InternalValue::from_components(
324            b"abc".to_vec(),
325            b"abc".to_vec(),
326            255,
327            ValueType::Value,
328        ));
329
330        assert_eq!(
331            Some(InternalValue::from_components(
332                b"abc".to_vec(),
333                b"abc".to_vec(),
334                255,
335                ValueType::Value,
336            )),
337            memtable.get(b"abc", SeqNo::MAX)
338        );
339
340        assert_eq!(
341            Some(InternalValue::from_components(
342                b"abc".to_vec(),
343                b"abc".to_vec(),
344                99,
345                ValueType::Value,
346            )),
347            memtable.get(b"abc", 100)
348        );
349
350        assert_eq!(
351            Some(InternalValue::from_components(
352                b"abc".to_vec(),
353                b"abc".to_vec(),
354                0,
355                ValueType::Value,
356            )),
357            memtable.get(b"abc", 50)
358        );
359    }
360}