// lsm_tree/memtable/mod.rs
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod arena;
pub mod interval_tree;
pub mod skiplist;
pub mod value_store;

use crate::comparator::SharedComparator;
use crate::key::InternalKey;
use crate::range_tombstone::RangeTombstone;
use crate::{
    value::{InternalValue, SeqNo},
    UserKey, ValueType,
};
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::RwLock;

pub use crate::tree::inner::MemtableId;

/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
///
/// When the Memtable exceeds some size, it should be flushed to a table.
pub struct Memtable {
    /// Unique ID of this memtable.
    ///
    /// `pub` + `#[doc(hidden)]`: read by the host crate, not part of the
    /// semver-stable API.
    #[doc(hidden)]
    pub id: MemtableId,

    /// The user key comparator used for ordering entries.
    pub(crate) comparator: SharedComparator,

    /// The actual content, stored in an arena-based skiplist with lock-free traversal.
    ///
    /// Nodes are allocated from a contiguous byte arena for cache locality
    /// and O(1) bulk deallocation when the memtable is dropped.  Traversal of
    /// the skiplist index uses atomic loads and CAS for inserts.
    pub(crate) items: skiplist::SkipMap,

    /// Range tombstones stored in an interval tree.
    ///
    /// Protected by `RwLock` — read-heavy suppression queries (`query_suppression`,
    /// `range_tombstones_sorted`) take a shared read lock, while `insert_range_tombstone`
    /// takes an exclusive write lock. After a rotation has been requested via
    /// `requested_rotation`, the interval tree is treated as read-only by convention,
    /// and only readers are expected to access this field (the `RwLock` is still used
    /// for synchronization, but there should be no further writes).
    ///
    /// `std::sync::RwLock` may be reader-biased on some platforms, but writer
    /// starvation is not a concern here: range deletes are rare, the write-side
    /// critical section is O(log n) with n typically small, and the memtable
    /// rotates (becoming read-only) well before contention could accumulate.
    // NOTE: The interval tree uses lexicographic `Ord` on `UserKey` for
    // containment queries. With a custom comparator, RT suppression in
    // the memtable may produce incorrect results for non-lexicographic
    // orderings. Threading the comparator into the AVL tree is tracked
    // as a follow-up issue.
    pub(crate) range_tombstones: RwLock<interval_tree::IntervalTree>,

    /// Approximate active memtable size.
    ///
    /// If this grows too large, a flush is triggered.
    pub(crate) approximate_size: AtomicU64,

    /// Highest encountered sequence number.
    ///
    /// This is used so that `get_highest_seqno` has O(1) complexity.
    pub(crate) highest_seqno: AtomicU64,

    /// Set once via `flag_rotated` when the host crate requests rotation;
    /// read by `is_flagged_for_rotation` and by a debug assertion in
    /// `insert_range_tombstone` to catch post-rotation writes.
    pub(crate) requested_rotation: AtomicBool,
}
73impl Memtable {
74    /// Returns the memtable ID.
75    pub fn id(&self) -> MemtableId {
76        self.id
77    }
78
79    /// Returns `true` if the memtable was already flagged for rotation.
80    pub fn is_flagged_for_rotation(&self) -> bool {
81        self.requested_rotation
82            .load(std::sync::atomic::Ordering::Relaxed)
83    }
84
85    /// Flags the memtable as requested for rotation.
86    pub fn flag_rotated(&self) {
87        self.requested_rotation
88            .store(true, std::sync::atomic::Ordering::Relaxed);
89    }
90
91    // `pub` + `#[doc(hidden)]`: used by the host crate (fjall) to construct
92    // ephemeral memtables. Not part of the semver-stable API.
93    // The comparator parameter is mandatory because memtable ordering must
94    // match the tree's comparator; a default would silently produce wrong order.
95    #[doc(hidden)]
96    #[must_use]
97    pub fn new(id: MemtableId, comparator: SharedComparator) -> Self {
98        Self {
99            id,
100            items: skiplist::SkipMap::new(comparator.clone()),
101            comparator,
102            range_tombstones: RwLock::new(interval_tree::IntervalTree::new()),
103            approximate_size: AtomicU64::default(),
104            highest_seqno: AtomicU64::default(),
105            requested_rotation: AtomicBool::default(),
106        }
107    }
108
109    /// Creates an iterator over all items.
110    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
111        self.items.iter().map(|entry| InternalValue {
112            key: entry.key(),
113            value: entry.value(),
114        })
115    }
116
117    /// Creates an iterator over a range of items.
118    ///
119    /// Accepts `InternalKey`-based bounds.
120    pub(crate) fn range_internal<'a, R: RangeBounds<InternalKey> + 'a>(
121        &'a self,
122        range: R,
123    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
124        self.items.range(range).map(|entry| InternalValue {
125            key: entry.key(),
126            value: entry.value(),
127        })
128    }
129
130    /// Returns the item by key if it exists.
131    ///
132    /// Returns the version with the highest seqno that is strictly less than
133    /// the given `seqno`.  Pass [`MAX_SEQNO`](crate::MAX_SEQNO) to retrieve the latest version.
134    #[doc(hidden)]
135    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
136        if seqno == 0 {
137            return None;
138        }
139
140        // NOTE: This range start deserves some explanation...
141        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
142        // We search for the lowest entry that is greater or equal the user's prefix key
143        // and has the seqno (or lower) we want (because the seqno is stored in reverse order)
144        //
145        // Example: We search for "abc"
146        //
147        // key -> seqno
148        //
149        // a   -> 7
150        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=MAX
151        // abc -> 4
152        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
153        // abcdef -> 6
154        // abcdef -> 5
155        //
156        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
157
158        let cmp = self.comparator.as_ref();
159
160        let mut iter = self.items.range(lower_bound..).take_while(|entry| {
161            cmp.compare(entry.user_key_bytes(), key) == std::cmp::Ordering::Equal
162        });
163
164        iter.next().map(|entry| InternalValue {
165            key: entry.key(),
166            value: entry.value(),
167        })
168    }
169
170    /// Gets approximate size of memtable in bytes.
171    pub fn size(&self) -> u64 {
172        self.approximate_size
173            .load(std::sync::atomic::Ordering::Acquire)
174    }
175
176    /// Counts the number of items in the memtable.
177    pub fn len(&self) -> usize {
178        self.items.len()
179    }
180
181    /// Returns `true` if the memtable has no KV items and no range tombstones.
182    #[must_use]
183    pub fn is_empty(&self) -> bool {
184        self.items.is_empty() && self.range_tombstone_count() == 0
185    }
186
187    /// Inserts an item into the memtable
188    #[doc(hidden)]
189    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
190        #[expect(
191            clippy::expect_used,
192            reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
193        )]
194        // Account for MemtableKey overhead (InternalKey + Arc<dyn UserComparator>)
195        let item_size = (item.key.user_key.len()
196            + item.value.len()
197            + std::mem::size_of::<InternalValue>()
198            + std::mem::size_of::<SharedComparator>())
199        .try_into()
200        .expect("should fit into u64");
201
202        let size_before = self
203            .approximate_size
204            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
205
206        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
207        self.items.insert(&key, &item.value);
208
209        self.highest_seqno
210            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
211
212        (item_size, size_before + item_size)
213    }
214
215    /// Inserts a range tombstone covering `[start, end)` at the given seqno.
216    ///
217    /// Returns the approximate size added to the memtable.
218    ///
219    /// Returns 0 if `start >= end` or if either bound exceeds `u16::MAX` bytes.
220    ///
221    /// # Panics
222    ///
223    /// Panics if the internal `RwLock` is poisoned.
224    #[must_use]
225    pub fn insert_range_tombstone(&self, start: UserKey, end: UserKey, seqno: SeqNo) -> u64 {
226        // flag_rotated() (which sets requested_rotation) is called by the host
227        // crate (fjall) before rotation; this crate never sets it directly.
228        // The assert catches misuse by callers
229        // in debug builds — intentionally debug-only because post-rotation writes
230        // are structurally prevented by the host (sealed memtables are behind Arc
231        // with no write path exposed), and an atomic load here would add overhead
232        // on the hot insert path in release builds for no practical benefit.
233        debug_assert!(
234            !self.is_flagged_for_rotation(),
235            "insert_range_tombstone called after memtable was flagged for rotation"
236        );
237
238        // Reject invalid intervals in release builds (debug_assert is not enough)
239        if start >= end {
240            return 0;
241        }
242
243        // On-disk RT format writes key lengths as u16, enforce at insertion time.
244        // Emit a warning when rejecting an oversized bound so this failure is diagnosable.
245        if u16::try_from(start.len()).is_err() || u16::try_from(end.len()).is_err() {
246            log::warn!(
247                "insert_range_tombstone: rejecting oversized range tombstone \
248                 bounds (start_len = {}, end_len = {}, max = {})",
249                start.len(),
250                end.len(),
251                u16::MAX,
252            );
253            return 0;
254        }
255
256        let size = (start.len() + end.len() + std::mem::size_of::<RangeTombstone>()) as u64;
257
258        // Panic on poison is intentional — a poisoned lock indicates a prior panic
259        // during a write, leaving the tree in an unknown state. Recovery would
260        // require validating AVL invariants which is not worth the complexity.
261        // This pattern is consistent with the original Mutex implementation.
262        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
263        self.range_tombstones
264            .write()
265            .expect("lock is poisoned")
266            .insert(RangeTombstone::new(start, end, seqno));
267
268        self.approximate_size
269            .fetch_add(size, std::sync::atomic::Ordering::AcqRel);
270
271        self.highest_seqno
272            .fetch_max(seqno, std::sync::atomic::Ordering::AcqRel);
273
274        size
275    }
276
277    /// Returns `true` if the key at `key_seqno` is suppressed by a range tombstone
278    /// visible at `read_seqno`.
279    ///
280    /// # Panics
281    ///
282    /// Panics if the internal `RwLock` is poisoned.
283    pub(crate) fn is_key_suppressed_by_range_tombstone(
284        &self,
285        key: &[u8],
286        key_seqno: SeqNo,
287        read_seqno: SeqNo,
288    ) -> bool {
289        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
290        self.range_tombstones
291            .read()
292            .expect("lock is poisoned")
293            .query_suppression(key, key_seqno, read_seqno)
294    }
295
296    /// Returns all range tombstones in sorted order (for flush).
297    ///
298    /// # Panics
299    ///
300    /// Panics if the internal `RwLock` is poisoned.
301    pub(crate) fn range_tombstones_sorted(&self) -> Vec<RangeTombstone> {
302        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
303        self.range_tombstones
304            .read()
305            .expect("lock is poisoned")
306            .iter_sorted()
307    }
308
309    /// Returns the number of range tombstones.
310    ///
311    /// # Panics
312    ///
313    /// Panics if the internal `RwLock` is poisoned.
314    #[must_use]
315    pub fn range_tombstone_count(&self) -> usize {
316        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
317        self.range_tombstones
318            .read()
319            .expect("lock is poisoned")
320            .len()
321    }
322
323    /// Returns the highest sequence number in the memtable.
324    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
325        if self.is_empty() {
326            None
327        } else {
328            Some(
329                self.highest_seqno
330                    .load(std::sync::atomic::Ordering::Acquire),
331            )
332        }
333    }
334}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::comparator::default_comparator;
    use crate::ValueType;
    use std::sync::{Arc, Barrier};
    use test_log::test;

    // Builds a fresh memtable using the default (lexicographic) comparator.
    fn new_memtable(id: MemtableId) -> Memtable {
        Memtable::new(id, default_comparator())
    }

    // Verifies that `std::sync::RwLock` grants a second shared read lock
    // while another thread is already holding one.
    #[test]
    #[expect(
        clippy::expect_used,
        reason = "tests use expect for lock and thread join"
    )]
    fn rwlock_read_while_read_held_succeeds() {
        let mt = new_memtable(0);
        let _ = mt.insert_range_tombstone(b"a".to_vec().into(), b"z".to_vec().into(), 10);

        // Two one-way channels avoid Barrier entirely — if either side
        // panics, the sender drops and recv() returns Err, unblocking the
        // peer so thread::scope can join without hanging.
        let (held_tx, held_rx) = std::sync::mpsc::channel::<()>();
        let (release_tx, release_rx) = std::sync::mpsc::channel::<()>();
        let rt_ref = &mt.range_tombstones;
        std::thread::scope(|s| {
            s.spawn(move || {
                let _guard = rt_ref.read().expect("lock is poisoned");
                let _ = held_tx.send(()); // signal: guard held
                let _ = release_rx.recv(); // wait: main thread done
            });

            held_rx
                .recv()
                .expect("spawned thread panicked before acquiring guard");
            // try_read must succeed immediately: shared read locks coexist.
            let guard2 = mt.range_tombstones.try_read();
            assert!(
                guard2.is_ok(),
                "second read lock must succeed while first is held"
            );
            drop(guard2);
            drop(release_tx); // signal: done
        });
    }

    // Smoke test: many concurrent readers performing suppression queries and
    // count reads against a shared memtable must not panic or deadlock.
    #[test]
    #[expect(clippy::expect_used, reason = "tests use expect for thread join")]
    fn suppression_queries_concurrent_readers_no_panic() {
        let mt = Arc::new(new_memtable(0));

        // One wide tombstone [a, z)@10 plus 100 point entries on keys a..y.
        let _ = mt.insert_range_tombstone(b"a".to_vec().into(), b"z".to_vec().into(), 10);
        for i in 0u8..100 {
            let key = vec![b'a' + (i % 25)];
            mt.insert(InternalValue::from_components(
                key,
                b"v".to_vec(),
                u64::from(i),
                ValueType::Value,
            ));
        }

        let handles: Vec<_> = (0..8)
            .map(|t| {
                let mt = Arc::clone(&mt);
                std::thread::spawn(move || {
                    for i in 0u8..200 {
                        let key = vec![b'a' + ((t + i) % 25)];
                        let _ = mt.is_key_suppressed_by_range_tombstone(&key, 5, SeqNo::MAX);
                        let _ = mt.range_tombstone_count();
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("reader thread panicked");
        }
    }

    // Readers query a pre-inserted tombstone while writers concurrently add
    // new tombstones; reader results must stay correct throughout, and writer
    // effects must be visible after all threads join.
    #[test]
    #[expect(clippy::expect_used, reason = "tests use expect for thread join")]
    fn range_tombstones_concurrent_read_write_writers_observable() {
        let mt = Arc::new(new_memtable(0));
        // Barrier ensures all 6 threads start simultaneously.
        let start = Arc::new(Barrier::new(6));

        let _ = mt.insert_range_tombstone(b"a".to_vec().into(), b"m".to_vec().into(), 10);

        let readers: Vec<_> = (0..4)
            .map(|_| {
                let mt = Arc::clone(&mt);
                let start = Arc::clone(&start);
                std::thread::spawn(move || {
                    start.wait();
                    for _ in 0..500 {
                        let suppressed =
                            mt.is_key_suppressed_by_range_tombstone(b"f", 5, SeqNo::MAX);
                        assert!(
                            suppressed,
                            "key 'f' at seqno=5 must be suppressed by RT [a,m)@10"
                        );
                    }
                })
            })
            .collect();

        let writers: Vec<_> = (0..2)
            .map(|t| {
                let mt = Arc::clone(&mt);
                let start = Arc::clone(&start);
                std::thread::spawn(move || {
                    start.wait();
                    let start_key: UserKey = b"n".to_vec().into();
                    let end_key: UserKey = b"z".to_vec().into();
                    for i in 0u64..100 {
                        // Disjoint seqno ranges per writer (100.. and 1100..).
                        let seqno = 100 + t * 1000 + i;
                        let _ =
                            mt.insert_range_tombstone(start_key.clone(), end_key.clone(), seqno);
                    }
                })
            })
            .collect();

        for h in readers {
            h.join().expect("reader panicked");
        }
        for h in writers {
            h.join().expect("writer panicked");
        }

        // We intentionally do not assert that any reader observed a
        // writer-inserted tombstone mid-loop. `std::sync::RwLock` may be
        // reader-biased, so writers are allowed to be blocked until all
        // readers have finished, which would make such an assertion flaky.
        // Instead, validate post-join visibility: writers insert [n,z) at
        // seqnos starting from 100, so keys in this range must be suppressed.
        assert!(mt.is_key_suppressed_by_range_tombstone(b"n", 50, SeqNo::MAX));
        assert!(mt.is_key_suppressed_by_range_tombstone(b"y", 150, SeqNo::MAX));
    }

    // Populates the interval tree with many single-key-wide tombstones, then
    // hammers it with concurrent read-only queries (suppression, sorted dump,
    // count) to verify consistent non-empty results under contention.
    #[test]
    #[expect(clippy::expect_used, reason = "tests use expect for thread join")]
    fn range_tombstones_populated_tree_concurrent_reads_succeed() {
        let mt = Arc::new(new_memtable(0));

        for i in 0u8..50 {
            let start = vec![b'a' + (i % 25)];
            let end = vec![b'a' + (i % 25) + 1];
            let _ = mt.insert_range_tombstone(start.into(), end.into(), u64::from(i));
        }

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let mt = Arc::clone(&mt);
                std::thread::spawn(move || {
                    for _ in 0..500 {
                        let _ = mt.is_key_suppressed_by_range_tombstone(b"c", 5, SeqNo::MAX);
                        let sorted = mt.range_tombstones_sorted();
                        assert!(!sorted.is_empty());
                        let count = mt.range_tombstone_count();
                        assert!(count > 0);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("reader thread panicked");
        }
    }

    // MVCC point reads: `get(key, seqno)` must return the newest version with
    // a seqno strictly below the read seqno, and must not match mere prefixes.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let memtable = new_memtable(0);

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991",
            0,
            ValueType::Value,
        ));

        // A strict prefix of the stored key must not match.
        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);

        // Second version of the same key at seqno 1.
        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991-2",
            1,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        // Latest read sees the newest version.
        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 1);
        assert_eq!(None, item);

        // Snapshot at seqno=1 only sees versions with seqno < 1, i.e. seqno 0.
        let item = memtable.get(b"hello-key-999991", 1);
        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 2);
        assert_eq!(None, item);

        // Snapshot at seqno=2 sees the version written at seqno 1.
        let item = memtable.get(b"hello-key-999991", 2);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
    }

    // Basic roundtrip: an inserted value is returned by a latest-seqno read.
    #[test]
    fn memtable_get() {
        let memtable = new_memtable(0);

        let value =
            InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);

        memtable.insert(value.clone());

        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
    }

    // With multiple versions of a key, a latest read returns the one with the
    // highest seqno.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = new_memtable(0);

        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            0,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            1,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            2,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            3,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            4,
            ValueType::Value,
        ));

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                4,
                ValueType::Value,
            )),
            memtable.get(b"abc", SeqNo::MAX)
        );
    }

    // A key that is a prefix of another ("abc" vs "abc0") must resolve to its
    // own entry, not its prefix-sibling's.
    #[test]
    fn memtable_get_prefix() {
        let memtable = new_memtable(0);

        memtable.insert(InternalValue::from_components(
            b"abc0".to_vec(),
            b"abc".to_vec(),
            0,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            255,
            ValueType::Value,
        ));

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                255,
                ValueType::Value,
            )),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc0".to_vec(),
                b"abc".to_vec(),
                0,
                ValueType::Value,
            )),
            memtable.get(b"abc0", SeqNo::MAX)
        );
    }

    // Snapshot reads at intermediate seqnos return the correct older version.
    #[test]
    fn memtable_get_old_version() {
        let memtable = new_memtable(0);

        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            0,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            99,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            255,
            ValueType::Value,
        ));

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                255,
                ValueType::Value,
            )),
            memtable.get(b"abc", SeqNo::MAX)
        );

        // Read at seqno=100 sees the version written at 99 (strictly below 100).
        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                99,
                ValueType::Value,
            )),
            memtable.get(b"abc", 100)
        );

        // Read at seqno=50 skips 255 and 99, landing on the version at 0.
        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                0,
                ValueType::Value,
            )),
            memtable.get(b"abc", 50)
        );
    }
}