use std::borrow::BorrowMut;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;

use fail::fail_point;
use hashbrown::HashMap;
use log::{error, warn};
use parking_lot::{Mutex, RwLock};

use crate::config::Config;
use crate::file_pipe_log::ReplayMachine;
use crate::log_batch::{
    AtomicGroupStatus, Command, CompressionType, KeyValue, LogBatch, LogItem, LogItemBatch,
    LogItemContent, OpType,
};
use crate::metrics::MEMORY_USAGE;
use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue};
use crate::util::{hash_u64, Factory};
use crate::{Error, GlobalStats, Result};

#[cfg(feature = "swap")]
mod swap_conditional_imports {
    use crate::swappy_allocator::SwappyAllocator;
    use std::convert::TryFrom;
    use std::path::Path;

    pub trait AllocatorTrait: std::alloc::Allocator + Clone + Send + Sync {}
    impl<T: std::alloc::Allocator + Clone + Send + Sync> AllocatorTrait for T {}

    pub type VacantAllocator = std::alloc::Global;
    pub type SelectedAllocator = SwappyAllocator<std::alloc::Global>;

    pub fn new_vacant_allocator() -> VacantAllocator {
        std::alloc::Global
    }
    pub fn new_allocator(cfg: &crate::Config) -> SelectedAllocator {
        let memory_limit =
            usize::try_from(cfg.memory_limit.map_or(u64::MAX, |l| l.0)).unwrap_or(usize::MAX);
        let path = Path::new(&cfg.dir).join("swap");
        SwappyAllocator::new(&path, memory_limit)
    }
}

#[cfg(not(feature = "swap"))]
mod swap_conditional_imports {
    pub trait AllocatorTrait: Clone + Send + Sync {}

    #[derive(Clone)]
    pub struct DummyAllocator;
    impl AllocatorTrait for DummyAllocator {}

    pub type VacantAllocator = DummyAllocator;
    pub type SelectedAllocator = DummyAllocator;

    pub fn new_vacant_allocator() -> VacantAllocator {
        DummyAllocator
    }
    pub fn new_allocator(_: &crate::Config) -> SelectedAllocator {
        DummyAllocator
    }
}

use swap_conditional_imports::*;

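// Tuning constants: the entry container starts at `CAPACITY_INIT` and is
// shrunk to fit after a compaction once its capacity reaches
// `CAPACITY_SHRINK_THRESHOLD`. `MEMTABLE_SLOT_COUNT` is the number of shards
// in `MemTableAccessor` and must be a power of two.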
const CAPACITY_SHRINK_THRESHOLD: usize = 1024 - 1;
const CAPACITY_INIT: usize = 32 - 1;
const MEMTABLE_SLOT_COUNT: usize = 128;

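/// Location of a log entry in the log files.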
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct EntryIndex {
    pub index: u64,

    pub entries: Option<FileBlockHandle>,
    pub compression_type: CompressionType,

    pub entry_offset: u32,
    pub entry_len: u32,
}

impl Default for EntryIndex {
    fn default() -> EntryIndex {
        EntryIndex {
            index: 0,
            entries: None,
            compression_type: CompressionType::None,
            entry_offset: 0,
            entry_len: 0,
        }
    }
}

impl EntryIndex {
    fn from_thin(index: u64, e: ThinEntryIndex) -> Self {
        Self {
            index,
            entries: e.entries,
            compression_type: e.compression_type,
            entry_offset: e.entry_offset,
            entry_len: e.entry_len,
        }
    }
}

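/// [`EntryIndex`] without the logical index, which is implied by the entry's
/// position within its `MemTable`.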
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ThinEntryIndex {
    entries: Option<FileBlockHandle>,
    compression_type: CompressionType,
    entry_offset: u32,
    entry_len: u32,
}

impl From<&EntryIndex> for ThinEntryIndex {
    fn from(e: &EntryIndex) -> Self {
        Self {
            entries: e.entries,
            compression_type: e.compression_type,
            entry_offset: e.entry_offset,
            entry_len: e.entry_len,
        }
    }
}

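/// In-memory metadata of the log entries and key-values of a single Raft
/// group.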
pub struct MemTable<A: AllocatorTrait> {
    region_id: u64,

    // Entry indexes of this region, contiguous and ordered by log index.
    #[cfg(feature = "swap")]
    entry_indexes: VecDeque<ThinEntryIndex, A>,
    #[cfg(not(feature = "swap"))]
    entry_indexes: VecDeque<ThinEntryIndex>,
    // Logical index of the first entry in `entry_indexes`.
    first_index: u64,
    // Number of entries at the front of `entry_indexes` that are located in
    // the rewrite queue.
    rewrite_count: usize,

    // Key-value pairs, each tagged with the file holding its latest update.
    kvs: BTreeMap<Vec<u8>, (Vec<u8>, FileId)>,

    // (start, end) file sequence numbers of the rewrite atomic group that
    // this region belongs to, if any.
    atomic_group: Option<(FileSeq, FileSeq)>,

    global_stats: Arc<GlobalStats>,

    _phantom: PhantomData<A>,
}

impl MemTable<VacantAllocator> {
    #[allow(dead_code)]
    fn new(region_id: u64, global_stats: Arc<GlobalStats>) -> MemTable<VacantAllocator> {
        Self::with_allocator(region_id, global_stats, &new_vacant_allocator())
    }
}

impl<A: AllocatorTrait> MemTable<A> {
    fn with_allocator(
        region_id: u64,
        global_stats: Arc<GlobalStats>,
        _allocator: &A,
    ) -> MemTable<A> {
        MemTable {
            region_id,
            #[cfg(feature = "swap")]
            entry_indexes: VecDeque::with_capacity_in(CAPACITY_INIT, _allocator.clone()),
            #[cfg(not(feature = "swap"))]
            entry_indexes: VecDeque::with_capacity(CAPACITY_INIT),
            first_index: 0,
            rewrite_count: 0,
            kvs: BTreeMap::default(),
            atomic_group: None,
            global_stats,
            _phantom: PhantomData,
        }
    }

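    /// Merges a newer, neighboring [`MemTable`] into this one. Only used
    /// during recovery.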
    pub fn merge_newer_neighbor(&mut self, rhs: &mut Self) {
        debug_assert_eq!(self.region_id, rhs.region_id);
        if let Some((rhs_first, _)) = rhs.span() {
            self.prepare_append(
                rhs_first,
                /* allow_hole */ rhs.rewrite_count > 0,
                /* allow_overwrite_compacted */ true,
            );
            self.global_stats.add(
                rhs.entry_indexes[0].entries.unwrap().id.queue,
                rhs.entry_indexes.len(),
            );
            self.rewrite_count += rhs.rewrite_count;
            self.entry_indexes.append(&mut rhs.entry_indexes);
            rhs.rewrite_count = 0;
        }

        for (key, (value, file_id)) in rhs.kvs.iter() {
            self.put(key.clone(), value.clone(), *file_id);
        }

        if let Some(g) = rhs.atomic_group.take() {
            assert!(self.atomic_group.map_or(true, |(_, end)| end <= g.0));
            self.atomic_group = Some(g);
        }

        let deleted = rhs.global_stats.deleted_rewrite_entries();
        self.global_stats.add(LogQueue::Rewrite, deleted);
        self.global_stats.delete(LogQueue::Rewrite, deleted);
    }

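    /// Merges a [`MemTable`] that contains only append queue data. Assumes
    /// `self` contains only rewritten data of the same region. Only used
    /// during recovery.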
    pub fn merge_append_table(&mut self, rhs: &mut Self) {
        debug_assert_eq!(self.region_id, rhs.region_id);
        debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
        debug_assert_eq!(rhs.rewrite_count, 0);

        if let Some((first, _)) = rhs.span() {
            self.prepare_append(
                first,
                /* allow_hole */ true,
                /* allow_overwrite_compacted */ true,
            );
            self.global_stats.add(
                rhs.entry_indexes[0].entries.unwrap().id.queue,
                rhs.entry_indexes.len(),
            );
            self.entry_indexes.append(&mut rhs.entry_indexes);
        }

        for (key, (value, file_id)) in rhs.kvs.iter() {
            self.put(key.clone(), value.clone(), *file_id);
        }

        assert!(rhs.atomic_group.is_none());

        let deleted = rhs.global_stats.deleted_rewrite_entries();
        self.global_stats.add(LogQueue::Rewrite, deleted);
        self.global_stats.delete(LogQueue::Rewrite, deleted);
    }

    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.kvs.get(key).map(|v| v.0.clone())
    }

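    /// Iterates over the key-value pairs in `[start_key, end_key)`, feeding
    /// each pair to `f` until it returns `false`.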
    pub fn scan<F>(
        &self,
        start_key: Option<&[u8]>,
        end_key: Option<&[u8]>,
        reverse: bool,
        mut f: F,
    ) -> Result<()>
    where
        F: FnMut(&[u8], &[u8]) -> bool,
    {
        let lower = start_key.map(Bound::Included).unwrap_or(Bound::Unbounded);
        let upper = end_key.map(Bound::Excluded).unwrap_or(Bound::Unbounded);
        let iter = self.kvs.range::<[u8], _>((lower, upper));
        if reverse {
            for (key, (value, _)) in iter.rev() {
                if !f(key, value) {
                    break;
                }
            }
        } else {
            for (key, (value, _)) in iter {
                if !f(key, value) {
                    break;
                }
            }
        }
        Ok(())
    }

    pub fn delete(&mut self, key: &[u8]) {
        if let Some(value) = self.kvs.remove(key) {
            self.global_stats.delete(value.1.queue, 1);
        }
    }

    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>, file_id: FileId) {
        if let Some(origin) = self.kvs.insert(key, (value, file_id)) {
            self.global_stats.delete(origin.1.queue, 1);
        }
        self.global_stats.add(file_id.queue, 1);
    }

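    /// Moves a key to rewrite file `seq`. An append key is only moved when
    /// its current file is covered by `gate`; a rewrite key just has its
    /// sequence number advanced. Obsolete rewrites are counted as deleted.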
    pub fn rewrite_key(&mut self, key: Vec<u8>, gate: Option<FileSeq>, seq: FileSeq) {
        self.global_stats.add(LogQueue::Rewrite, 1);
        if let Some(origin) = self.kvs.get_mut(&key) {
            if origin.1.queue == LogQueue::Append {
                if let Some(gate) = gate {
                    if origin.1.seq <= gate {
                        origin.1 = FileId {
                            queue: LogQueue::Rewrite,
                            seq,
                        };
                        self.global_stats.delete(LogQueue::Append, 1);
                        return;
                    }
                }
            } else {
                assert!(origin.1.seq <= seq);
                origin.1.seq = seq;
            }
        }
        self.global_stats.delete(LogQueue::Rewrite, 1);
    }

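    /// Returns the entry index at logical index `index`, if present.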
    pub fn get_entry(&self, index: u64) -> Option<EntryIndex> {
        if let Some((first, last)) = self.span() {
            if index < first || index > last {
                return None;
            }

            let ioffset = (index - first) as usize;
            let entry_index = self.entry_indexes[ioffset];
            Some(EntryIndex::from_thin(index, entry_index))
        } else {
            None
        }
    }

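    /// Appends some entries from the append queue. Panics if the new entries
    /// would leave a hole in the log or overwrite compacted entries.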
    pub fn append(&mut self, entry_indexes: Vec<EntryIndex>) {
        let len = entry_indexes.len();
        if len > 0 {
            self.prepare_append(
                entry_indexes[0].index,
                /* allow_hole */ false,
                /* allow_overwrite_compacted */ false,
            );
            self.global_stats.add(LogQueue::Append, len);
            for ei in &entry_indexes {
                self.entry_indexes.push_back(ei.into());
            }
        }
    }

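    /// Appends some entries from the append queue during recovery. Unlike
    /// [`Self::append`], overwriting compacted entries is tolerated.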
    pub fn replay_append(&mut self, entry_indexes: Vec<EntryIndex>) {
        let len = entry_indexes.len();
        if len > 0 {
            debug_assert_eq!(self.rewrite_count, 0);
            self.prepare_append(
                entry_indexes[0].index,
                /* allow_hole */ false,
                /* allow_overwrite_compacted */ true,
            );
            self.global_stats.add(LogQueue::Append, len);
            for ei in &entry_indexes {
                debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append);
                self.entry_indexes.push_back(ei.into());
            }
        }
    }

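    /// Moves a span of entries to the rewrite queue. With a `gate`, only
    /// append entries from files at or below it are rewritten; without one,
    /// existing rewrite entries are squeezed into the new file.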
    pub fn rewrite(&mut self, rewrite_indexes: Vec<EntryIndex>, gate: Option<FileSeq>) {
        if rewrite_indexes.is_empty() {
            return;
        }
        self.global_stats
            .add(LogQueue::Rewrite, rewrite_indexes.len());

        let len = self.entry_indexes.len();
        if len == 0 {
            self.global_stats
                .delete(LogQueue::Rewrite, rewrite_indexes.len());
            return;
        }

        let first = self.first_index;
        let last = self.first_index + len as u64 - 1;
        let rewrite_first = std::cmp::max(rewrite_indexes[0].index, first);
        let rewrite_last = std::cmp::min(rewrite_indexes[rewrite_indexes.len() - 1].index, last);
        let mut rewrite_len = (rewrite_last + 1).saturating_sub(rewrite_first) as usize;
        if rewrite_len == 0 {
            self.global_stats
                .delete(LogQueue::Rewrite, rewrite_indexes.len());
            return;
        }

        let pos = (rewrite_first - first) as usize;
        // No normal entry may be mixed in before the rewritten prefix.
        assert!(
            pos == 0 || self.entry_indexes[pos - 1].entries.unwrap().id.queue == LogQueue::Rewrite
        );
        let rewrite_pos = (rewrite_first - rewrite_indexes[0].index) as usize;

        for (i, rindex) in rewrite_indexes[rewrite_pos..rewrite_pos + rewrite_len]
            .iter()
            .enumerate()
        {
            let index = &mut self.entry_indexes[i + pos];
            if let Some(gate) = gate {
                debug_assert_eq!(index.entries.unwrap().id.queue, LogQueue::Append);
                if index.entries.unwrap().id.seq > gate {
                    // Stop at entries that were appended after the gate.
                    rewrite_len = i;
                    break;
                }
            } else if index.entries.unwrap().id.queue == LogQueue::Append {
                // A squeeze stops at the first append entry.
                rewrite_len = i;
                break;
            }

            *index = rindex.into();
        }

        if gate.is_none() {
            // The replaced entries already live in the rewrite queue, so the
            // whole incoming batch is counted as deleted.
            self.global_stats
                .delete(LogQueue::Rewrite, rewrite_indexes.len());
            self.rewrite_count = std::cmp::max(self.rewrite_count, pos + rewrite_len);
        } else {
            self.global_stats.delete(LogQueue::Append, rewrite_len);
            self.global_stats
                .delete(LogQueue::Rewrite, rewrite_indexes.len() - rewrite_len);
            assert!(pos + rewrite_len >= self.rewrite_count);
            self.rewrite_count = pos + rewrite_len;
        }
    }

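    /// Appends some entries from the rewrite queue during recovery. Assumes
    /// this table holds no append data.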
    pub fn replay_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
        let len = entry_indexes.len();
        if len > 0 {
            debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
            self.prepare_append(
                entry_indexes[0].index,
                /* allow_hole */ true,
                /* allow_overwrite_compacted */ true,
            );
            self.global_stats.add(LogQueue::Rewrite, len);
            for ei in &entry_indexes {
                self.entry_indexes.push_back(ei.into());
            }
            self.rewrite_count = self.entry_indexes.len();
        }
    }

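    /// Removes all entries with index smaller than `index`. Returns the
    /// number of removed entries.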
    pub fn compact_to(&mut self, index: u64) -> u64 {
        if self.entry_indexes.is_empty() {
            return 0;
        }
        let first = self.first_index;
        if index <= first {
            return 0;
        }
        let count = std::cmp::min((index - first) as usize, self.entry_indexes.len());
        self.first_index = index;
        self.entry_indexes.drain(..count);
        self.maybe_shrink_entry_indexes();

        let compacted_rewrite = std::cmp::min(count, self.rewrite_count);
        self.rewrite_count -= compacted_rewrite;
        self.global_stats
            .delete(LogQueue::Rewrite, compacted_rewrite);
        self.global_stats
            .delete(LogQueue::Append, count - compacted_rewrite);
        count as u64
    }

    pub fn apply_rewrite_atomic_group(&mut self, start: FileSeq, end: FileSeq) {
        assert!(self.atomic_group.map_or(true, |(_, b)| b <= start));
        self.atomic_group = Some((start, end));
    }

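    /// Removes all entries with index greater than or equal to `index`.
    /// `first` and `last` must match the current span. Returns the number of
    /// removed entries.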
    fn unsafe_truncate_back(&mut self, first: u64, index: u64, last: u64) -> usize {
        debug_assert!(index <= last);
        let len = self.entry_indexes.len();
        debug_assert_eq!(len as u64, last - first + 1);
        self.entry_indexes
            .truncate(index.saturating_sub(first) as usize);
        let new_len = self.entry_indexes.len();
        let truncated = len - new_len;

        if self.rewrite_count > new_len {
            let truncated_rewrite = self.rewrite_count - new_len;
            self.rewrite_count = new_len;
            self.global_stats
                .delete(LogQueue::Rewrite, truncated_rewrite);
            self.global_stats
                .delete(LogQueue::Append, truncated - truncated_rewrite);
        } else {
            self.global_stats.delete(LogQueue::Append, truncated);
        }
        truncated
    }

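    /// Prepares the table for entries starting at `first_index_to_add`:
    /// truncates any overlapping suffix and resets the table if the new
    /// range is disjoint. Panics when a hole or an overwrite of compacted
    /// entries occurs without the corresponding flag set.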
    #[inline]
    fn prepare_append(
        &mut self,
        first_index_to_add: u64,
        allow_hole: bool,
        allow_overwrite_compacted: bool,
    ) {
        if let Some((first, last)) = self.span() {
            if first_index_to_add < first {
                if allow_overwrite_compacted {
                    self.unsafe_truncate_back(first, 0, last);
                } else {
                    panic!(
                        "attempt to overwrite compacted entries in {}",
                        self.region_id
                    );
                }
                self.first_index = first_index_to_add;
            } else if last + 1 < first_index_to_add {
                if allow_hole {
                    self.unsafe_truncate_back(first, 0, last);
                } else {
                    panic!("memtable {} has a hole", self.region_id);
                }
                self.first_index = first_index_to_add;
            } else if first_index_to_add != last + 1 {
                self.unsafe_truncate_back(first, first_index_to_add, last);
            }
        } else {
            self.first_index = first_index_to_add;
        }
    }

    #[inline]
    fn maybe_shrink_entry_indexes(&mut self) {
        if self.entry_indexes.capacity() >= CAPACITY_SHRINK_THRESHOLD {
            self.entry_indexes.shrink_to_fit();
        }
    }

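    /// Fetches the entry indexes in `[begin, end)` into `vec_idx`. If
    /// `max_size` is set, stops once the accumulated `entry_len` exceeds it,
    /// but always fetches at least one entry.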
    pub fn fetch_entries_to(
        &self,
        begin: u64,
        end: u64,
        max_size: Option<usize>,
        vec_idx: &mut Vec<EntryIndex>,
    ) -> Result<()> {
        if end <= begin {
            return Ok(());
        }
        let len = self.entry_indexes.len();
        if len == 0 {
            return Err(Error::EntryNotFound);
        }
        let first = self.first_index;
        if begin < first {
            return Err(Error::EntryCompacted);
        }
        if end > self.first_index + len as u64 {
            return Err(Error::EntryNotFound);
        }

        let start_pos = (begin - first) as usize;
        let end_pos = (end - begin) as usize + start_pos;

        let mut total_size = 0;
        let mut index = begin;
        for idx in self.entry_indexes.range(start_pos..end_pos) {
            total_size += idx.entry_len;
            // Regardless of `max_size`, fetch at least one entry.
            if let Some(max_size) = max_size {
                if total_size as usize > max_size && total_size > idx.entry_len {
                    break;
                }
            }
            vec_idx.push(EntryIndex::from_thin(index, *idx));
            index += 1;
        }
        Ok(())
    }

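    /// Fetches all append entry indexes from files at or below `gate` into
    /// `vec_idx`.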
    pub fn fetch_entry_indexes_before(
        &self,
        gate: FileSeq,
        vec_idx: &mut Vec<EntryIndex>,
    ) -> Result<()> {
        if let Some((first, last)) = self.span() {
            let mut i = self.rewrite_count;
            while first + i as u64 <= last && self.entry_indexes[i].entries.unwrap().id.seq <= gate
            {
                vec_idx.push(EntryIndex::from_thin(
                    first + i as u64,
                    self.entry_indexes[i],
                ));
                i += 1;
            }
        }
        Ok(())
    }

    pub fn fetch_rewritten_entry_indexes(&self, vec_idx: &mut Vec<EntryIndex>) -> Result<()> {
        if self.rewrite_count > 0 {
            let first = self.first_index;
            let end = self.first_index + self.rewrite_count as u64;
            self.fetch_entries_to(first, end, None, vec_idx)
        } else {
            Ok(())
        }
    }

    pub fn fetch_kvs_before(&self, gate: FileSeq, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
        for (key, (value, file_id)) in &self.kvs {
            if file_id.queue == LogQueue::Append && file_id.seq <= gate {
                vec.push((key.clone(), value.clone()));
            }
        }
    }

    pub fn fetch_rewritten_kvs(&self, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
        for (key, (value, file_id)) in &self.kvs {
            if file_id.queue == LogQueue::Rewrite {
                vec.push((key.clone(), value.clone()));
            }
        }
    }

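    /// Returns the smallest file sequence number in `queue` that is still
    /// referenced by this table, extended to the start of its atomic group
    /// for the rewrite queue.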
    pub fn min_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
        let entry = match queue {
            LogQueue::Append => self.entry_indexes.get(self.rewrite_count),
            LogQueue::Rewrite if self.rewrite_count == 0 => None,
            LogQueue::Rewrite => self.entry_indexes.front(),
        };
        let ents_min = entry.map(|e| e.entries.unwrap().id.seq);
        let kvs_min = self
            .kvs
            .values()
            .filter(|v| v.1.queue == queue)
            .fold(None, |min, v| {
                if let Some(min) = min {
                    Some(std::cmp::min(min, v.1.seq))
                } else {
                    Some(v.1.seq)
                }
            });
        let res = match (ents_min, kvs_min) {
            (Some(ents_min), Some(kvs_min)) => std::cmp::min(kvs_min, ents_min),
            (Some(ents_min), None) => ents_min,
            (None, Some(kvs_min)) => kvs_min,
            (None, None) => return None,
        };
        if queue == LogQueue::Rewrite {
            if let Some((start, end)) = self.atomic_group {
                if res <= end {
                    return Some(std::cmp::min(start, res));
                }
            }
        }
        Some(res)
    }

    #[inline]
    pub fn has_at_least_some_entries_before(&self, gate: FileId, count: usize) -> bool {
        debug_assert!(count > 0);
        self.entry_indexes
            .get(count - 1)
            .map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
    }

    pub fn region_id(&self) -> u64 {
        self.region_id
    }

    pub(crate) fn rewrite_count(&self) -> usize {
        self.rewrite_count
    }

    pub fn first_index(&self) -> Option<u64> {
        self.span().map(|s| s.0)
    }

    pub fn last_index(&self) -> Option<u64> {
        self.span().map(|s| s.1)
    }

    #[allow(dead_code)]
    fn heap_size(&self) -> usize {
        // `entry_indexes` stores `ThinEntryIndex`, so account for that type
        // rather than the fatter `EntryIndex`.
        self.entry_indexes.capacity() * std::mem::size_of::<ThinEntryIndex>()
    }

    #[inline]
    fn span(&self) -> Option<(u64, u64)> {
        let len = self.entry_indexes.len();
        if len > 0 {
            Some((self.first_index, self.first_index + len as u64 - 1))
        } else {
            None
        }
    }

    #[cfg(test)]
    fn consistency_check(&self) {
        let mut seen_append = false;
        for idx in self.entry_indexes.iter() {
            // Rewritten entries must form a contiguous prefix: once an
            // append entry is seen, no rewrite entry may follow.
            let queue = idx.entries.unwrap().id.queue;
            if queue == LogQueue::Append {
                seen_append = true;
            }
            assert_eq!(
                queue,
                if seen_append {
                    LogQueue::Append
                } else {
                    LogQueue::Rewrite
                }
            );
        }
    }
}

impl<A: AllocatorTrait> Drop for MemTable<A> {
    fn drop(&mut self) {
        let mut append_kvs = 0;
        let mut rewrite_kvs = 0;
        for (_v, id) in self.kvs.values() {
            match id.queue {
                LogQueue::Rewrite => rewrite_kvs += 1,
                LogQueue::Append => append_kvs += 1,
            }
        }

        self.global_stats
            .delete(LogQueue::Rewrite, self.rewrite_count + rewrite_kvs);
        self.global_stats.delete(
            LogQueue::Append,
            self.entry_indexes.len() - self.rewrite_count + append_kvs,
        );
    }
}

type MemTableMap<A> = HashMap<u64, Arc<RwLock<MemTable<A>>>>;
pub type MemTableHandle = Arc<RwLock<MemTable<SelectedAllocator>>>;
pub type MemTables = MemTableAccessor<SelectedAllocator>;

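/// A collection of [`MemTable`]s, sharded by hashed region ID to reduce lock
/// contention.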
#[derive(Clone)]
pub struct MemTableAccessor<A: AllocatorTrait> {
    global_stats: Arc<GlobalStats>,
    allocator: A,

    slots: Vec<Arc<RwLock<MemTableMap<A>>>>,
    // Region IDs of removed memtables that have not yet been persisted as
    // clean tombstones.
    removed_memtables: Arc<Mutex<VecDeque<u64>>>,
}

impl MemTableAccessor<VacantAllocator> {
    pub fn new(global_stats: Arc<GlobalStats>) -> MemTableAccessor<VacantAllocator> {
        let mut slots = Vec::with_capacity(MEMTABLE_SLOT_COUNT);
        for _ in 0..MEMTABLE_SLOT_COUNT {
            slots.push(Arc::new(RwLock::new(MemTableMap::default())));
        }
        MemTableAccessor {
            global_stats,
            allocator: new_vacant_allocator(),
            slots,
            removed_memtables: Default::default(),
        }
    }
}

impl MemTableAccessor<SelectedAllocator> {
    pub fn memory_usage(&self) -> usize {
        #[cfg(not(feature = "swap"))]
        {
            let mut total = 0;
            for tables in &self.slots {
                tables.read().values().for_each(|t| {
                    total += t.read().heap_size();
                });
            }
            total
        }
        #[cfg(feature = "swap")]
        {
            self.allocator.memory_usage()
        }
    }

    pub(crate) fn flush_metrics(&self) {
        MEMORY_USAGE.set(self.memory_usage() as i64);
    }
}

impl<A: AllocatorTrait> MemTableAccessor<A> {
    pub fn new_with_allocator(global_stats: Arc<GlobalStats>, allocator: A) -> MemTableAccessor<A> {
        let mut slots = Vec::with_capacity(MEMTABLE_SLOT_COUNT);
        for _ in 0..MEMTABLE_SLOT_COUNT {
            slots.push(Arc::new(RwLock::new(MemTableMap::default())));
        }
        MemTableAccessor {
            global_stats,
            allocator,
            slots,
            removed_memtables: Default::default(),
        }
    }

    pub fn get_or_insert(&self, raft_group_id: u64) -> Arc<RwLock<MemTable<A>>> {
        let global_stats = self.global_stats.clone();
        let mut memtables = self.slots[Self::slot_index(raft_group_id)].write();
        let memtable = memtables.entry(raft_group_id).or_insert_with(|| {
            let memtable =
                MemTable::with_allocator(raft_group_id, global_stats.clone(), &self.allocator);
            Arc::new(RwLock::new(memtable))
        });
        memtable.clone()
    }

    pub fn get(&self, raft_group_id: u64) -> Option<Arc<RwLock<MemTable<A>>>> {
        self.slots[Self::slot_index(raft_group_id)]
            .read()
            .get(&raft_group_id)
            .cloned()
    }

    pub fn insert(&self, raft_group_id: u64, memtable: Arc<RwLock<MemTable<A>>>) {
        self.slots[Self::slot_index(raft_group_id)]
            .write()
            .insert(raft_group_id, memtable);
    }

    pub fn remove(&self, raft_group_id: u64, record_tombstone: bool) {
        self.slots[Self::slot_index(raft_group_id)]
            .write()
            .remove(&raft_group_id);
        if record_tombstone {
            let mut removed_memtables = self.removed_memtables.lock();
            removed_memtables.push_back(raft_group_id);
        }
    }

    pub fn fold<B, F: Fn(B, &MemTable<A>) -> B>(&self, mut init: B, fold: F) -> B {
        for tables in &self.slots {
            for memtable in tables.read().values() {
                init = fold(init, &*memtable.read());
            }
        }
        init
    }

    pub fn collect<F: FnMut(&MemTable<A>) -> bool>(
        &self,
        mut condition: F,
    ) -> Vec<Arc<RwLock<MemTable<A>>>> {
        let mut memtables = Vec::new();
        for tables in &self.slots {
            memtables.extend(tables.read().values().filter_map(|t| {
                if condition(&*t.read()) {
                    return Some(t.clone());
                }
                None
            }));
        }
        memtables
    }

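    /// Returns a [`LogBatch`] of `Command::Clean` for all regions removed
    /// since the last call.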
    pub fn take_cleaned_region_logs(&self) -> LogBatch {
        let mut log_batch = LogBatch::default();
        let mut removed_memtables = self.removed_memtables.lock();
        for id in removed_memtables.drain(..) {
            log_batch.add_command(id, Command::Clean);
        }
        log_batch
    }

    #[cfg(test)]
    pub fn cleaned_region_ids(&self) -> HashSet<u64> {
        let mut ids = HashSet::default();
        let removed_memtables = self.removed_memtables.lock();
        for raft_id in removed_memtables.iter() {
            ids.insert(*raft_id);
        }
        ids
    }

    pub fn is_empty(&self) -> bool {
        for i in 0..MEMTABLE_SLOT_COUNT {
            if !self.slots[i].read().is_empty() {
                return false;
            }
        }
        true
    }

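    /// Merges newer, neighboring memtables from `rhs`. Only used during
    /// recovery.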
    pub fn merge_newer_neighbor(&self, mut rhs: Self) {
        for slot in rhs.slots.iter_mut() {
            for (raft_group_id, memtable) in slot.write().drain() {
                self.get_or_insert(raft_group_id)
                    .write()
                    .merge_newer_neighbor(memtable.write().borrow_mut());
            }
        }
    }

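    /// Merges memtables that contain only append queue data. Assumes `self`
    /// holds the rewritten data of the same regions. Only used during
    /// recovery.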
    pub fn merge_append_table(&self, mut rhs: Self) {
        for slot in rhs.slots.iter_mut() {
            for (id, memtable) in std::mem::take(&mut *slot.write()) {
                if let Some(existing_memtable) = self.get(id) {
                    existing_memtable
                        .write()
                        .merge_append_table(&mut *memtable.write());
                } else {
                    self.insert(id, memtable);
                }
            }
        }
        debug_assert_eq!(
            self.removed_memtables.lock().len(),
            rhs.removed_memtables.lock().len()
        );
    }

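    /// Applies a batch of append writes to the memtables.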
    pub fn apply_append_writes(&self, log_items: impl Iterator<Item = LogItem>) {
        for item in log_items {
            if has_internal_key(&item) {
                continue;
            }
            let raft = item.raft_group_id;
            let memtable = self.get_or_insert(raft);
            fail_point!(
                "memtable_accessor::apply_append_writes::region_3",
                raft == 3,
                |_| {}
            );
            match item.content {
                LogItemContent::EntryIndexes(entries_to_add) => {
                    memtable.write().append(entries_to_add.0);
                }
                LogItemContent::Command(Command::Clean) => {
                    self.remove(raft, true /* record_tombstone */);
                }
                LogItemContent::Command(Command::Compact { index }) => {
                    memtable.write().compact_to(index);
                }
                LogItemContent::Kv(kv) => match kv.op_type {
                    OpType::Put => {
                        let value = kv.value.unwrap();
                        memtable.write().put(kv.key, value, kv.file_id.unwrap());
                    }
                    OpType::Del => {
                        let key = kv.key;
                        memtable.write().delete(key.as_slice());
                    }
                },
            }
        }
    }

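    /// Applies a batch of append writes during recovery, using the replay
    /// variants that tolerate already-compacted data.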
    pub fn replay_append_writes(&self, log_items: impl Iterator<Item = LogItem>) {
        for item in log_items {
            if has_internal_key(&item) {
                continue;
            }
            let raft = item.raft_group_id;
            let memtable = self.get_or_insert(raft);
            match item.content {
                LogItemContent::EntryIndexes(entries_to_add) => {
                    memtable.write().replay_append(entries_to_add.0);
                }
                LogItemContent::Command(Command::Clean) => {
                    self.remove(raft, true /* record_tombstone */);
                }
                LogItemContent::Command(Command::Compact { index }) => {
                    memtable.write().compact_to(index);
                }
                LogItemContent::Kv(kv) => match kv.op_type {
                    OpType::Put => {
                        let value = kv.value.unwrap();
                        memtable.write().put(kv.key, value, kv.file_id.unwrap());
                    }
                    OpType::Del => {
                        let key = kv.key;
                        memtable.write().delete(key.as_slice());
                    }
                },
            }
        }
    }

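    /// Applies a batch of rewrite writes. `watermark` gates which append
    /// data may be rewritten; `new_file` is the rewrite file that now holds
    /// the keys.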
    pub fn apply_rewrite_writes(
        &self,
        log_items: impl Iterator<Item = LogItem>,
        watermark: Option<FileSeq>,
        new_file: FileSeq,
    ) {
        for item in log_items {
            if has_internal_key(&item) {
                continue;
            }
            let raft = item.raft_group_id;
            let memtable = self.get_or_insert(raft);
            match item.content {
                LogItemContent::EntryIndexes(entries_to_add) => {
                    memtable.write().rewrite(entries_to_add.0, watermark);
                }
                LogItemContent::Kv(kv) => match kv.op_type {
                    OpType::Put => {
                        let key = kv.key;
                        memtable.write().rewrite_key(key, watermark, new_file);
                    }
                    _ => unreachable!(),
                },
                LogItemContent::Command(Command::Clean) => {}
                _ => unreachable!(),
            }
        }
    }

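    /// Applies a batch of rewrite writes during recovery.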
    pub fn replay_rewrite_writes(&self, log_items: impl Iterator<Item = LogItem>) {
        for item in log_items {
            if has_internal_key(&item) {
                continue;
            }
            let raft = item.raft_group_id;
            let memtable = self.get_or_insert(raft);
            match item.content {
                LogItemContent::EntryIndexes(entries_to_add) => {
                    memtable.write().replay_rewrite(entries_to_add.0);
                }
                LogItemContent::Command(Command::Clean) => {
                    self.remove(raft, false /* record_tombstone */);
                }
                LogItemContent::Command(Command::Compact { index }) => {
                    memtable.write().compact_to(index);
                }
                LogItemContent::Kv(kv) => match kv.op_type {
                    OpType::Put => {
                        let value = kv.value.unwrap();
                        memtable.write().put(kv.key, value, kv.file_id.unwrap());
                    }
                    OpType::Del => {
                        let key = kv.key;
                        memtable.write().delete(key.as_slice());
                    }
                },
            }
        }
    }

    pub fn apply_rewrite_atomic_group(&self, raft: u64, start: FileSeq, end: FileSeq) {
        let memtable = self.get_or_insert(raft);
        memtable.write().apply_rewrite_atomic_group(start, end);
    }

    #[inline]
    fn slot_index(id: u64) -> usize {
        debug_assert!(MEMTABLE_SLOT_COUNT.is_power_of_two());
        hash_u64(id) as usize & (MEMTABLE_SLOT_COUNT - 1)
    }
}

#[inline]
fn has_internal_key(item: &LogItem) -> bool {
    matches!(&item.content, LogItemContent::Kv(KeyValue { key, .. }) if crate::is_internal_key(key, None))
}

struct PendingAtomicGroup {
    status: AtomicGroupStatus,
    items: Vec<LogItem>,
    tombstone_items: Vec<LogItem>,
    start: FileSeq,
    end: FileSeq,
}

pub struct MemTableRecoverContext<A: AllocatorTrait> {
    stats: Arc<GlobalStats>,
    // Tombstone items (clean / compact / key deletion) seen during replay.
    tombstone_items: Vec<LogItem>,
    memtables: MemTableAccessor<A>,

    // Atomic groups that have not yet been closed by an `End` marker, keyed
    // by their group identifier.
    pending_atomic_groups: HashMap<u64, Vec<PendingAtomicGroup>>,
}

impl MemTableRecoverContext<VacantAllocator> {
    fn new() -> Self {
        let stats = Arc::new(GlobalStats::default());
        Self {
            stats: stats.clone(),
            tombstone_items: Vec::new(),
            memtables: MemTableAccessor::new(stats),
            pending_atomic_groups: HashMap::new(),
        }
    }
}

impl<A: AllocatorTrait> MemTableRecoverContext<A> {
    fn new_with_allocator(allocator: A) -> Self {
        let stats = Arc::new(GlobalStats::default());
        Self {
            stats: stats.clone(),
            tombstone_items: Vec::new(),
            memtables: MemTableAccessor::new_with_allocator(stats, allocator),
            pending_atomic_groups: HashMap::new(),
        }
    }

    pub fn finish(self) -> (MemTableAccessor<A>, Arc<GlobalStats>) {
        (self.memtables, self.stats)
    }

    pub fn merge_append_context(&self, append: MemTableRecoverContext<A>) {
        self.memtables
            .apply_append_writes(append.tombstone_items.into_iter());
        self.memtables.merge_append_table(append.memtables);
    }

    #[inline]
    fn is_tombstone(item: &LogItem) -> bool {
        match &item.content {
            LogItemContent::Command(Command::Clean)
            | LogItemContent::Command(Command::Compact { .. }) => true,
            LogItemContent::Kv(KeyValue { op_type, .. }) if *op_type == OpType::Del => true,
            _ => false,
        }
    }

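    /// Folds `new_group` into the pending atomic groups for `id`. Once a
    /// `Begin`..`End` sequence completes, its items are replayed and the
    /// group's file span is recorded in the affected memtables.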
    fn accept_new_group(&mut self, queue: LogQueue, id: u64, mut new_group: PendingAtomicGroup) {
        assert_eq!(queue, LogQueue::Rewrite);
        if let Some(groups) = self.pending_atomic_groups.get_mut(&id) {
            let group = groups.last_mut().unwrap();
            match (group.status, new_group.status) {
                (AtomicGroupStatus::End, AtomicGroupStatus::Begin) => {
                    groups.push(new_group);
                }
                (_, AtomicGroupStatus::Begin) => {
                    // The old group was never completed, discard it.
                    warn!(
                        "discard old atomic group, status: {:?}, raft_group_id: {:?}",
                        group.status,
                        group.items.first().map(|item| item.raft_group_id)
                    );
                    *group = new_group;
                }
                (AtomicGroupStatus::End, _) => {
                    // The new group has no beginning, discard it.
                    warn!(
                        "discard new atomic group, status: {:?}, raft_group_id: {:?}",
                        new_group.status,
                        new_group.items.first().map(|item| item.raft_group_id)
                    );
                }
                (AtomicGroupStatus::Begin, AtomicGroupStatus::Middle)
                | (AtomicGroupStatus::Middle, AtomicGroupStatus::Middle) => {
                    group.items.append(&mut new_group.items);
                    group.tombstone_items.append(&mut new_group.tombstone_items);
                    assert!(group.end <= new_group.start);
                    group.end = new_group.end;
                }
                (AtomicGroupStatus::Middle, AtomicGroupStatus::End) => {
                    group.items.append(&mut new_group.items);
                    group.tombstone_items.append(&mut new_group.tombstone_items);
                    group.status = new_group.status;
                    assert!(group.end <= new_group.start);
                    group.end = new_group.end;
                }
                (AtomicGroupStatus::Begin, AtomicGroupStatus::End) => {
                    let mut group = groups.pop().unwrap();
                    let mut rids = HashSet::with_capacity(1);
                    for item in group
                        .items
                        .iter()
                        .chain(group.tombstone_items.iter())
                        .chain(new_group.items.iter())
                        .chain(new_group.tombstone_items.iter())
                    {
                        rids.insert(item.raft_group_id);
                    }
                    self.tombstone_items.append(&mut group.tombstone_items);
                    self.tombstone_items.append(&mut new_group.tombstone_items);
                    self.memtables
                        .replay_rewrite_writes(group.items.into_iter());
                    self.memtables
                        .replay_rewrite_writes(new_group.items.into_iter());
                    assert!(group.end <= new_group.start);
                    for rid in rids {
                        self.memtables
                            .apply_rewrite_atomic_group(rid, group.start, new_group.end);
                    }
                }
            }
            if groups.is_empty() {
                self.pending_atomic_groups.remove(&id);
            }
        } else {
            self.pending_atomic_groups.insert(id, vec![new_group]);
        }
    }
}

impl Default for MemTableRecoverContext<VacantAllocator> {
    fn default() -> Self {
        Self::new()
    }
}

impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
    fn replay(&mut self, mut item_batch: LogItemBatch, file_id: FileId) -> Result<()> {
        if file_id.queue == LogQueue::Append {
            let mut new_tombstones = Vec::new();
            self.memtables
                .replay_append_writes(item_batch.drain().filter(|item| {
                    if Self::is_tombstone(item) {
                        new_tombstones.push(item.clone());
                    }
                    true
                }));
            self.tombstone_items.append(&mut new_tombstones);
        } else {
            let mut new_tombstones = Vec::new();
            let mut is_group = None;
            let items = item_batch
                .drain()
                .filter(|item| {
                    if let Some(g) = AtomicGroupStatus::parse(item) {
                        if is_group.is_none() {
                            is_group = Some(g);
                        } else {
                            let msg = format!("skipped an atomic group: {g:?}");
                            error!("{msg}");
                            debug_assert!(false, "{}", msg);
                        }
                        return false;
                    }
                    if Self::is_tombstone(item) {
                        new_tombstones.push(item.clone());
                    }
                    true
                })
                .collect();
            if let Some((id, status)) = is_group {
                self.accept_new_group(
                    file_id.queue,
                    id,
                    PendingAtomicGroup {
                        status,
                        items,
                        tombstone_items: new_tombstones,
                        start: file_id.seq,
                        end: file_id.seq,
                    },
                );
            } else {
                self.tombstone_items.append(&mut new_tombstones);
                self.memtables.replay_rewrite_writes(items.into_iter());
            }
        }
        Ok(())
    }

    fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> {
        // Clone, because `rhs.tombstone_items` is also replayed below.
        self.tombstone_items
            .append(&mut rhs.tombstone_items.clone());
        for (id, groups) in rhs.pending_atomic_groups.drain() {
            for group in groups {
                self.accept_new_group(queue, id, group);
            }
        }
        match queue {
            LogQueue::Append => self
                .memtables
                .replay_append_writes(rhs.tombstone_items.into_iter()),
            LogQueue::Rewrite => self
                .memtables
                .replay_rewrite_writes(rhs.tombstone_items.into_iter()),
        }
        self.memtables.merge_newer_neighbor(rhs.memtables);
        Ok(())
    }
}

pub struct MemTableRecoverContextFactory {
    allocator: SelectedAllocator,
}

impl MemTableRecoverContextFactory {
    pub fn new(cfg: &Config) -> Self {
        Self {
            allocator: new_allocator(cfg),
        }
    }
}

impl Factory<MemTableRecoverContext<SelectedAllocator>> for MemTableRecoverContextFactory {
    fn new_target(&self) -> MemTableRecoverContext<SelectedAllocator> {
        MemTableRecoverContext::new_with_allocator(self.allocator.clone())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::{catch_unwind_silent, generate_entry_indexes};

    impl<A: AllocatorTrait> MemTable<A> {
        fn max_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
            let entry = match queue {
                LogQueue::Append if self.rewrite_count == self.entry_indexes.len() => None,
                LogQueue::Append => self.entry_indexes.back(),
                LogQueue::Rewrite if self.rewrite_count == 0 => None,
                LogQueue::Rewrite => self.entry_indexes.get(self.rewrite_count - 1),
            };
            let ents_max = entry.map(|e| e.entries.unwrap().id.seq);

            let kvs_max = self.kvs_max_file_seq(queue);
            match (ents_max, kvs_max) {
                (Some(ents_max), Some(kvs_max)) => Some(FileSeq::max(kvs_max, ents_max)),
                (Some(ents_max), None) => Some(ents_max),
                (None, Some(kvs_max)) => Some(kvs_max),
                (None, None) => None,
            }
        }

        pub fn kvs_max_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
            self.kvs
                .values()
                .filter(|v| v.1.queue == queue)
                .fold(None, |max, v| {
                    if let Some(max) = max {
                        Some(std::cmp::max(max, v.1.seq))
                    } else {
                        Some(v.1.seq)
                    }
                })
        }

        pub fn fetch_all(&self, vec_idx: &mut Vec<EntryIndex>) {
            if let Some((first, last)) = self.span() {
                self.fetch_entries_to(first, last + 1, None, vec_idx)
                    .unwrap();
            }
        }

        fn entries_size(&self) -> usize {
            self.entry_indexes
                .iter()
                .fold(0, |acc, e| acc + e.entry_len) as usize
        }
    }

    #[test]
    fn test_memtable_append() {
        let region_id = 8;
        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));

        // Append entries [10, 20) from file 1.
        memtable.append(generate_entry_indexes(
            10,
            20,
            FileId::new(LogQueue::Append, 1),
        ));
        assert_eq!(memtable.entries_size(), 10);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 1);
        memtable.consistency_check();

        // An empty append is a no-op.
        memtable.append(Vec::new());

        // Appending [21, 22) would leave a hole after index 19 and panics.
        assert!(
            catch_unwind_silent(|| memtable.append(generate_entry_indexes(
                21,
                22,
                FileId::dummy(LogQueue::Append)
            )))
            .is_err()
        );
        memtable.consistency_check();

        // Append [20, 30) from file 2, extending the range.
        memtable.append(generate_entry_indexes(
            20,
            30,
            FileId::new(LogQueue::Append, 2),
        ));
        assert_eq!(memtable.entries_size(), 20);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 2);
        memtable.consistency_check();
        assert_eq!(
            memtable.global_stats.live_entries(LogQueue::Append),
            memtable.entries_size()
        );

        // An overlapping append [25, 35) truncates the conflicting suffix.
        memtable.append(generate_entry_indexes(
            25,
            35,
            FileId::new(LogQueue::Append, 3),
        ));
        assert_eq!(memtable.entries_size(), 25);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
        memtable.consistency_check();
        assert_eq!(
            memtable.global_stats.live_entries(LogQueue::Append),
            memtable.entries_size()
        );

        // A full overwrite [10, 40) replaces all existing entries.
        memtable.append(generate_entry_indexes(
            10,
            40,
            FileId::new(LogQueue::Append, 4),
        ));
        assert_eq!(memtable.entries_size(), 30);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 4);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
        memtable.consistency_check();
        assert_eq!(
            memtable.global_stats.live_entries(LogQueue::Append),
            memtable.entries_size()
        );

        let global_stats = Arc::clone(&memtable.global_stats);
        drop(memtable);
        assert_eq!(global_stats.live_entries(LogQueue::Append), 0);
    }

    #[test]
    fn test_memtable_compact() {
        let region_id = 8;
        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));

        // Append entries [0, 25) from files 1..=3.
        memtable.append(generate_entry_indexes(
            0,
            10,
            FileId::new(LogQueue::Append, 1),
        ));
        memtable.append(generate_entry_indexes(
            10,
            15,
            FileId::new(LogQueue::Append, 2),
        ));
        memtable.append(generate_entry_indexes(
            15,
            20,
            FileId::new(LogQueue::Append, 2),
        ));
        memtable.append(generate_entry_indexes(
            20,
            25,
            FileId::new(LogQueue::Append, 3),
        ));

        assert_eq!(memtable.entries_size(), 25);
        assert_eq!(memtable.first_index().unwrap(), 0);
        assert_eq!(memtable.last_index().unwrap(), 24);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
        assert_eq!(
            memtable.global_stats.live_entries(LogQueue::Append),
            memtable.entries_size()
        );
        memtable.consistency_check();

        // Compact to 5: entries [0, 5) are removed.
        assert_eq!(memtable.compact_to(5), 5);
        assert_eq!(memtable.entries_size(), 20);
        assert_eq!(memtable.first_index().unwrap(), 5);
        assert_eq!(memtable.last_index().unwrap(), 24);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
        assert_eq!(
            memtable.global_stats.live_entries(LogQueue::Append),
            memtable.entries_size()
        );
        // Appending before the compacted boundary panics.
        assert!(
            catch_unwind_silent(|| memtable.append(generate_entry_indexes(
                4,
                5,
                FileId::dummy(LogQueue::Append)
            )))
            .is_err()
        );
        memtable.consistency_check();

        // Compact to 20: only [20, 25) remains.
        assert_eq!(memtable.compact_to(20), 15);
        assert_eq!(memtable.entries_size(), 5);
        assert_eq!(memtable.first_index().unwrap(), 20);
        assert_eq!(memtable.last_index().unwrap(), 24);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 3);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
        assert_eq!(
            memtable.global_stats.live_entries(LogQueue::Append),
            memtable.entries_size()
        );
        memtable.consistency_check();

        // Compacting to an already-compacted index is a no-op.
        assert_eq!(memtable.compact_to(20), 0);
        assert_eq!(memtable.compact_to(15), 0);
        assert_eq!(memtable.entries_size(), 5);
        assert_eq!(memtable.first_index().unwrap(), 20);
        assert_eq!(memtable.last_index().unwrap(), 24);
        memtable.consistency_check();
    }

    #[test]
    fn test_memtable_fetch() {
        let region_id = 8;
        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));

        let mut ents_idx = vec![];

        // Fetching from an empty table yields nothing or an error.
        memtable.fetch_all(&mut ents_idx);
        assert!(ents_idx.is_empty());
        memtable
            .fetch_entries_to(0, 0, None, &mut ents_idx)
            .unwrap();
        assert!(matches!(
            memtable
                .fetch_entries_to(0, 1, None, &mut ents_idx)
                .unwrap_err(),
            Error::EntryNotFound
        ));

        // Append entries [0, 25) from files 1..=3.
        memtable.append(generate_entry_indexes(
            0,
            10,
            FileId::new(LogQueue::Append, 1),
        ));
        memtable.append(generate_entry_indexes(
            10,
            20,
            FileId::new(LogQueue::Append, 2),
        ));
        memtable.append(generate_entry_indexes(
            20,
            25,
            FileId::new(LogQueue::Append, 3),
        ));

        memtable.fetch_all(&mut ents_idx);
        assert_eq!(ents_idx.len(), 25);
        assert_eq!(ents_idx[0].index, 0);
        assert_eq!(ents_idx[24].index, 24);

        // After compacting to 10, fetching across the boundary fails.
        assert_eq!(memtable.compact_to(10), 10);

        ents_idx.clear();
        assert!(matches!(
            memtable
                .fetch_entries_to(5, 15, None, &mut ents_idx)
                .unwrap_err(),
            Error::EntryCompacted
        ));

        // Fetching past the last index fails.
        ents_idx.clear();
        assert!(matches!(
            memtable
                .fetch_entries_to(20, 30, None, &mut ents_idx)
                .unwrap_err(),
            Error::EntryNotFound
        ));

        ents_idx.clear();
        memtable
            .fetch_entries_to(20, 25, None, &mut ents_idx)
            .unwrap();
        assert_eq!(ents_idx.len(), 5);
        assert_eq!(ents_idx[0].index, 20);
        assert_eq!(ents_idx[4].index, 24);

        ents_idx.clear();
        memtable
            .fetch_entries_to(10, 15, None, &mut ents_idx)
            .unwrap();
        assert_eq!(ents_idx.len(), 5);
        assert_eq!(ents_idx[0].index, 10);
        assert_eq!(ents_idx[4].index, 14);

        ents_idx.clear();
        memtable
            .fetch_entries_to(10, 25, None, &mut ents_idx)
            .unwrap();
        assert_eq!(ents_idx.len(), 15);
        assert_eq!(ents_idx[0].index, 10);
        assert_eq!(ents_idx[14].index, 24);

        // `max_size` caps the accumulated entry length.
        ents_idx.clear();
        let max_size = Some(10);
        memtable
            .fetch_entries_to(10, 25, max_size, &mut ents_idx)
            .unwrap();
        assert_eq!(ents_idx.len(), 10);
        assert_eq!(ents_idx[0].index, 10);
        assert_eq!(ents_idx[9].index, 19);

        // Even with a `max_size` of 0, at least one entry is fetched.
        ents_idx.clear();
        memtable
            .fetch_entries_to(20, 25, Some(0), &mut ents_idx)
            .unwrap();
        assert_eq!(ents_idx.len(), 1);
        assert_eq!(ents_idx[0].index, 20);
    }

    #[test]
    fn test_memtable_fetch_rewrite() {
        let region_id = 8;
        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
        let (k1, v1) = (b"key1", b"value1");
        let (k2, v2) = (b"key2", b"value2");
        let (k3, v3) = (b"key3", b"value3");

        // Interleave entry appends and key-value puts across files 1..=3.
        memtable.append(generate_entry_indexes(
            0,
            10,
            FileId::new(LogQueue::Append, 1),
        ));
        memtable.put(k1.to_vec(), v1.to_vec(), FileId::new(LogQueue::Append, 1));
        memtable.append(generate_entry_indexes(
            10,
            20,
            FileId::new(LogQueue::Append, 2),
        ));
        memtable.put(k2.to_vec(), v2.to_vec(), FileId::new(LogQueue::Append, 2));
        memtable.append(generate_entry_indexes(
            20,
            25,
            FileId::new(LogQueue::Append, 3),
        ));
        memtable.put(k3.to_vec(), v3.to_vec(), FileId::new(LogQueue::Append, 3));
        memtable.consistency_check();

        // Rewrite k1 into rewrite file 50.
        memtable.rewrite_key(k1.to_vec(), Some(1), 50);
        let mut kvs = Vec::new();
        memtable.fetch_kvs_before(1, &mut kvs);
        assert!(kvs.is_empty());
        memtable.fetch_rewritten_kvs(&mut kvs);
        assert_eq!(kvs.len(), 1);
        assert_eq!(kvs.pop().unwrap(), (k1.to_vec(), v1.to_vec()));
        // Deleting a rewritten key counts as a deleted rewrite entry, and
        // rewriting a missing key becomes an immediate deletion.
        memtable.delete(k1.as_ref());
        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 1);
        memtable.rewrite_key(k1.to_vec(), Some(1), 50);
        assert_eq!(memtable.get(k1.as_ref()), None);
        memtable.fetch_rewritten_kvs(&mut kvs);
        assert!(kvs.is_empty());
        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 2);
        // k2 and k3 live in files newer than the gate, so they stay put.
        memtable.rewrite_key(k2.to_vec(), Some(1), 50);
        memtable.fetch_rewritten_kvs(&mut kvs);
        assert!(kvs.is_empty());
        memtable.rewrite_key(k3.to_vec(), None, 50);
        memtable.fetch_rewritten_kvs(&mut kvs);
        assert!(kvs.is_empty());
        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 4);
        // A covering gate moves k3; further rewrites only bump its sequence.
        memtable.rewrite_key(k3.to_vec(), Some(10), 50);
        memtable.rewrite_key(k3.to_vec(), None, 51);
        memtable.rewrite_key(k3.to_vec(), Some(11), 52);
        memtable.fetch_rewritten_kvs(&mut kvs);
        assert_eq!(kvs.len(), 1);
        assert_eq!(kvs.pop().unwrap(), (k3.to_vec(), v3.to_vec()));

        // Rewrite entries [0, 10) from append file 1.
        let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1));
        memtable.rewrite(ents_idx, Some(1));
        assert_eq!(memtable.entries_size(), 25);
        memtable.consistency_check();

        let mut ents_idx = vec![];
        assert!(memtable
            .fetch_entry_indexes_before(2, &mut ents_idx)
            .is_ok());
        assert_eq!(ents_idx.len(), 10);
        assert_eq!(ents_idx.last().unwrap().index, 19);
        ents_idx.clear();
        assert!(memtable
            .fetch_entry_indexes_before(1, &mut ents_idx)
            .is_ok());
        assert!(ents_idx.is_empty());

        ents_idx.clear();
        assert!(memtable
            .fetch_rewritten_entry_indexes(&mut ents_idx)
            .is_ok());
        assert_eq!(ents_idx.len(), 10);
        assert_eq!(ents_idx.first().unwrap().index, 0);
        assert_eq!(ents_idx.last().unwrap().index, 9);
    }

    #[test]
    fn test_memtable_kv_operations() {
        fn key(i: u64) -> Vec<u8> {
            format!("k{i}").as_bytes().to_vec()
        }
        fn value(i: u64) -> Vec<u8> {
            format!("v{i}").as_bytes().to_vec()
        }

        let region_id = 8;
        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));

        memtable.put(key(1), value(1), FileId::new(LogQueue::Append, 1));
        memtable.put(key(5), value(5), FileId::new(LogQueue::Append, 5));
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 5);
        assert_eq!(memtable.get(&key(1)), Some(value(1)));
        assert_eq!(memtable.get(&key(5)), Some(value(5)));

        // A scan stops as soon as the callback returns false.
        let mut res = Vec::new();
        memtable
            .scan(None, None, false, |k, v| {
                res.push((k.to_vec(), v.to_vec()));
                false
            })
            .unwrap();
        assert_eq!(res, vec![(key(1), value(1))]);
        res.clear();
        memtable
            .scan(None, None, true, |k, v| {
                res.push((k.to_vec(), v.to_vec()));
                false
            })
            .unwrap();
        assert_eq!(res, vec![(key(5), value(5))]);
        res.clear();
        memtable
            .scan(Some(&key(5)), None, false, |key, value| {
                res.push((key.to_vec(), value.to_vec()));
                true
            })
            .unwrap();
        assert_eq!(res, vec![(key(5), value(5))]);
        res.clear();
        memtable
            .scan(Some(&key(1)), Some(&key(5)), false, |key, value| {
                res.push((key.to_vec(), value.to_vec()));
                true
            })
            .unwrap();
        assert_eq!(res, vec![(key(1), value(1))]);

        memtable.delete(&key(5));
        assert_eq!(memtable.get(&key(5)), None);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 1);

        memtable.put(key(1), value(1), FileId::new(LogQueue::Rewrite, 2));
        memtable.put(key(5), value(5), FileId::new(LogQueue::Rewrite, 3));
        assert_eq!(memtable.min_file_seq(LogQueue::Append), None);
        assert_eq!(memtable.max_file_seq(LogQueue::Append), None);
        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 2);
        assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 3);
        assert_eq!(memtable.global_stats.rewrite_entries(), 2);

        memtable.delete(&key(1));
        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 3);
        assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 3);
        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 1);

        memtable.put(key(5), value(5), FileId::new(LogQueue::Append, 7));
        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite), None);
        assert_eq!(memtable.max_file_seq(LogQueue::Rewrite), None);
        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 7);
        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 7);
        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 2);
    }

    #[test]
    fn test_memtable_get_entry() {
        let region_id = 8;
        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));

        assert_eq!(memtable.get_entry(0), None);

        // Append entries [5, 20).
        memtable.append(generate_entry_indexes(
            5,
            10,
            FileId::new(LogQueue::Append, 1),
        ));
        memtable.append(generate_entry_indexes(
            10,
            20,
            FileId::new(LogQueue::Append, 2),
        ));

        // Out-of-range queries return None.
        assert_eq!(memtable.get_entry(2), None);
        assert_eq!(memtable.get_entry(25), None);

        let entry_idx = memtable.get_entry(5);
        assert_eq!(entry_idx.unwrap().index, 5);
    }

1984 #[test]
1985 fn test_memtable_rewrite() {
1986 let region_id = 8;
1987 let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
1988 let mut expected_append = 0;
1989 let mut expected_rewrite = 0;
1990 let mut expected_deleted_rewrite = 0;
1991
1992 let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1));
1994 memtable.rewrite(ents_idx, Some(1));
1995 expected_rewrite += 10;
1996 expected_deleted_rewrite += 10;
1997 assert_eq!(memtable.min_file_seq(LogQueue::Rewrite), None);
1998 assert_eq!(
1999 memtable.global_stats.live_entries(LogQueue::Append),
2000 expected_append
2001 );
2002 assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2003 assert_eq!(
2004 memtable.global_stats.deleted_rewrite_entries(),
2005 expected_deleted_rewrite
2006 );
2007
2008 memtable.append(generate_entry_indexes(
2014 0,
2015 10,
2016 FileId::new(LogQueue::Append, 1),
2017 ));
2018 memtable.append(generate_entry_indexes(
2019 10,
2020 20,
2021 FileId::new(LogQueue::Append, 2),
2022 ));
2023 memtable.put(
2024 b"kk1".to_vec(),
2025 b"vv1".to_vec(),
2026 FileId::new(LogQueue::Append, 2),
2027 );
2028 memtable.append(generate_entry_indexes(
2029 20,
2030 30,
2031 FileId::new(LogQueue::Append, 3),
2032 ));
2033 memtable.put(
2034 b"kk2".to_vec(),
2035 b"vv2".to_vec(),
2036 FileId::new(LogQueue::Append, 3),
2037 );
2038 memtable.append(generate_entry_indexes(
2039 30,
2040 40,
2041 FileId::new(LogQueue::Append, 4),
2042 ));
2043 memtable.put(
2044 b"kk3".to_vec(),
2045 b"vv3".to_vec(),
2046 FileId::new(LogQueue::Append, 4),
2047 );
2048 expected_append += 4 * 10 + 3;
2049 memtable.compact_to(10);
2050 expected_append -= 10;
2051 assert_eq!(memtable.entries_size(), 30);
2052 assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 2);
2053 assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
2054 assert_eq!(
2055 memtable.global_stats.live_entries(LogQueue::Append),
2056 expected_append
2057 );
2058 memtable.consistency_check();
2059
2060 let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 50));
2066 memtable.rewrite(ents_idx, Some(1));
2067 memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50);
2068 expected_rewrite += 10 + 1;
2069 expected_deleted_rewrite += 10 + 1;
2070 assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 2);
2071 assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
2072 assert!(memtable.min_file_seq(LogQueue::Rewrite).is_none());
2073 assert!(memtable.max_file_seq(LogQueue::Rewrite).is_none());
2074 assert_eq!(memtable.rewrite_count, 0);
2075 assert_eq!(memtable.get(b"kk0"), None);
2076 assert_eq!(
2077 memtable.global_stats.live_entries(LogQueue::Append),
2078 expected_append
2079 );
2080 assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2081 assert_eq!(
2082 memtable.global_stats.deleted_rewrite_entries(),
2083 expected_deleted_rewrite
2084 );
2085 memtable.consistency_check();
2086
2087 let ents_idx = generate_entry_indexes(0, 20, FileId::new(LogQueue::Rewrite, 100));
2093 memtable.rewrite(ents_idx, Some(2));
2094 memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50);
2095 memtable.rewrite_key(b"kk1".to_vec(), Some(2), 100);
2096 expected_append -= 10 + 1;
2097 expected_rewrite += 20 + 2;
2098 expected_deleted_rewrite += 10 + 1;
2099 let ents_idx = generate_entry_indexes(20, 30, FileId::new(LogQueue::Rewrite, 101));
2100 memtable.rewrite(ents_idx, Some(3));
2101 memtable.rewrite_key(b"kk2".to_vec(), Some(3), 101);
2102 expected_append -= 10 + 1;
2103 expected_rewrite += 10 + 1;
2104 assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 4);
2105 assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
2106 assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 100);
2107 assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 101);
2108 assert_eq!(memtable.rewrite_count, 20);
2109 assert_eq!(memtable.get(b"kk1"), Some(b"vv1".to_vec()));
2110 assert_eq!(
2111 memtable.global_stats.live_entries(LogQueue::Append),
2112 expected_append
2113 );
2114 assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2115 assert_eq!(
2116 memtable.global_stats.deleted_rewrite_entries(),
2117 expected_deleted_rewrite
2118 );
2119 memtable.consistency_check();
2120
2121 memtable.append(generate_entry_indexes(
2128 35,
2129 36,
2130 FileId::new(LogQueue::Append, 5),
2131 ));
2132 expected_append -= 4;
2133 memtable.put(
2134 b"kk3".to_vec(),
2135 b"vv33".to_vec(),
2136 FileId::new(LogQueue::Append, 5),
2137 );
2138 assert_eq!(memtable.last_index().unwrap(), 35);
2139 memtable.consistency_check();
2140 let ents_idx = generate_entry_indexes(30, 40, FileId::new(LogQueue::Rewrite, 102));
2141 memtable.rewrite(ents_idx, Some(4));
2142 expected_append -= 5;
2143 expected_rewrite += 10;
2144 expected_deleted_rewrite += 5;
2145 assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 5);
2146 assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 5);
2147 assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 100);
2148 assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 102);
2149 assert_eq!(memtable.rewrite_count, 25);
2150 assert_eq!(memtable.get(b"kk3"), Some(b"vv33".to_vec()));
2151 assert_eq!(
2152 memtable.global_stats.live_entries(LogQueue::Append),
2153 expected_append
2154 );
2155 assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2156 assert_eq!(
2157 memtable.global_stats.deleted_rewrite_entries(),
2158 expected_deleted_rewrite
2159 );
2160 memtable.consistency_check();
2161
        memtable.append(generate_entry_indexes(
            35,
            50,
            FileId::new(LogQueue::Append, 6),
        ));
        expected_append += 15 - 1;
        memtable.compact_to(30);
        expected_deleted_rewrite += 20;
        assert_eq!(memtable.last_index().unwrap(), 49);
        assert_eq!(memtable.rewrite_count, 5);
        assert_eq!(
            memtable.global_stats.live_entries(LogQueue::Append),
            expected_append
        );
        assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
        assert_eq!(
            memtable.global_stats.deleted_rewrite_entries(),
            expected_deleted_rewrite
        );
        memtable.consistency_check();

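        // A gate-less rewrite of [10, 60) leaves the visible log untouched, so
        // each of the 50 rewritten entries is paired with a deletion (see the
        // unchanged first index and rewrite count below).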
        let ents_idx = generate_entry_indexes(10, 60, FileId::new(LogQueue::Rewrite, 103));
        memtable.rewrite(ents_idx, None);
        expected_rewrite += 50;
        expected_deleted_rewrite += 50;
        assert_eq!(memtable.first_index().unwrap(), 30);
        assert_eq!(memtable.rewrite_count, 5);
        assert_eq!(
            memtable.global_stats.live_entries(LogQueue::Append),
            expected_append
        );
        assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
        assert_eq!(
            memtable.global_stats.deleted_rewrite_entries(),
            expected_deleted_rewrite
        );
        memtable.consistency_check();

        let global_stats = Arc::clone(&memtable.global_stats);
        drop(memtable);
        assert_eq!(global_stats.live_entries(LogQueue::Append), 0);
        assert_eq!(global_stats.live_entries(LogQueue::Rewrite), 0);
    }

    #[test]
    fn test_memtable_merge_append() {
        type TestMemTable = MemTable<VacantAllocator>;
        fn empty_table(id: u64) -> TestMemTable {
            MemTable::new(id, Arc::new(GlobalStats::default()))
        }
        let cases = [
            |mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
                match on {
                    None => {
                        memtable.append(generate_entry_indexes(
                            0,
                            10,
                            FileId::new(LogQueue::Append, 1),
                        ));
                        memtable.append(generate_entry_indexes(
                            7,
                            15,
                            FileId::new(LogQueue::Append, 2),
                        ));
                        memtable.rewrite(
                            generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
                            Some(1),
                        );
                    }
                    Some(LogQueue::Append) => {
                        memtable.append(generate_entry_indexes(
                            0,
                            10,
                            FileId::new(LogQueue::Append, 1),
                        ));
                        memtable.append(generate_entry_indexes(
                            7,
                            15,
                            FileId::new(LogQueue::Append, 2),
                        ));
                        memtable.compact_to(7);
                    }
                    Some(LogQueue::Rewrite) => {
                        memtable.replay_rewrite(generate_entry_indexes(
                            0,
                            7,
                            FileId::new(LogQueue::Rewrite, 1),
                        ));
                        memtable.replay_rewrite(Vec::new());
                    }
                }
                memtable
            },
            |mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
                match on {
                    None => {
                        memtable.append(generate_entry_indexes(
                            0,
                            10,
                            FileId::new(LogQueue::Append, 1),
                        ));
                        memtable.append(generate_entry_indexes(
                            7,
                            15,
                            FileId::new(LogQueue::Append, 2),
                        ));
                        memtable.rewrite(
                            generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
                            Some(1),
                        );
                        memtable.compact_to(10);
                    }
                    Some(LogQueue::Append) => {
                        memtable.append(generate_entry_indexes(
                            0,
                            10,
                            FileId::new(LogQueue::Append, 1),
                        ));
                        memtable.append(generate_entry_indexes(
                            7,
                            15,
                            FileId::new(LogQueue::Append, 2),
                        ));
                        memtable.compact_to(10);
                    }
                    Some(LogQueue::Rewrite) => {
                        memtable.replay_rewrite(generate_entry_indexes(
                            0,
                            7,
                            FileId::new(LogQueue::Rewrite, 1),
                        ));
                        memtable.compact_to(10);
                    }
                }
                memtable
            },
            |mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
                match on {
                    None => {
                        memtable.append(generate_entry_indexes(
                            0,
                            10,
                            FileId::new(LogQueue::Append, 1),
                        ));
                        memtable.rewrite(
                            generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
                            Some(1),
                        );
                        memtable.append(generate_entry_indexes(
                            10,
                            15,
                            FileId::new(LogQueue::Append, 2),
                        ));
                        memtable.append(generate_entry_indexes(
                            5,
                            10,
                            FileId::new(LogQueue::Append, 2),
                        ));
                    }
                    Some(LogQueue::Append) => {
                        let mut m1 = empty_table(memtable.region_id);
                        m1.append(generate_entry_indexes(
                            10,
                            15,
                            FileId::new(LogQueue::Append, 2),
                        ));
                        let mut m2 = empty_table(memtable.region_id);
                        m2.append(generate_entry_indexes(
                            5,
                            10,
                            FileId::new(LogQueue::Append, 2),
                        ));
                        m1.merge_newer_neighbor(&mut m2);
                        memtable.merge_newer_neighbor(&mut m1);
                    }
                    Some(LogQueue::Rewrite) => {
                        memtable.replay_rewrite(generate_entry_indexes(
                            0,
                            10,
                            FileId::new(LogQueue::Rewrite, 1),
                        ));
                    }
                }
                memtable
            },
        ];

        for (i, case) in cases.iter().enumerate() {
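            // Merging with an empty counterpart on either side must leave the
            // surviving table's entries unchanged.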
            let region_id = i as u64;
            let mut append = empty_table(region_id);
            let mut rewrite = case(empty_table(region_id), Some(LogQueue::Rewrite));
            rewrite.merge_append_table(&mut append);
            assert_eq!(
                rewrite.entry_indexes,
                case(empty_table(region_id), Some(LogQueue::Rewrite)).entry_indexes,
            );
            assert!(append.entry_indexes.is_empty());

            let mut append = case(empty_table(region_id), Some(LogQueue::Append));
            let mut rewrite = empty_table(region_id);
            rewrite.merge_append_table(&mut append);
            assert_eq!(
                rewrite.entry_indexes,
                case(empty_table(region_id), Some(LogQueue::Append)).entry_indexes
            );
            assert!(append.entry_indexes.is_empty());
        }

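        // Replaying the two queues separately and merging them must match the
        // result of replaying everything in order (the `None` case).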
        for (i, case) in cases.iter().enumerate() {
            let region_id = i as u64;
            let mut append = case(empty_table(region_id), Some(LogQueue::Append));
            let mut rewrite = case(empty_table(region_id), Some(LogQueue::Rewrite));
            rewrite.merge_append_table(&mut append);
            let expected = case(empty_table(region_id), None);
            assert_eq!(
                rewrite.global_stats.live_entries(LogQueue::Append),
                expected.global_stats.live_entries(LogQueue::Append)
            );
            assert_eq!(
                rewrite.global_stats.live_entries(LogQueue::Rewrite),
                expected.global_stats.live_entries(LogQueue::Rewrite)
            );
            assert_eq!(rewrite.entry_indexes, expected.entry_indexes);
            assert!(append.entry_indexes.is_empty());
        }
    }

    #[test]
    fn test_memtables_merge_append_neighbor() {
        let first_rid = 17;
        let mut last_rid = first_rid;

        let mut batches = vec![
            LogItemBatch::with_capacity(0),
            LogItemBatch::with_capacity(0),
            LogItemBatch::with_capacity(0),
        ];
        let files: Vec<_> = (0..batches.len())
            .map(|i| FileId::new(LogQueue::Append, 10 + i as u64))
            .collect();

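        // Each region below exercises one scenario, spread across three batches
        // (one per file).

        // put key1 => delete key1 => put key1 again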
        batches[0].put(last_rid, b"key1".to_vec(), b"val1".to_vec());
        batches[1].delete(last_rid, b"key1".to_vec());
        batches[2].put(last_rid, b"key1".to_vec(), b"val2".to_vec());

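        // put a key, then clean the whole region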
        last_rid += 1;
        batches[0].put(last_rid, b"key".to_vec(), b"ANYTHING".to_vec());
        batches[1].add_command(last_rid, Command::Clean);

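        // entries [1, 11) => compact to 5 => entries [11, 21)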
        last_rid += 1;
        batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
        batches[1].add_command(last_rid, Command::Compact { index: 5 });
        batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2]));

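        // entries [1, 11) => entries [11, 21), truncated back to [5, 11) => compact to 8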
        last_rid += 1;
        batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
        batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1]));
        batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1]));
        batches[2].add_command(last_rid, Command::Compact { index: 8 });

        for b in batches.iter_mut() {
            b.finish_write(FileBlockHandle::dummy(LogQueue::Append));
        }

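        // Replay each batch into its own recovery context, then fold the
        // contexts together from the newest end.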
        let mut ctxs = VecDeque::default();
        for (batch, file_id) in batches.clone().into_iter().zip(files) {
            let mut ctx = MemTableRecoverContext::default();
            ctx.replay(batch, file_id).unwrap();
            ctxs.push_back(ctx);
        }
        while ctxs.len() > 1 {
            let (y, mut x) = (ctxs.pop_back().unwrap(), ctxs.pop_back().unwrap());
            x.merge(y, LogQueue::Append).unwrap();
            ctxs.push_back(x);
        }
        let (merged_memtables, merged_global_stats) = ctxs.pop_front().unwrap().finish();

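        // Build the reference result by applying the same batches sequentially
        // to a single accessor.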
        let sequential_global_stats = Arc::new(GlobalStats::default());
        let sequential_memtables = MemTableAccessor::new(sequential_global_stats.clone());
        for mut batch in batches.clone() {
            sequential_memtables.apply_append_writes(batch.drain());
        }

        for rid in first_rid..=last_rid {
            let m = merged_memtables.get(rid);
            let s = sequential_memtables.get(rid);
            if m.is_none() {
                assert!(s.is_none());
                continue;
            }
            let merged = m.as_ref().unwrap().read();
            let sequential = s.as_ref().unwrap().read();
            let mut merged_vec = Vec::new();
            let mut sequential_vec = Vec::new();
            merged
                .fetch_entry_indexes_before(u64::MAX, &mut merged_vec)
                .unwrap();
            sequential
                .fetch_entry_indexes_before(u64::MAX, &mut sequential_vec)
                .unwrap();
            assert_eq!(merged_vec, sequential_vec);
            merged_vec.clear();
            sequential_vec.clear();
            merged
                .fetch_rewritten_entry_indexes(&mut merged_vec)
                .unwrap();
            sequential
                .fetch_rewritten_entry_indexes(&mut sequential_vec)
                .unwrap();
            assert_eq!(merged_vec, sequential_vec);
            let mut merged_vec = Vec::new();
            let mut sequential_vec = Vec::new();
            merged.fetch_kvs_before(u64::MAX, &mut merged_vec);
            sequential.fetch_kvs_before(u64::MAX, &mut sequential_vec);
            assert_eq!(merged_vec, sequential_vec);
            merged_vec.clear();
            sequential_vec.clear();
            merged.fetch_rewritten_kvs(&mut merged_vec);
            sequential.fetch_rewritten_kvs(&mut sequential_vec);
            assert_eq!(merged_vec, sequential_vec);
        }
        assert_eq!(
            merged_global_stats.live_entries(LogQueue::Append),
            sequential_global_stats.live_entries(LogQueue::Append),
        );
        assert_eq!(
            merged_global_stats.rewrite_entries(),
            sequential_global_stats.rewrite_entries(),
        );
        assert_eq!(
            merged_global_stats.deleted_rewrite_entries(),
            sequential_global_stats.deleted_rewrite_entries(),
        );
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_memtable_single_put(b: &mut test::Bencher) {
        let mut memtable = MemTable::new(0, Arc::new(GlobalStats::default()));
        let key = b"some_key".to_vec();
        let value = vec![7; 12];
        b.iter(move || {
            memtable.put(key.clone(), value.clone(), FileId::dummy(LogQueue::Append));
        });
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_memtable_triple_puts(b: &mut test::Bencher) {
        let mut memtable = MemTable::new(0, Arc::new(GlobalStats::default()));
        let key0 = b"some_key0".to_vec();
        let key1 = b"some_key1".to_vec();
        let key2 = b"some_key2".to_vec();
        let value = vec![7; 12];
        b.iter(move || {
            memtable.put(key0.clone(), value.clone(), FileId::dummy(LogQueue::Append));
            memtable.put(key1.clone(), value.clone(), FileId::dummy(LogQueue::Append));
            memtable.put(key2.clone(), value.clone(), FileId::dummy(LogQueue::Append));
        });
    }
}