Skip to main content

lsm_tree/memtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5pub mod arena;
6pub mod interval_tree;
7pub mod skiplist;
8pub mod value_store;
9
10use crate::comparator::SharedComparator;
11use crate::key::InternalKey;
12use crate::range_tombstone::RangeTombstone;
13use crate::{
14    UserKey, ValueType,
15    value::{InternalValue, SeqNo},
16};
17#[cfg(not(feature = "std"))]
18use alloc::vec::Vec;
19use core::ops::RangeBounds;
20use core::sync::atomic::AtomicBool;
21use portable_atomic::AtomicU64;
22// `parking_lot::RwLock` (std: small, userspace fast-path, no poisoning) /
23// `spin::RwLock` (no_std). Neither poisons on a panicked holder, so the read/
24// write guards are taken without a `LockResult` unwrap.
25#[cfg(feature = "std")]
26use parking_lot::RwLock;
27#[cfg(not(feature = "std"))]
28use spin::RwLock;
29
30pub use crate::tree::inner::MemtableId;
31
/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
///
/// When the Memtable exceeds some size, it should be flushed to a table.
pub struct Memtable {
    #[doc(hidden)]
    pub id: MemtableId,

    /// The user key comparator used for ordering entries.
    pub(crate) comparator: SharedComparator,

    /// The actual content, stored in an arena-based skiplist with lock-free traversal.
    ///
    /// Nodes are allocated from a contiguous byte arena for cache locality
    /// and O(1) bulk deallocation when the memtable is dropped.  Traversal of
    /// the skiplist index uses atomic loads and CAS for inserts.
    pub(crate) items: skiplist::SkipMap,

    /// Range tombstones stored in an interval tree.
    ///
    /// Protected by `RwLock` — read-heavy suppression queries (`query_suppression`,
    /// `range_tombstones_sorted`) take a shared read lock, while `insert_range_tombstone`
    /// takes an exclusive write lock. After a rotation has been requested via
    /// `requested_rotation`, the interval tree is treated as read-only by convention,
    /// and only readers are expected to access this field (the `RwLock` is still used
    /// for synchronization, but there should be no further writes).
    ///
    /// The lock is `parking_lot::RwLock` with the `std` feature and `spin::RwLock`
    /// without it; neither poisons, so guards are taken without unwrapping.
    /// Whatever the fairness policy of the selected implementation, writer
    /// starvation is not a concern here: range deletes are rare, the write-side
    /// critical section is O(log n) with n typically small, and the memtable
    /// rotates (becoming read-only) well before contention could accumulate.
    pub(crate) range_tombstones: RwLock<interval_tree::IntervalTree>,

    /// Approximate active memtable size.
    ///
    /// If this grows too large, a flush is triggered.
    pub(crate) approximate_size: AtomicU64,

    /// Highest encountered sequence number.
    ///
    /// This is used so that `get_highest_seqno` has O(1) complexity.
    pub(crate) highest_seqno: AtomicU64,

    /// Rotation flag: set to `true` by [`Self::flag_rotated`] and read by
    /// [`Self::is_flagged_for_rotation`]. Starts out `false`.
    pub(crate) requested_rotation: AtomicBool,

    /// Whether any insert-time per-KV digest (`KvChecksumComputePoint::AtInsert`)
    /// has been stored in this memtable. Set on the first digest-bearing insert
    /// and read once at flush by [`Self::verify_kv_residence`] to skip walking
    /// the nodes entirely when there is nothing to verify (the default `Off` /
    /// `AtBlockCompile` path). The per-node digest carries its own algorithm, so
    /// no memtable-wide algorithm is tracked here.
    has_at_insert_digests: AtomicBool,
}
84
85impl Memtable {
    /// Returns the memtable ID.
    ///
    /// This is a copy of the `id` field the memtable was constructed with.
    pub fn id(&self) -> MemtableId {
        self.id
    }
90
91    /// Returns `true` if the memtable was already flagged for rotation.
92    pub fn is_flagged_for_rotation(&self) -> bool {
93        self.requested_rotation
94            .load(core::sync::atomic::Ordering::Relaxed)
95    }
96
97    /// Flags the memtable as requested for rotation.
98    pub fn flag_rotated(&self) {
99        self.requested_rotation
100            .store(true, core::sync::atomic::Ordering::Relaxed);
101    }
102
103    // `pub` + `#[doc(hidden)]`: used by the host crate (fjall) to construct
104    // ephemeral memtables. Not part of the semver-stable API.
105    // Keep the comparator by-value for hidden-public API compatibility while
106    // still requiring callers to pass the tree comparator explicitly.
107    #[doc(hidden)]
108    #[expect(
109        clippy::needless_pass_by_value,
110        reason = "hidden-public constructor keeps the preexisting by-value signature for compatibility"
111    )]
112    #[must_use]
113    pub fn new(id: MemtableId, comparator: SharedComparator) -> Self {
114        Self {
115            id,
116            items: skiplist::SkipMap::new(comparator.clone()),
117            comparator: comparator.clone(),
118            range_tombstones: RwLock::new(interval_tree::IntervalTree::new_with_comparator(
119                comparator.clone(),
120            )),
121            approximate_size: AtomicU64::default(),
122            highest_seqno: AtomicU64::default(),
123            requested_rotation: AtomicBool::default(),
124            has_at_insert_digests: AtomicBool::default(),
125        }
126    }
127
128    /// Creates an iterator over all items.
129    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
130        self.items.iter().map(|entry| InternalValue {
131            key: entry.key(),
132            value: entry.value(),
133        })
134    }
135
136    /// Creates an iterator over a range of items.
137    ///
138    /// Accepts `InternalKey`-based bounds.
139    pub(crate) fn range_internal<'a, R: RangeBounds<InternalKey> + 'a>(
140        &'a self,
141        range: R,
142    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
143        self.items.range(range).map(|entry| InternalValue {
144            key: entry.key(),
145            value: entry.value(),
146        })
147    }
148
149    /// Returns the item by key if it exists.
150    ///
151    /// Returns the version with the highest seqno that is strictly less than
152    /// the given `seqno`.  Pass [`MAX_SEQNO`](crate::MAX_SEQNO) to retrieve the latest version.
153    #[doc(hidden)]
154    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
155        if seqno == 0 {
156            return None;
157        }
158
159        // NOTE: This range start deserves some explanation...
160        // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
161        // We search for the lowest entry that is greater or equal the user's prefix key
162        // and has the seqno (or lower) we want (because the seqno is stored in reverse order)
163        //
164        // Example: We search for "abc"
165        //
166        // key -> seqno
167        //
168        // a   -> 7
169        // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=MAX
170        // abc -> 4
171        // abc -> 3 <<< If searching for abc and seqno=4, we would get this
172        // abcdef -> 6
173        // abcdef -> 5
174        //
175        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
176
177        let cmp = self.comparator.as_ref();
178
179        let mut iter = self.items.range(lower_bound..).take_while(|entry| {
180            cmp.compare(entry.user_key_bytes(), key) == core::cmp::Ordering::Equal
181        });
182
183        iter.next().map(|entry| InternalValue {
184            key: entry.key(),
185            value: entry.value(),
186        })
187    }
188
189    /// Gets approximate size of memtable in bytes.
190    pub fn size(&self) -> u64 {
191        self.approximate_size
192            .load(core::sync::atomic::Ordering::Acquire)
193    }
194
    /// Counts the number of items in the memtable.
    ///
    /// This is the length reported by the skiplist; range tombstones are kept
    /// in a separate structure and are not included in the count.
    pub fn len(&self) -> usize {
        self.items.len()
    }
199
200    /// Returns `true` if the memtable has no KV items and no range tombstones.
201    #[must_use]
202    pub fn is_empty(&self) -> bool {
203        self.items.is_empty() && self.range_tombstone_count() == 0
204    }
205
206    /// Inserts multiple items into the memtable in bulk.
207    ///
208    /// More efficient than calling [`Memtable::insert`] in a loop because it
209    /// performs a single `fetch_add` for the total size and a single
210    /// `fetch_max` for the highest seqno.
211    ///
212    /// Returns `(total_bytes_added, new_memtable_size)`.
213    #[doc(hidden)]
214    pub fn insert_batch(&self, items: Vec<InternalValue>) -> (u64, u64) {
215        self.insert_batch_with_kv_algo(items, None)
216    }
217
218    /// Bulk insert, optionally computing an insert-time per-KV digest per item
219    /// under `kv_algo` (`KvChecksumComputePoint::AtInsert`).
220    ///
221    /// `kv_algo` is `Some(algo)` (a 4-byte algorithm) to fix each entry's
222    /// digest at insert for the flush-time residence check, or `None` for the
223    /// plain bulk path. Same single-`fetch_add` / single-`fetch_max` accounting
224    /// as [`Self::insert_batch`].
225    #[doc(hidden)]
226    pub fn insert_batch_with_kv_algo(
227        &self,
228        items: Vec<InternalValue>,
229        kv_algo: Option<crate::runtime_config::ChecksumAlgorithm>,
230    ) -> (u64, u64) {
231        if items.is_empty() {
232            let size = self
233                .approximate_size
234                .load(core::sync::atomic::Ordering::Acquire);
235            return (0, size);
236        }
237
238        let mut total_size: u64 = 0;
239        let mut max_seqno: u64 = 0;
240
241        let overhead =
242            core::mem::size_of::<InternalValue>() + core::mem::size_of::<SharedComparator>();
243
244        for item in &items {
245            #[expect(
246                clippy::expect_used,
247                reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
248            )]
249            let item_size: u64 = (item.key.user_key.len() + item.value.len() + overhead)
250                .try_into()
251                .expect("should fit into u64");
252
253            // Running memtable byte total, bounded by the in-memory data size;
254            // a plain add cannot overflow u64.
255            total_size += item_size;
256
257            if item.key.seqno > max_seqno {
258                max_seqno = item.key.seqno;
259            }
260        }
261
262        let size_before = self
263            .approximate_size
264            .fetch_add(total_size, core::sync::atomic::Ordering::AcqRel);
265
266        if kv_algo.is_some() {
267            // Flag that this memtable carries residence digests for the
268            // flush-time verify. The algorithm lives per node.
269            self.has_at_insert_digests
270                .store(true, core::sync::atomic::Ordering::Relaxed);
271        }
272
273        for item in items {
274            let digest = kv_algo.and_then(|algo| {
275                crate::table::block::kv_checksum::kv_digest(&item, algo).map(|d| {
276                    #[expect(
277                        clippy::cast_possible_truncation,
278                        reason = "AtInsert is config-validated to a 4-byte algorithm; the digest fits u32"
279                    )]
280                    let lo = d as u32;
281                    (lo, algo)
282                })
283            });
284            let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
285            self.items.insert_with_kv_digest(&key, &item.value, digest);
286        }
287
288        self.highest_seqno
289            .fetch_max(max_seqno, core::sync::atomic::Ordering::AcqRel);
290
291        // fetch_add returns value BEFORE the add, so size_before + total_size
292        // = value AFTER add = new memtable size. Same pattern as Memtable::insert().
293        (total_size, size_before + total_size)
294    }
295
296    /// Inserts an item into the memtable
297    #[doc(hidden)]
298    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
299        #[expect(
300            clippy::expect_used,
301            reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
302        )]
303        // Account for MemtableKey overhead (InternalKey + Arc<dyn UserComparator>)
304        let item_size = (item.key.user_key.len()
305            + item.value.len()
306            + core::mem::size_of::<InternalValue>()
307            + core::mem::size_of::<SharedComparator>())
308        .try_into()
309        .expect("should fit into u64");
310
311        let size_before = self
312            .approximate_size
313            .fetch_add(item_size, core::sync::atomic::Ordering::AcqRel);
314
315        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
316        self.items.insert(&key, &item.value);
317
318        self.highest_seqno
319            .fetch_max(item.key.seqno, core::sync::atomic::Ordering::AcqRel);
320
321        (item_size, size_before + item_size)
322    }
323
324    /// Inserts an item, optionally carrying a precomputed insert-time per-KV
325    /// digest (`KvChecksumComputePoint::AtInsert`).
326    ///
327    /// `kv_digest` is `Some((digest, algo))` when the caller computed the
328    /// entry's 4-byte logical-content digest at insert (under `AtInsert` with a
329    /// 4-byte algorithm), or `None` for the plain path. When present, the digest
330    /// and its algorithm are stored in the skiplist node (per node, so a later
331    /// config change cannot misverify it) and the memtable flags that it carries
332    /// at least one digest so [`Self::verify_kv_residence`] knows to walk at
333    /// flush. Mixed inserts (some with, some without a digest) are supported for
334    /// the `Off` -> `AtInsert` live toggle.
335    #[doc(hidden)]
336    pub fn insert_with_kv_digest(
337        &self,
338        item: InternalValue,
339        kv_digest: Option<(u32, crate::runtime_config::ChecksumAlgorithm)>,
340    ) -> (u64, u64) {
341        #[expect(
342            clippy::expect_used,
343            reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
344        )]
345        let item_size = (item.key.user_key.len()
346            + item.value.len()
347            + core::mem::size_of::<InternalValue>()
348            + core::mem::size_of::<SharedComparator>())
349        .try_into()
350        .expect("should fit into u64");
351
352        let size_before = self
353            .approximate_size
354            .fetch_add(item_size, core::sync::atomic::Ordering::AcqRel);
355
356        if kv_digest.is_some() {
357            // Flag that this memtable carries at least one residence digest so
358            // the flush-time verify walks the nodes. The algorithm lives per
359            // node, not here.
360            self.has_at_insert_digests
361                .store(true, core::sync::atomic::Ordering::Relaxed);
362        }
363
364        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
365        self.items
366            .insert_with_kv_digest(&key, &item.value, kv_digest);
367
368        self.highest_seqno
369            .fetch_max(item.key.seqno, core::sync::atomic::Ordering::AcqRel);
370
371        (item_size, size_before + item_size)
372    }
373
374    /// Verifies every insert-time per-KV digest in this memtable against a
375    /// recompute over the entry's current bytes (the
376    /// [`KvChecksumComputePoint::AtInsert`](crate::runtime_config::KvChecksumComputePoint::AtInsert)
377    /// residence check), called once at flush.
378    ///
379    /// Returns `Ok` immediately when no `AtInsert` digest was ever inserted, so
380    /// the default path pays nothing.
381    ///
382    /// # Errors
383    ///
384    /// - [`crate::Error::MemtableKvChecksumMismatch`] when an entry's stored
385    ///   digest diverges from the recompute (a RAM bit-flip during residence).
386    /// - [`crate::Error::FeatureUnsupported`] when a node's algorithm is not
387    ///   compiled into this build.
388    pub fn verify_kv_residence(&self) -> crate::Result<()> {
389        if !self
390            .has_at_insert_digests
391            .load(core::sync::atomic::Ordering::Relaxed)
392        {
393            return Ok(());
394        }
395        self.items.verify_kv_digests()
396    }
397
398    /// Inserts a range tombstone covering `[start, end)` at the given seqno.
399    ///
400    /// Returns the approximate size added to the memtable.
401    ///
402    /// Returns 0 if `start >= end` or if either bound exceeds `u16::MAX` bytes.
403    ///
404    /// # Panics
405    ///
406    /// Panics if the internal `RwLock` is poisoned.
407    #[must_use]
408    pub fn insert_range_tombstone(&self, start: UserKey, end: UserKey, seqno: SeqNo) -> u64 {
409        // flag_rotated() (which sets requested_rotation) is called by the host
410        // crate (fjall) before rotation; this crate never sets it directly.
411        // The assert catches misuse by callers
412        // in debug builds — intentionally debug-only because post-rotation writes
413        // are structurally prevented by the host (sealed memtables are behind Arc
414        // with no write path exposed), and an atomic load here would add overhead
415        // on the hot insert path in release builds for no practical benefit.
416        debug_assert!(
417            !self.is_flagged_for_rotation(),
418            "insert_range_tombstone called after memtable was flagged for rotation"
419        );
420
421        // Reject invalid intervals in release builds (debug_assert is not enough)
422        if self.comparator.compare(&start, &end) != core::cmp::Ordering::Less {
423            return 0;
424        }
425
426        // On-disk RT format writes key lengths as u16, enforce at insertion time.
427        // Emit a warning when rejecting an oversized bound so this failure is diagnosable.
428        if u16::try_from(start.len()).is_err() || u16::try_from(end.len()).is_err() {
429            log::warn!(
430                "insert_range_tombstone: rejecting oversized range tombstone \
431                 bounds (start_len = {}, end_len = {}, max = {})",
432                start.len(),
433                end.len(),
434                u16::MAX,
435            );
436            return 0;
437        }
438
439        let size = (start.len() + end.len() + core::mem::size_of::<RangeTombstone>()) as u64;
440
441        self.range_tombstones
442            .write()
443            .insert(RangeTombstone::new(start, end, seqno));
444
445        self.approximate_size
446            .fetch_add(size, core::sync::atomic::Ordering::AcqRel);
447
448        self.highest_seqno
449            .fetch_max(seqno, core::sync::atomic::Ordering::AcqRel);
450
451        size
452    }
453
454    /// Returns `true` if the key at `key_seqno` is suppressed by a range tombstone
455    /// visible at `read_seqno`.
456    pub(crate) fn is_key_suppressed_by_range_tombstone(
457        &self,
458        key: &[u8],
459        key_seqno: SeqNo,
460        read_seqno: SeqNo,
461    ) -> bool {
462        self.range_tombstones
463            .read()
464            .query_suppression(key, key_seqno, read_seqno)
465    }
466
467    /// Returns all range tombstones in sorted order (for flush).
468    pub(crate) fn range_tombstones_sorted(&self) -> Vec<RangeTombstone> {
469        self.range_tombstones.read().iter_sorted()
470    }
471
472    /// Returns the number of range tombstones.
473    #[must_use]
474    pub fn range_tombstone_count(&self) -> usize {
475        self.range_tombstones.read().len()
476    }
477
478    /// Returns the highest sequence number in the memtable.
479    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
480        if self.is_empty() {
481            None
482        } else {
483            Some(
484                self.highest_seqno
485                    .load(core::sync::atomic::Ordering::Acquire),
486            )
487        }
488    }
489}
490
491#[cfg(test)]
492mod tests;