lsm_tree/memtable/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::key::InternalKey;
6use crate::{
7    value::{InternalValue, SeqNo, UserValue},
8    ValueType,
9};
10use crossbeam_skiplist::SkipMap;
11use std::ops::RangeBounds;
12use std::sync::atomic::{AtomicBool, AtomicU64};
13
14pub use crate::tree::inner::MemtableId;
15
/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
///
/// When the Memtable exceeds some size, it should be flushed to a table.
pub struct Memtable {
    /// Identifier of this memtable, as handed to `Memtable::new`.
    #[doc(hidden)]
    pub id: MemtableId,

    /// The actual content, stored in a lock-free skiplist.
    ///
    /// Maps each internal key to the user value stored under it.
    #[doc(hidden)]
    pub items: SkipMap<InternalKey, UserValue>,

    /// Approximate active memtable size.
    ///
    /// If this grows too large, a flush is triggered.
    ///
    /// Every insert adds the user key length, the value length and
    /// `size_of::<InternalValue>()` to this counter.
    pub(crate) approximate_size: AtomicU64,

    /// Highest encountered sequence number.
    ///
    /// This is used so that `get_highest_seqno` has O(1) complexity.
    ///
    /// Starts at zero and is raised with `fetch_max` on every insert.
    pub(crate) highest_seqno: AtomicU64,

    /// Rotation flag: set to `true` by `flag_rotated`, read by `is_flagged_for_rotation`.
    pub(crate) requested_rotation: AtomicBool,
}
39
40impl Memtable {
41    /// Returns the memtable ID.
42    pub fn id(&self) -> MemtableId {
43        self.id
44    }
45
46    /// Returns `true` if the memtable was already flagged for rotation.
47    pub fn is_flagged_for_rotation(&self) -> bool {
48        self.requested_rotation
49            .load(std::sync::atomic::Ordering::Relaxed)
50    }
51
52    /// Flags the memtable as requested for rotation.
53    pub fn flag_rotated(&self) {
54        self.requested_rotation
55            .store(true, std::sync::atomic::Ordering::Relaxed);
56    }
57
58    #[doc(hidden)]
59    #[must_use]
60    pub fn new(id: MemtableId) -> Self {
61        Self {
62            id,
63            items: SkipMap::default(),
64            approximate_size: AtomicU64::default(),
65            highest_seqno: AtomicU64::default(),
66            requested_rotation: AtomicBool::default(),
67        }
68    }
69
70    /// Creates an iterator over all items.
71    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
72        self.items.iter().map(|entry| InternalValue {
73            key: entry.key().clone(),
74            value: entry.value().clone(),
75        })
76    }
77
78    /// Creates an iterator over a range of items.
79    pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
80        &'a self,
81        range: R,
82    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
83        self.items.range(range).map(|entry| InternalValue {
84            key: entry.key().clone(),
85            value: entry.value().clone(),
86        })
87    }
88
89    /// Returns the item by key if it exists.
90    ///
91    /// The item with the highest seqno will be returned, if `seqno` is None.
92    #[doc(hidden)]
93    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
94        if seqno == 0 {
95            return None;
96        }
97
98        // NOTE: This range start deserves some explanation...
99        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
100        // We search for the lowest entry that is greater or equal the user's prefix key
101        // and has the seqno (or lower) we want  (because the seqno is stored in reverse order)
102        //
103        // Example: We search for "abc"
104        //
105        // key -> seqno
106        //
107        // a   -> 7
108        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=None
109        // abc -> 4
110        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
111        // abcdef -> 6
112        // abcdef -> 5
113        //
114        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
115
116        let mut iter = self
117            .items
118            .range(lower_bound..)
119            .take_while(|entry| &*entry.key().user_key == key);
120
121        iter.next().map(|entry| InternalValue {
122            key: entry.key().clone(),
123            value: entry.value().clone(),
124        })
125    }
126
127    /// Gets approximate size of memtable in bytes.
128    pub fn size(&self) -> u64 {
129        self.approximate_size
130            .load(std::sync::atomic::Ordering::Acquire)
131    }
132
133    /// Counts the number of items in the memtable.
134    pub fn len(&self) -> usize {
135        self.items.len()
136    }
137
138    /// Returns `true` if the memtable is empty.
139    #[must_use]
140    pub fn is_empty(&self) -> bool {
141        self.items.is_empty()
142    }
143
144    /// Inserts an item into the memtable
145    #[doc(hidden)]
146    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
147        #[expect(
148            clippy::expect_used,
149            reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
150        )]
151        let item_size =
152            (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
153                .try_into()
154                .expect("should fit into u64");
155
156        let size_before = self
157            .approximate_size
158            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
159
160        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
161        self.items.insert(key, item.value);
162
163        self.highest_seqno
164            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
165
166        (item_size, size_before + item_size)
167    }
168
169    /// Returns the highest sequence number in the memtable.
170    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
171        if self.is_empty() {
172            None
173        } else {
174            Some(
175                self.highest_seqno
176                    .load(std::sync::atomic::Ordering::Acquire),
177            )
178        }
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ValueType;
    use test_log::test;

    /// Shorthand for a `ValueType::Value` item built from byte slices.
    fn value_item(key: &[u8], value: &[u8], seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(key.to_vec(), value.to_vec(), seqno, ValueType::Value)
    }

    /// Point reads honor the seqno bound and never match a key that is only a
    /// prefix of a stored key.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let key = *b"hello-key-999991";
        let key_prefix = b"hello-key-99999";
        let first_value = *b"hello-value-999991";
        let second_value = *b"hello-value-999991-2";

        let memtable = Memtable::new(0);

        // Only one version (seqno 0) exists.
        memtable.insert(InternalValue::from_components(
            key,
            first_value,
            0,
            ValueType::Value,
        ));

        assert_eq!(None, memtable.get(key_prefix, SeqNo::MAX));

        let item = memtable.get(&key, SeqNo::MAX);
        assert_eq!(first_value, &*item.unwrap().value);

        // A second version (seqno 1) shadows the first one for newer reads.
        memtable.insert(InternalValue::from_components(
            key,
            second_value,
            1,
            ValueType::Value,
        ));

        let expectations = [
            (SeqNo::MAX, &second_value[..]),
            (1, &first_value[..]),
            (2, &second_value[..]),
        ];

        for (read_seqno, expected) in expectations {
            assert_eq!(None, memtable.get(key_prefix, read_seqno));

            let item = memtable.get(&key, read_seqno);
            assert_eq!(expected, &*item.unwrap().value);
        }
    }

    /// A single inserted item can be read back unchanged.
    #[test]
    fn memtable_get() {
        let memtable = Memtable::new(0);

        let expected = value_item(b"abc", b"abc", 0);
        memtable.insert(expected.clone());

        assert_eq!(Some(expected), memtable.get(b"abc", SeqNo::MAX));
    }

    /// With several versions of one key, an unbounded read returns the newest.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::new(0);

        for seqno in 0..=4 {
            memtable.insert(value_item(b"abc", b"abc", seqno));
        }

        assert_eq!(
            Some(value_item(b"abc", b"abc", 4)),
            memtable.get(b"abc", SeqNo::MAX)
        );
    }

    /// Keys sharing a prefix are kept apart, whatever their seqnos are.
    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::new(0);

        memtable.insert(value_item(b"abc0", b"abc", 0));
        memtable.insert(value_item(b"abc", b"abc", 255));

        assert_eq!(
            Some(value_item(b"abc", b"abc", 255)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(
            Some(value_item(b"abc0", b"abc", 0)),
            memtable.get(b"abc0", SeqNo::MAX)
        );
    }

    /// Reads with a seqno bound return the newest version below that bound.
    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::new(0);

        for seqno in [0, 99, 255] {
            memtable.insert(value_item(b"abc", b"abc", seqno));
        }

        // (seqno used for the read, seqno of the version expected back)
        let expectations = [(SeqNo::MAX, 255), (100, 99), (50, 0)];

        for (read_seqno, expected_seqno) in expectations {
            assert_eq!(
                Some(value_item(b"abc", b"abc", expected_seqno)),
                memtable.get(b"abc", read_seqno)
            );
        }
    }
}