1pub mod arena;
6pub mod interval_tree;
7pub mod skiplist;
8pub mod value_store;
9
10use crate::comparator::SharedComparator;
11use crate::key::InternalKey;
12use crate::range_tombstone::RangeTombstone;
13use crate::{
14 value::{InternalValue, SeqNo},
15 UserKey, ValueType,
16};
17use std::ops::RangeBounds;
18use std::sync::atomic::{AtomicBool, AtomicU64};
19use std::sync::RwLock;
20
21pub use crate::tree::inner::MemtableId;
22
/// An in-memory write buffer for point entries and range tombstones.
///
/// Point writes live in a skip list ordered by the shared comparator;
/// range tombstones are kept separately in an interval tree behind an
/// [`RwLock`]. Size and seqno bookkeeping uses atomics so concurrent
/// readers never block writers on the point-entry path.
pub struct Memtable {
    /// Identifier of this memtable within the tree.
    #[doc(hidden)]
    pub id: MemtableId,

    /// User-key comparator; also handed to the skip list so point entries
    /// and lookups agree on ordering.
    pub(crate) comparator: SharedComparator,

    /// Point entries (all versions), ordered by internal key.
    pub(crate) items: skiplist::SkipMap,

    /// Range tombstones, queried for key suppression during reads.
    // Read-heavy with occasional writes, hence RwLock rather than Mutex.
    pub(crate) range_tombstones: RwLock<interval_tree::IntervalTree>,

    /// Approximate byte size of everything inserted so far
    /// (point entries and range tombstones).
    pub(crate) approximate_size: AtomicU64,

    /// Highest seqno ever written to this memtable (fetch_max on insert).
    pub(crate) highest_seqno: AtomicU64,

    /// One-way flag: set once the memtable has been scheduled for rotation.
    pub(crate) requested_rotation: AtomicBool,
}
72
impl Memtable {
    /// Returns this memtable's ID.
    pub fn id(&self) -> MemtableId {
        self.id
    }

    /// Returns `true` if this memtable has been flagged for rotation.
    // Relaxed is sufficient: this is an advisory flag carrying no data
    // that readers need to observe in a particular order.
    pub fn is_flagged_for_rotation(&self) -> bool {
        self.requested_rotation
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Flags this memtable for rotation.
    ///
    /// The flag is a one-way latch — nothing in this file ever clears it.
    pub fn flag_rotated(&self) {
        self.requested_rotation
            .store(true, std::sync::atomic::Ordering::Relaxed);
    }

    /// Creates an empty memtable with the given ID and comparator.
    #[doc(hidden)]
    #[must_use]
    pub fn new(id: MemtableId, comparator: SharedComparator) -> Self {
        Self {
            id,
            // The skip list shares the comparator so its ordering matches
            // the one used by get()/range_internal().
            items: skiplist::SkipMap::new(comparator.clone()),
            comparator,
            range_tombstones: RwLock::new(interval_tree::IntervalTree::new()),
            approximate_size: AtomicU64::default(),
            highest_seqno: AtomicU64::default(),
            requested_rotation: AtomicBool::default(),
        }
    }

    /// Iterates over all point entries in key order.
    ///
    /// Range tombstones are NOT included; they live in a separate structure.
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
        self.items.iter().map(|entry| InternalValue {
            key: entry.key(),
            value: entry.value(),
        })
    }

    /// Iterates over the point entries within `range` (bounds are internal
    /// keys, so callers control both user-key and seqno boundaries).
    pub(crate) fn range_internal<'a, R: RangeBounds<InternalKey> + 'a>(
        &'a self,
        range: R,
    ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
        self.items.range(range).map(|entry| InternalValue {
            key: entry.key(),
            value: entry.value(),
        })
    }

    /// Point lookup: returns the newest version of `key` visible to a
    /// snapshot at `seqno`, or `None` if no visible version exists.
    ///
    /// NOTE: range tombstones are not consulted here; suppression is
    /// queried separately via `is_key_suppressed_by_range_tombstone`.
    #[doc(hidden)]
    pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
        // A snapshot at seqno 0 cannot see any write.
        if seqno == 0 {
            return None;
        }

        // Seek to (key, seqno - 1). The first entry at or after this bound
        // that still has the same user key is the newest version with
        // seqno < `seqno` (assumes internal keys sort by user key asc,
        // then seqno desc — defined by the comparator/skip list; TODO
        // confirm against the skiplist module).
        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);

        let cmp = self.comparator.as_ref();

        // Stop as soon as we leave this user key's run of versions.
        let mut iter = self.items.range(lower_bound..).take_while(|entry| {
            cmp.compare(entry.user_key_bytes(), key) == std::cmp::Ordering::Equal
        });

        iter.next().map(|entry| InternalValue {
            key: entry.key(),
            value: entry.value(),
        })
    }

    /// Returns the approximate byte size of this memtable.
    pub fn size(&self) -> u64 {
        self.approximate_size
            .load(std::sync::atomic::Ordering::Acquire)
    }

    /// Returns the number of point entries (every version counts once;
    /// range tombstones are not included).
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Returns `true` if the memtable holds neither point entries nor
    /// range tombstones.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.items.is_empty() && self.range_tombstone_count() == 0
    }

    /// Inserts a point entry.
    ///
    /// Returns `(item_size, new_approximate_size)` in bytes.
    #[doc(hidden)]
    pub fn insert(&self, item: InternalValue) -> (u64, u64) {
        #[expect(
            clippy::expect_used,
            reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
        )]
        // Approximate footprint: key + value payload plus fixed per-entry
        // overhead terms.
        let item_size = (item.key.user_key.len()
            + item.value.len()
            + std::mem::size_of::<InternalValue>()
            + std::mem::size_of::<SharedComparator>())
        .try_into()
        .expect("should fit into u64");

        let size_before = self
            .approximate_size
            .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);

        let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
        self.items.insert(&key, &item.value);

        // Track the highest seqno ever written, see get_highest_seqno().
        self.highest_seqno
            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);

        (item_size, size_before + item_size)
    }

    /// Inserts a range tombstone covering `[start, end)` at `seqno`.
    ///
    /// Returns the number of bytes added to the approximate size, or 0 if
    /// the tombstone was rejected (empty/inverted range, or bounds longer
    /// than `u16::MAX`).
    #[must_use]
    pub fn insert_range_tombstone(&self, start: UserKey, end: UserKey, seqno: SeqNo) -> u64 {
        debug_assert!(
            !self.is_flagged_for_rotation(),
            "insert_range_tombstone called after memtable was flagged for rotation"
        );

        // Reject empty or inverted ranges.
        if start >= end {
            return 0;
        }

        // Reject bounds that would not fit a 16-bit length encoding.
        if u16::try_from(start.len()).is_err() || u16::try_from(end.len()).is_err() {
            log::warn!(
                "insert_range_tombstone: rejecting oversized range tombstone \
                bounds (start_len = {}, end_len = {}, max = {})",
                start.len(),
                end.len(),
                u16::MAX,
            );
            return 0;
        }

        // Size contribution: both bounds plus fixed tombstone overhead.
        let size = (start.len() + end.len() + std::mem::size_of::<RangeTombstone>()) as u64;

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.range_tombstones
            .write()
            .expect("lock is poisoned")
            .insert(RangeTombstone::new(start, end, seqno));

        self.approximate_size
            .fetch_add(size, std::sync::atomic::Ordering::AcqRel);

        self.highest_seqno
            .fetch_max(seqno, std::sync::atomic::Ordering::AcqRel);

        size
    }

    /// Returns `true` if a range tombstone suppresses the entry
    /// `(key, key_seqno)` for a reader at snapshot `read_seqno`.
    pub(crate) fn is_key_suppressed_by_range_tombstone(
        &self,
        key: &[u8],
        key_seqno: SeqNo,
        read_seqno: SeqNo,
    ) -> bool {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.range_tombstones
            .read()
            .expect("lock is poisoned")
            .query_suppression(key, key_seqno, read_seqno)
    }

    /// Returns all range tombstones, sorted (ordering defined by the
    /// interval tree's `iter_sorted`).
    pub(crate) fn range_tombstones_sorted(&self) -> Vec<RangeTombstone> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.range_tombstones
            .read()
            .expect("lock is poisoned")
            .iter_sorted()
    }

    /// Returns the number of range tombstones in this memtable.
    #[must_use]
    pub fn range_tombstone_count(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.range_tombstones
            .read()
            .expect("lock is poisoned")
            .len()
    }

    /// Returns the highest seqno written to this memtable, or `None` if the
    /// memtable is empty (no point entries and no range tombstones).
    pub fn get_highest_seqno(&self) -> Option<SeqNo> {
        if self.is_empty() {
            None
        } else {
            Some(
                self.highest_seqno
                    .load(std::sync::atomic::Ordering::Acquire),
            )
        }
    }
}
335
#[cfg(test)]
mod tests {
    use super::*;
    use crate::comparator::default_comparator;
    use crate::ValueType;
    use std::sync::{Arc, Barrier};
    use test_log::test;

    /// Convenience constructor using the default comparator.
    fn new_memtable(id: MemtableId) -> Memtable {
        Memtable::new(id, default_comparator())
    }

    /// A second read lock on the range-tombstone tree must succeed while a
    /// first read lock is held (RwLock shared-read semantics).
    #[test]
    #[expect(
        clippy::expect_used,
        reason = "tests use expect for lock and thread join"
    )]
    fn rwlock_read_while_read_held_succeeds() {
        let mt = new_memtable(0);
        let _ = mt.insert_range_tombstone(b"a".to_vec().into(), b"z".to_vec().into(), 10);

        // held: spawned thread signals once it holds the read guard.
        // release: main thread signals the spawned thread to drop it.
        let (held_tx, held_rx) = std::sync::mpsc::channel::<()>();
        let (release_tx, release_rx) = std::sync::mpsc::channel::<()>();
        let rt_ref = &mt.range_tombstones;
        std::thread::scope(|s| {
            s.spawn(move || {
                let _guard = rt_ref.read().expect("lock is poisoned");
                // Announce the guard is held, then park until released.
                let _ = held_tx.send(());
                let _ = release_rx.recv();
            });

            held_rx
                .recv()
                .expect("spawned thread panicked before acquiring guard");
            // While the spawned thread still holds its read guard, a second
            // reader must be able to acquire the lock without blocking.
            let guard2 = mt.range_tombstones.try_read();
            assert!(
                guard2.is_ok(),
                "second read lock must succeed while first is held"
            );
            drop(guard2);
            // Dropping the sender makes the spawned thread's recv() return,
            // letting the scope join it.
            drop(release_tx);
        });
    }

    /// Many threads querying suppression and tombstone counts concurrently
    /// must not panic or poison the lock.
    #[test]
    #[expect(clippy::expect_used, reason = "tests use expect for thread join")]
    fn suppression_queries_concurrent_readers_no_panic() {
        let mt = Arc::new(new_memtable(0));

        let _ = mt.insert_range_tombstone(b"a".to_vec().into(), b"z".to_vec().into(), 10);
        for i in 0u8..100 {
            let key = vec![b'a' + (i % 25)];
            mt.insert(InternalValue::from_components(
                key,
                b"v".to_vec(),
                u64::from(i),
                ValueType::Value,
            ));
        }

        let handles: Vec<_> = (0..8)
            .map(|t| {
                let mt = Arc::clone(&mt);
                std::thread::spawn(move || {
                    for i in 0u8..200 {
                        let key = vec![b'a' + ((t + i) % 25)];
                        let _ = mt.is_key_suppressed_by_range_tombstone(&key, 5, SeqNo::MAX);
                        let _ = mt.range_tombstone_count();
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("reader thread panicked");
        }
    }

    /// Readers querying a pre-existing tombstone while writers insert new,
    /// non-overlapping tombstones: readers must keep observing the original
    /// suppression, and writers' tombstones must be visible afterwards.
    #[test]
    #[expect(clippy::expect_used, reason = "tests use expect for thread join")]
    fn range_tombstones_concurrent_read_write_writers_observable() {
        let mt = Arc::new(new_memtable(0));
        // 4 readers + 2 writers rendezvous here before the contention phase.
        let start = Arc::new(Barrier::new(6));

        let _ = mt.insert_range_tombstone(b"a".to_vec().into(), b"m".to_vec().into(), 10);

        let readers: Vec<_> = (0..4)
            .map(|_| {
                let mt = Arc::clone(&mt);
                let start = Arc::clone(&start);
                std::thread::spawn(move || {
                    start.wait();
                    for _ in 0..500 {
                        let suppressed =
                            mt.is_key_suppressed_by_range_tombstone(b"f", 5, SeqNo::MAX);
                        assert!(
                            suppressed,
                            "key 'f' at seqno=5 must be suppressed by RT [a,m)@10"
                        );
                    }
                })
            })
            .collect();

        let writers: Vec<_> = (0..2)
            .map(|t| {
                let mt = Arc::clone(&mt);
                let start = Arc::clone(&start);
                std::thread::spawn(move || {
                    start.wait();
                    // Writers only touch [n, z), disjoint from the readers'
                    // [a, m) tombstone, with per-writer unique seqnos.
                    let start_key: UserKey = b"n".to_vec().into();
                    let end_key: UserKey = b"z".to_vec().into();
                    for i in 0u64..100 {
                        let seqno = 100 + t * 1000 + i;
                        let _ =
                            mt.insert_range_tombstone(start_key.clone(), end_key.clone(), seqno);
                    }
                })
            })
            .collect();

        for h in readers {
            h.join().expect("reader panicked");
        }
        for h in writers {
            h.join().expect("writer panicked");
        }

        // Tombstones written under contention must now suppress keys in [n, z).
        assert!(mt.is_key_suppressed_by_range_tombstone(b"n", 50, SeqNo::MAX));
        assert!(mt.is_key_suppressed_by_range_tombstone(b"y", 150, SeqNo::MAX));
    }

    /// Concurrent read-only operations on a well-populated interval tree
    /// (queries, sorted dump, count) must all succeed.
    #[test]
    #[expect(clippy::expect_used, reason = "tests use expect for thread join")]
    fn range_tombstones_populated_tree_concurrent_reads_succeed() {
        let mt = Arc::new(new_memtable(0));

        // 50 single-letter-wide tombstones cycling through ['a'..'z').
        for i in 0u8..50 {
            let start = vec![b'a' + (i % 25)];
            let end = vec![b'a' + (i % 25) + 1];
            let _ = mt.insert_range_tombstone(start.into(), end.into(), u64::from(i));
        }

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let mt = Arc::clone(&mt);
                std::thread::spawn(move || {
                    for _ in 0..500 {
                        let _ = mt.is_key_suppressed_by_range_tombstone(b"c", 5, SeqNo::MAX);
                        let sorted = mt.range_tombstones_sorted();
                        assert!(!sorted.is_empty());
                        let count = mt.range_tombstone_count();
                        assert!(count > 0);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("reader thread panicked");
        }
    }

    /// MVCC point reads: a lookup at snapshot seqno S sees the newest
    /// version with seqno < S, and prefixes of a key never match.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let memtable = new_memtable(0);

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991",
            0,
            ValueType::Value,
        ));

        // Prefix of the stored key must not match.
        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);

        // Newer version of the same key at seqno 1.
        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991-2",
            1,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        // Latest snapshot sees the newest version.
        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 1);
        assert_eq!(None, item);

        // Snapshot at seqno 1 only sees the version written at seqno 0.
        let item = memtable.get(b"hello-key-999991", 1);
        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 2);
        assert_eq!(None, item);

        // Snapshot at seqno 2 sees the version written at seqno 1.
        let item = memtable.get(b"hello-key-999991", 2);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
    }

    /// Basic roundtrip: an inserted value is returned by get().
    #[test]
    fn memtable_get() {
        let memtable = new_memtable(0);

        let value =
            InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);

        memtable.insert(value.clone());

        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
    }

    /// With multiple versions of a key, a latest-snapshot read returns the
    /// version with the highest seqno.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = new_memtable(0);

        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            0,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            1,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            2,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            3,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            4,
            ValueType::Value,
        ));

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                4,
                ValueType::Value,
            )),
            memtable.get(b"abc", SeqNo::MAX)
        );
    }

    /// A key that is a prefix of another key ("abc" vs "abc0") must resolve
    /// to its own versions, never its extension's.
    #[test]
    fn memtable_get_prefix() {
        let memtable = new_memtable(0);

        memtable.insert(InternalValue::from_components(
            b"abc0".to_vec(),
            b"abc".to_vec(),
            0,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            255,
            ValueType::Value,
        ));

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                255,
                ValueType::Value,
            )),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc0".to_vec(),
                b"abc".to_vec(),
                0,
                ValueType::Value,
            )),
            memtable.get(b"abc0", SeqNo::MAX)
        );
    }

    /// Snapshot reads below the newest version return the correct older
    /// version (seqnos 0, 99, 255 written; snapshots at 100 and 50 probed).
    #[test]
    fn memtable_get_old_version() {
        let memtable = new_memtable(0);

        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            0,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            99,
            ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".to_vec(),
            b"abc".to_vec(),
            255,
            ValueType::Value,
        ));

        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                255,
                ValueType::Value,
            )),
            memtable.get(b"abc", SeqNo::MAX)
        );

        // Snapshot at 100 sees the version at 99 (newest with seqno < 100).
        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                99,
                ValueType::Value,
            )),
            memtable.get(b"abc", 100)
        );

        // Snapshot at 50 skips 99/255 and sees the version at 0.
        assert_eq!(
            Some(InternalValue::from_components(
                b"abc".to_vec(),
                b"abc".to_vec(),
                0,
                ValueType::Value,
            )),
            memtable.get(b"abc", 50)
        );
    }
}