lsm_tree/memtable/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::key::InternalKey;
6use crate::tree::inner::MemtableId;
7use crate::{
8    value::{InternalValue, SeqNo, UserValue},
9    ValueType,
10};
11use crossbeam_skiplist::SkipMap;
12use std::ops::RangeBounds;
13use std::sync::atomic::AtomicU64;
14
15/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
16///
17/// When the Memtable exceeds some size, it should be flushed to a table.
18pub struct Memtable {
19    #[doc(hidden)]
20    pub id: MemtableId,
21
22    /// The actual content, stored in a lock-free skiplist.
23    #[doc(hidden)]
24    pub items: SkipMap<InternalKey, UserValue>,
25
26    /// Approximate active memtable size.
27    ///
28    /// If this grows too large, a flush is triggered.
29    pub(crate) approximate_size: AtomicU64,
30
31    /// Highest encountered sequence number.
32    ///
33    /// This is used so that `get_highest_seqno` has O(1) complexity.
34    pub(crate) highest_seqno: AtomicU64,
35}
36
37impl Memtable {
38    #[doc(hidden)]
39    #[must_use]
40    pub fn new(id: MemtableId) -> Self {
41        Self {
42            id,
43            items: SkipMap::default(),
44            approximate_size: AtomicU64::default(),
45            highest_seqno: AtomicU64::default(),
46        }
47    }
48
49    /// Clears the memtable.
50    pub fn clear(&mut self) {
51        self.items.clear();
52        self.highest_seqno = AtomicU64::new(0);
53        self.approximate_size
54            .store(0, std::sync::atomic::Ordering::Release);
55    }
56
57    /// Creates an iterator over all items.
58    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
59        self.items.iter().map(|entry| InternalValue {
60            key: entry.key().clone(),
61            value: entry.value().clone(),
62        })
63    }
64
65    /// Creates an iterator over a range of items.
66    pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
67        &'a self,
68        range: R,
69    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
70        self.items.range(range).map(|entry| InternalValue {
71            key: entry.key().clone(),
72            value: entry.value().clone(),
73        })
74    }
75
76    /// Returns the item by key if it exists.
77    ///
78    /// The item with the highest seqno will be returned, if `seqno` is None.
79    #[doc(hidden)]
80    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
81        if seqno == 0 {
82            return None;
83        }
84
85        // NOTE: This range start deserves some explanation...
86        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
87        // We search for the lowest entry that is greater or equal the user's prefix key
88        // and has the seqno (or lower) we want  (because the seqno is stored in reverse order)
89        //
90        // Example: We search for "abc"
91        //
92        // key -> seqno
93        //
94        // a   -> 7
95        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=None
96        // abc -> 4
97        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
98        // abcdef -> 6
99        // abcdef -> 5
100        //
101        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
102
103        let mut iter = self
104            .items
105            .range(lower_bound..)
106            .take_while(|entry| &*entry.key().user_key == key);
107
108        iter.next().map(|entry| InternalValue {
109            key: entry.key().clone(),
110            value: entry.value().clone(),
111        })
112    }
113
114    /// Gets approximate size of memtable in bytes.
115    pub fn size(&self) -> u64 {
116        self.approximate_size
117            .load(std::sync::atomic::Ordering::Acquire)
118    }
119
120    /// Counts the number of items in the memtable.
121    pub fn len(&self) -> usize {
122        self.items.len()
123    }
124
125    /// Returns `true` if the memtable is empty.
126    #[must_use]
127    pub fn is_empty(&self) -> bool {
128        self.items.is_empty()
129    }
130
131    /// Inserts an item into the memtable
132    #[doc(hidden)]
133    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
134        #[expect(
135            clippy::expect_used,
136            reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
137        )]
138        let item_size =
139            (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
140                .try_into()
141                .expect("should fit into u64");
142
143        let size_before = self
144            .approximate_size
145            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
146
147        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
148        self.items.insert(key, item.value);
149
150        self.highest_seqno
151            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
152
153        (item_size, size_before + item_size)
154    }
155
156    /// Returns the highest sequence number in the memtable.
157    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
158        if self.is_empty() {
159            None
160        } else {
161            Some(
162                self.highest_seqno
163                    .load(std::sync::atomic::Ordering::Acquire),
164            )
165        }
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172    use crate::ValueType;
173    use test_log::test;
174
175    #[test]
176    #[expect(clippy::unwrap_used)]
177    fn memtable_mvcc_point_read() {
178        let memtable = Memtable::new(0);
179
180        memtable.insert(InternalValue::from_components(
181            *b"hello-key-999991",
182            *b"hello-value-999991",
183            0,
184            ValueType::Value,
185        ));
186
187        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
188        assert_eq!(None, item);
189
190        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
191        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);
192
193        memtable.insert(InternalValue::from_components(
194            *b"hello-key-999991",
195            *b"hello-value-999991-2",
196            1,
197            ValueType::Value,
198        ));
199
200        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
201        assert_eq!(None, item);
202
203        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
204        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
205
206        let item = memtable.get(b"hello-key-99999", 1);
207        assert_eq!(None, item);
208
209        let item = memtable.get(b"hello-key-999991", 1);
210        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);
211
212        let item = memtable.get(b"hello-key-99999", 2);
213        assert_eq!(None, item);
214
215        let item = memtable.get(b"hello-key-999991", 2);
216        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
217    }
218
219    #[test]
220    fn memtable_get() {
221        let memtable = Memtable::new(0);
222
223        let value =
224            InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);
225
226        memtable.insert(value.clone());
227
228        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
229    }
230
231    #[test]
232    fn memtable_get_highest_seqno() {
233        let memtable = Memtable::new(0);
234
235        memtable.insert(InternalValue::from_components(
236            b"abc".to_vec(),
237            b"abc".to_vec(),
238            0,
239            ValueType::Value,
240        ));
241        memtable.insert(InternalValue::from_components(
242            b"abc".to_vec(),
243            b"abc".to_vec(),
244            1,
245            ValueType::Value,
246        ));
247        memtable.insert(InternalValue::from_components(
248            b"abc".to_vec(),
249            b"abc".to_vec(),
250            2,
251            ValueType::Value,
252        ));
253        memtable.insert(InternalValue::from_components(
254            b"abc".to_vec(),
255            b"abc".to_vec(),
256            3,
257            ValueType::Value,
258        ));
259        memtable.insert(InternalValue::from_components(
260            b"abc".to_vec(),
261            b"abc".to_vec(),
262            4,
263            ValueType::Value,
264        ));
265
266        assert_eq!(
267            Some(InternalValue::from_components(
268                b"abc".to_vec(),
269                b"abc".to_vec(),
270                4,
271                ValueType::Value,
272            )),
273            memtable.get(b"abc", SeqNo::MAX)
274        );
275    }
276
277    #[test]
278    fn memtable_get_prefix() {
279        let memtable = Memtable::new(0);
280
281        memtable.insert(InternalValue::from_components(
282            b"abc0".to_vec(),
283            b"abc".to_vec(),
284            0,
285            ValueType::Value,
286        ));
287        memtable.insert(InternalValue::from_components(
288            b"abc".to_vec(),
289            b"abc".to_vec(),
290            255,
291            ValueType::Value,
292        ));
293
294        assert_eq!(
295            Some(InternalValue::from_components(
296                b"abc".to_vec(),
297                b"abc".to_vec(),
298                255,
299                ValueType::Value,
300            )),
301            memtable.get(b"abc", SeqNo::MAX)
302        );
303
304        assert_eq!(
305            Some(InternalValue::from_components(
306                b"abc0".to_vec(),
307                b"abc".to_vec(),
308                0,
309                ValueType::Value,
310            )),
311            memtable.get(b"abc0", SeqNo::MAX)
312        );
313    }
314
315    #[test]
316    fn memtable_get_old_version() {
317        let memtable = Memtable::new(0);
318
319        memtable.insert(InternalValue::from_components(
320            b"abc".to_vec(),
321            b"abc".to_vec(),
322            0,
323            ValueType::Value,
324        ));
325        memtable.insert(InternalValue::from_components(
326            b"abc".to_vec(),
327            b"abc".to_vec(),
328            99,
329            ValueType::Value,
330        ));
331        memtable.insert(InternalValue::from_components(
332            b"abc".to_vec(),
333            b"abc".to_vec(),
334            255,
335            ValueType::Value,
336        ));
337
338        assert_eq!(
339            Some(InternalValue::from_components(
340                b"abc".to_vec(),
341                b"abc".to_vec(),
342                255,
343                ValueType::Value,
344            )),
345            memtable.get(b"abc", SeqNo::MAX)
346        );
347
348        assert_eq!(
349            Some(InternalValue::from_components(
350                b"abc".to_vec(),
351                b"abc".to_vec(),
352                99,
353                ValueType::Value,
354            )),
355            memtable.get(b"abc", 100)
356        );
357
358        assert_eq!(
359            Some(InternalValue::from_components(
360                b"abc".to_vec(),
361                b"abc".to_vec(),
362                0,
363                ValueType::Value,
364            )),
365            memtable.get(b"abc", 50)
366        );
367    }
368}