lsm_tree/memtable/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5pub mod arena;
6pub mod interval_tree;
7pub mod skiplist;
8pub mod value_store;
9
10use crate::comparator::SharedComparator;
11use crate::key::InternalKey;
12use crate::range_tombstone::RangeTombstone;
13use crate::{
14 UserKey, ValueType,
15 value::{InternalValue, SeqNo},
16};
17#[cfg(not(feature = "std"))]
18use alloc::vec::Vec;
19use core::ops::RangeBounds;
20use core::sync::atomic::AtomicBool;
21use portable_atomic::AtomicU64;
22// `parking_lot::RwLock` (std: small, userspace fast-path, no poisoning) /
23// `spin::RwLock` (no_std). Neither poisons on a panicked holder, so the read/
24// write guards are taken without a `LockResult` unwrap.
25#[cfg(feature = "std")]
26use parking_lot::RwLock;
27#[cfg(not(feature = "std"))]
28use spin::RwLock;
29
30pub use crate::tree::inner::MemtableId;
31
/// The memtable serves as an intermediary, ephemeral, sorted storage for new items.
///
/// When the Memtable exceeds some size, it should be flushed to a table.
pub struct Memtable {
    /// Identifier of this memtable, also returned by [`Memtable::id`].
    #[doc(hidden)]
    pub id: MemtableId,

    /// The user key comparator used for ordering entries.
    pub(crate) comparator: SharedComparator,

    /// The actual content, stored in an arena-based skiplist with lock-free traversal.
    ///
    /// Nodes are allocated from a contiguous byte arena for cache locality
    /// and O(1) bulk deallocation when the memtable is dropped. Traversal of
    /// the skiplist index uses atomic loads and CAS for inserts.
    pub(crate) items: skiplist::SkipMap,

    /// Range tombstones stored in an interval tree.
    ///
    /// Protected by `RwLock` — read-heavy suppression queries (`query_suppression`,
    /// `range_tombstones_sorted`) take a shared read lock, while `insert_range_tombstone`
    /// takes an exclusive write lock. After a rotation has been requested via
    /// `requested_rotation`, the interval tree is treated as read-only by convention,
    /// and only readers are expected to access this field (the `RwLock` is still used
    /// for synchronization, but there should be no further writes).
    ///
    /// The lock is `parking_lot::RwLock` with the `std` feature and `spin::RwLock`
    /// without it. Whatever the fairness policy of the selected lock, writer
    /// starvation is not a concern here: range deletes are rare, the write-side
    /// critical section is O(log n) with n typically small, and the memtable
    /// rotates (becoming read-only) well before contention could accumulate.
    pub(crate) range_tombstones: RwLock<interval_tree::IntervalTree>,

    /// Approximate active memtable size.
    ///
    /// If this grows too large, a flush is triggered.
    pub(crate) approximate_size: AtomicU64,

    /// Highest encountered sequence number.
    ///
    /// This is used so that `get_highest_seqno` has O(1) complexity.
    pub(crate) highest_seqno: AtomicU64,

    /// Whether rotation has been requested for this memtable.
    ///
    /// Set by [`Memtable::flag_rotated`] and read by
    /// [`Memtable::is_flagged_for_rotation`].
    pub(crate) requested_rotation: AtomicBool,

    /// Whether any insert-time per-KV digest (`KvChecksumComputePoint::AtInsert`)
    /// has been stored in this memtable. Set on the first digest-bearing insert
    /// and read once at flush by [`Self::verify_kv_residence`] to skip walking
    /// the nodes entirely when there is nothing to verify (the default `Off` /
    /// `AtBlockCompile` path). The per-node digest carries its own algorithm, so
    /// no memtable-wide algorithm is tracked here.
    has_at_insert_digests: AtomicBool,
}
84
85impl Memtable {
86 /// Returns the memtable ID.
87 pub fn id(&self) -> MemtableId {
88 self.id
89 }
90
91 /// Returns `true` if the memtable was already flagged for rotation.
92 pub fn is_flagged_for_rotation(&self) -> bool {
93 self.requested_rotation
94 .load(core::sync::atomic::Ordering::Relaxed)
95 }
96
97 /// Flags the memtable as requested for rotation.
98 pub fn flag_rotated(&self) {
99 self.requested_rotation
100 .store(true, core::sync::atomic::Ordering::Relaxed);
101 }
102
103 // `pub` + `#[doc(hidden)]`: used by the host crate (fjall) to construct
104 // ephemeral memtables. Not part of the semver-stable API.
105 // Keep the comparator by-value for hidden-public API compatibility while
106 // still requiring callers to pass the tree comparator explicitly.
107 #[doc(hidden)]
108 #[expect(
109 clippy::needless_pass_by_value,
110 reason = "hidden-public constructor keeps the preexisting by-value signature for compatibility"
111 )]
112 #[must_use]
113 pub fn new(id: MemtableId, comparator: SharedComparator) -> Self {
114 Self {
115 id,
116 items: skiplist::SkipMap::new(comparator.clone()),
117 comparator: comparator.clone(),
118 range_tombstones: RwLock::new(interval_tree::IntervalTree::new_with_comparator(
119 comparator.clone(),
120 )),
121 approximate_size: AtomicU64::default(),
122 highest_seqno: AtomicU64::default(),
123 requested_rotation: AtomicBool::default(),
124 has_at_insert_digests: AtomicBool::default(),
125 }
126 }
127
128 /// Creates an iterator over all items.
129 pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
130 self.items.iter().map(|entry| InternalValue {
131 key: entry.key(),
132 value: entry.value(),
133 })
134 }
135
136 /// Creates an iterator over a range of items.
137 ///
138 /// Accepts `InternalKey`-based bounds.
139 pub(crate) fn range_internal<'a, R: RangeBounds<InternalKey> + 'a>(
140 &'a self,
141 range: R,
142 ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
143 self.items.range(range).map(|entry| InternalValue {
144 key: entry.key(),
145 value: entry.value(),
146 })
147 }
148
149 /// Returns the item by key if it exists.
150 ///
151 /// Returns the version with the highest seqno that is strictly less than
152 /// the given `seqno`. Pass [`MAX_SEQNO`](crate::MAX_SEQNO) to retrieve the latest version.
153 #[doc(hidden)]
154 pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
155 if seqno == 0 {
156 return None;
157 }
158
159 // NOTE: This range start deserves some explanation...
160 // InternalKeys are multi-sorted by 2 categories: user_key and Reverse(seqno). (tombstone doesn't really matter)
161 // We search for the lowest entry that is greater or equal the user's prefix key
162 // and has the seqno (or lower) we want (because the seqno is stored in reverse order)
163 //
164 // Example: We search for "abc"
165 //
166 // key -> seqno
167 //
168 // a -> 7
169 // abc -> 5 <<< This is the lowest key (highest seqno) that matches the key with seqno=MAX
170 // abc -> 4
171 // abc -> 3 <<< If searching for abc and seqno=4, we would get this
172 // abcdef -> 6
173 // abcdef -> 5
174 //
175 let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
176
177 let cmp = self.comparator.as_ref();
178
179 let mut iter = self.items.range(lower_bound..).take_while(|entry| {
180 cmp.compare(entry.user_key_bytes(), key) == core::cmp::Ordering::Equal
181 });
182
183 iter.next().map(|entry| InternalValue {
184 key: entry.key(),
185 value: entry.value(),
186 })
187 }
188
189 /// Gets approximate size of memtable in bytes.
190 pub fn size(&self) -> u64 {
191 self.approximate_size
192 .load(core::sync::atomic::Ordering::Acquire)
193 }
194
195 /// Counts the number of items in the memtable.
196 pub fn len(&self) -> usize {
197 self.items.len()
198 }
199
200 /// Returns `true` if the memtable has no KV items and no range tombstones.
201 #[must_use]
202 pub fn is_empty(&self) -> bool {
203 self.items.is_empty() && self.range_tombstone_count() == 0
204 }
205
206 /// Inserts multiple items into the memtable in bulk.
207 ///
208 /// More efficient than calling [`Memtable::insert`] in a loop because it
209 /// performs a single `fetch_add` for the total size and a single
210 /// `fetch_max` for the highest seqno.
211 ///
212 /// Returns `(total_bytes_added, new_memtable_size)`.
213 #[doc(hidden)]
214 pub fn insert_batch(&self, items: Vec<InternalValue>) -> (u64, u64) {
215 self.insert_batch_with_kv_algo(items, None)
216 }
217
218 /// Bulk insert, optionally computing an insert-time per-KV digest per item
219 /// under `kv_algo` (`KvChecksumComputePoint::AtInsert`).
220 ///
221 /// `kv_algo` is `Some(algo)` (a 4-byte algorithm) to fix each entry's
222 /// digest at insert for the flush-time residence check, or `None` for the
223 /// plain bulk path. Same single-`fetch_add` / single-`fetch_max` accounting
224 /// as [`Self::insert_batch`].
225 #[doc(hidden)]
226 pub fn insert_batch_with_kv_algo(
227 &self,
228 items: Vec<InternalValue>,
229 kv_algo: Option<crate::runtime_config::ChecksumAlgorithm>,
230 ) -> (u64, u64) {
231 if items.is_empty() {
232 let size = self
233 .approximate_size
234 .load(core::sync::atomic::Ordering::Acquire);
235 return (0, size);
236 }
237
238 let mut total_size: u64 = 0;
239 let mut max_seqno: u64 = 0;
240
241 let overhead =
242 core::mem::size_of::<InternalValue>() + core::mem::size_of::<SharedComparator>();
243
244 for item in &items {
245 #[expect(
246 clippy::expect_used,
247 reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
248 )]
249 let item_size: u64 = (item.key.user_key.len() + item.value.len() + overhead)
250 .try_into()
251 .expect("should fit into u64");
252
253 // Running memtable byte total, bounded by the in-memory data size;
254 // a plain add cannot overflow u64.
255 total_size += item_size;
256
257 if item.key.seqno > max_seqno {
258 max_seqno = item.key.seqno;
259 }
260 }
261
262 let size_before = self
263 .approximate_size
264 .fetch_add(total_size, core::sync::atomic::Ordering::AcqRel);
265
266 if kv_algo.is_some() {
267 // Flag that this memtable carries residence digests for the
268 // flush-time verify. The algorithm lives per node.
269 self.has_at_insert_digests
270 .store(true, core::sync::atomic::Ordering::Relaxed);
271 }
272
273 for item in items {
274 let digest = kv_algo.and_then(|algo| {
275 crate::table::block::kv_checksum::kv_digest(&item, algo).map(|d| {
276 #[expect(
277 clippy::cast_possible_truncation,
278 reason = "AtInsert is config-validated to a 4-byte algorithm; the digest fits u32"
279 )]
280 let lo = d as u32;
281 (lo, algo)
282 })
283 });
284 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
285 self.items.insert_with_kv_digest(&key, &item.value, digest);
286 }
287
288 self.highest_seqno
289 .fetch_max(max_seqno, core::sync::atomic::Ordering::AcqRel);
290
291 // fetch_add returns value BEFORE the add, so size_before + total_size
292 // = value AFTER add = new memtable size. Same pattern as Memtable::insert().
293 (total_size, size_before + total_size)
294 }
295
296 /// Inserts an item into the memtable
297 #[doc(hidden)]
298 pub fn insert(&self, item: InternalValue) -> (u64, u64) {
299 #[expect(
300 clippy::expect_used,
301 reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
302 )]
303 // Account for MemtableKey overhead (InternalKey + Arc<dyn UserComparator>)
304 let item_size = (item.key.user_key.len()
305 + item.value.len()
306 + core::mem::size_of::<InternalValue>()
307 + core::mem::size_of::<SharedComparator>())
308 .try_into()
309 .expect("should fit into u64");
310
311 let size_before = self
312 .approximate_size
313 .fetch_add(item_size, core::sync::atomic::Ordering::AcqRel);
314
315 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
316 self.items.insert(&key, &item.value);
317
318 self.highest_seqno
319 .fetch_max(item.key.seqno, core::sync::atomic::Ordering::AcqRel);
320
321 (item_size, size_before + item_size)
322 }
323
324 /// Inserts an item, optionally carrying a precomputed insert-time per-KV
325 /// digest (`KvChecksumComputePoint::AtInsert`).
326 ///
327 /// `kv_digest` is `Some((digest, algo))` when the caller computed the
328 /// entry's 4-byte logical-content digest at insert (under `AtInsert` with a
329 /// 4-byte algorithm), or `None` for the plain path. When present, the digest
330 /// and its algorithm are stored in the skiplist node (per node, so a later
331 /// config change cannot misverify it) and the memtable flags that it carries
332 /// at least one digest so [`Self::verify_kv_residence`] knows to walk at
333 /// flush. Mixed inserts (some with, some without a digest) are supported for
334 /// the `Off` -> `AtInsert` live toggle.
335 #[doc(hidden)]
336 pub fn insert_with_kv_digest(
337 &self,
338 item: InternalValue,
339 kv_digest: Option<(u32, crate::runtime_config::ChecksumAlgorithm)>,
340 ) -> (u64, u64) {
341 #[expect(
342 clippy::expect_used,
343 reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
344 )]
345 let item_size = (item.key.user_key.len()
346 + item.value.len()
347 + core::mem::size_of::<InternalValue>()
348 + core::mem::size_of::<SharedComparator>())
349 .try_into()
350 .expect("should fit into u64");
351
352 let size_before = self
353 .approximate_size
354 .fetch_add(item_size, core::sync::atomic::Ordering::AcqRel);
355
356 if kv_digest.is_some() {
357 // Flag that this memtable carries at least one residence digest so
358 // the flush-time verify walks the nodes. The algorithm lives per
359 // node, not here.
360 self.has_at_insert_digests
361 .store(true, core::sync::atomic::Ordering::Relaxed);
362 }
363
364 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
365 self.items
366 .insert_with_kv_digest(&key, &item.value, kv_digest);
367
368 self.highest_seqno
369 .fetch_max(item.key.seqno, core::sync::atomic::Ordering::AcqRel);
370
371 (item_size, size_before + item_size)
372 }
373
374 /// Verifies every insert-time per-KV digest in this memtable against a
375 /// recompute over the entry's current bytes (the
376 /// [`KvChecksumComputePoint::AtInsert`](crate::runtime_config::KvChecksumComputePoint::AtInsert)
377 /// residence check), called once at flush.
378 ///
379 /// Returns `Ok` immediately when no `AtInsert` digest was ever inserted, so
380 /// the default path pays nothing.
381 ///
382 /// # Errors
383 ///
384 /// - [`crate::Error::MemtableKvChecksumMismatch`] when an entry's stored
385 /// digest diverges from the recompute (a RAM bit-flip during residence).
386 /// - [`crate::Error::FeatureUnsupported`] when a node's algorithm is not
387 /// compiled into this build.
388 pub fn verify_kv_residence(&self) -> crate::Result<()> {
389 if !self
390 .has_at_insert_digests
391 .load(core::sync::atomic::Ordering::Relaxed)
392 {
393 return Ok(());
394 }
395 self.items.verify_kv_digests()
396 }
397
398 /// Inserts a range tombstone covering `[start, end)` at the given seqno.
399 ///
400 /// Returns the approximate size added to the memtable.
401 ///
402 /// Returns 0 if `start >= end` or if either bound exceeds `u16::MAX` bytes.
403 ///
404 /// # Panics
405 ///
406 /// Panics if the internal `RwLock` is poisoned.
407 #[must_use]
408 pub fn insert_range_tombstone(&self, start: UserKey, end: UserKey, seqno: SeqNo) -> u64 {
409 // flag_rotated() (which sets requested_rotation) is called by the host
410 // crate (fjall) before rotation; this crate never sets it directly.
411 // The assert catches misuse by callers
412 // in debug builds — intentionally debug-only because post-rotation writes
413 // are structurally prevented by the host (sealed memtables are behind Arc
414 // with no write path exposed), and an atomic load here would add overhead
415 // on the hot insert path in release builds for no practical benefit.
416 debug_assert!(
417 !self.is_flagged_for_rotation(),
418 "insert_range_tombstone called after memtable was flagged for rotation"
419 );
420
421 // Reject invalid intervals in release builds (debug_assert is not enough)
422 if self.comparator.compare(&start, &end) != core::cmp::Ordering::Less {
423 return 0;
424 }
425
426 // On-disk RT format writes key lengths as u16, enforce at insertion time.
427 // Emit a warning when rejecting an oversized bound so this failure is diagnosable.
428 if u16::try_from(start.len()).is_err() || u16::try_from(end.len()).is_err() {
429 log::warn!(
430 "insert_range_tombstone: rejecting oversized range tombstone \
431 bounds (start_len = {}, end_len = {}, max = {})",
432 start.len(),
433 end.len(),
434 u16::MAX,
435 );
436 return 0;
437 }
438
439 let size = (start.len() + end.len() + core::mem::size_of::<RangeTombstone>()) as u64;
440
441 self.range_tombstones
442 .write()
443 .insert(RangeTombstone::new(start, end, seqno));
444
445 self.approximate_size
446 .fetch_add(size, core::sync::atomic::Ordering::AcqRel);
447
448 self.highest_seqno
449 .fetch_max(seqno, core::sync::atomic::Ordering::AcqRel);
450
451 size
452 }
453
454 /// Returns `true` if the key at `key_seqno` is suppressed by a range tombstone
455 /// visible at `read_seqno`.
456 pub(crate) fn is_key_suppressed_by_range_tombstone(
457 &self,
458 key: &[u8],
459 key_seqno: SeqNo,
460 read_seqno: SeqNo,
461 ) -> bool {
462 self.range_tombstones
463 .read()
464 .query_suppression(key, key_seqno, read_seqno)
465 }
466
467 /// Returns all range tombstones in sorted order (for flush).
468 pub(crate) fn range_tombstones_sorted(&self) -> Vec<RangeTombstone> {
469 self.range_tombstones.read().iter_sorted()
470 }
471
472 /// Returns the number of range tombstones.
473 #[must_use]
474 pub fn range_tombstone_count(&self) -> usize {
475 self.range_tombstones.read().len()
476 }
477
478 /// Returns the highest sequence number in the memtable.
479 pub fn get_highest_seqno(&self) -> Option<SeqNo> {
480 if self.is_empty() {
481 None
482 } else {
483 Some(
484 self.highest_seqno
485 .load(core::sync::atomic::Ordering::Acquire),
486 )
487 }
488 }
489}
490
491#[cfg(test)]
492mod tests;