raft_engine/memtable.rs

1// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
2
3use std::borrow::BorrowMut;
4use std::collections::{BTreeMap, HashSet, VecDeque};
5use std::marker::PhantomData;
6use std::ops::Bound;
7use std::sync::Arc;
8
9use fail::fail_point;
10use hashbrown::HashMap;
11use log::{error, warn};
12use parking_lot::{Mutex, RwLock};
13
14use crate::config::Config;
15use crate::file_pipe_log::ReplayMachine;
16use crate::log_batch::{
17    AtomicGroupStatus, Command, CompressionType, KeyValue, LogBatch, LogItem, LogItemBatch,
18    LogItemContent, OpType,
19};
20use crate::metrics::MEMORY_USAGE;
21use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue};
22use crate::util::{hash_u64, Factory};
23use crate::{Error, GlobalStats, Result};
24
25#[cfg(feature = "swap")]
26mod swap_conditional_imports {
27    use crate::swappy_allocator::SwappyAllocator;
28    use std::convert::TryFrom;
29    use std::path::Path;
30
31    pub trait AllocatorTrait: std::alloc::Allocator + Clone + Send + Sync {}
32    impl<T: std::alloc::Allocator + Clone + Send + Sync> AllocatorTrait for T {}
33
34    pub type VacantAllocator = std::alloc::Global;
35    pub type SelectedAllocator = SwappyAllocator<std::alloc::Global>;
36
37    pub fn new_vacant_allocator() -> VacantAllocator {
38        std::alloc::Global
39    }
40    pub fn new_allocator(cfg: &crate::Config) -> SelectedAllocator {
41        let memory_limit =
42            usize::try_from(cfg.memory_limit.map_or(u64::MAX, |l| l.0)).unwrap_or(usize::MAX);
43        let path = Path::new(&cfg.dir).join("swap");
44        SwappyAllocator::new(&path, memory_limit)
45    }
46}
47
48#[cfg(not(feature = "swap"))]
49mod swap_conditional_imports {
50    pub trait AllocatorTrait: Clone + Send + Sync {}
51
52    #[derive(Clone)]
53    pub struct DummyAllocator;
54    impl AllocatorTrait for DummyAllocator {}
55
56    pub type VacantAllocator = DummyAllocator;
57    pub type SelectedAllocator = DummyAllocator;
58
59    pub fn new_vacant_allocator() -> VacantAllocator {
60        DummyAllocator
61    }
62    pub fn new_allocator(_: &crate::Config) -> SelectedAllocator {
63        DummyAllocator
64    }
65}
66
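// With the `swap` feature enabled, entry indexes are stored through a
// `SwappyAllocator` that can spill to the `swap` sub-directory of `cfg.dir`
// once the configured `memory_limit` is exceeded; otherwise a plain global
// (or dummy) allocator is used.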
67use swap_conditional_imports::*;
68
69/// Attempt to shrink the entry container if its capacity reaches this threshold.
70const CAPACITY_SHRINK_THRESHOLD: usize = 1024 - 1;
71const CAPACITY_INIT: usize = 32 - 1;
72/// Number of hash tables used to store [`MemTable`]s.
73const MEMTABLE_SLOT_COUNT: usize = 128;
74
75/// Location of a log entry.
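///
/// An entry is located by the file block that stores its (possibly compressed)
/// group of entries, together with the entry's relative offset and encoded
/// length within that group.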
76#[derive(Debug, Copy, Clone, PartialEq, Eq)]
77pub struct EntryIndex {
78    /// Logical index.
79    pub index: u64,
80
81    /// File location of the group of entries that this entry belongs to.
82    pub entries: Option<FileBlockHandle>,
83    /// How its group of entries is compressed.
84    pub compression_type: CompressionType,
85
86    /// The relative offset within its group of entries.
87    pub entry_offset: u32,
88    /// The encoded length within its group of entries.
89    pub entry_len: u32,
90}
91
92impl Default for EntryIndex {
93    fn default() -> EntryIndex {
94        EntryIndex {
95            index: 0,
96            entries: None,
97            compression_type: CompressionType::None,
98            entry_offset: 0,
99            entry_len: 0,
100        }
101    }
102}
103
104impl EntryIndex {
105    fn from_thin(index: u64, e: ThinEntryIndex) -> Self {
106        Self {
107            index,
108            entries: e.entries,
109            compression_type: e.compression_type,
110            entry_offset: e.entry_offset,
111            entry_len: e.entry_len,
112        }
113    }
114}
115
116#[derive(Debug, Copy, Clone, PartialEq, Eq)]
117struct ThinEntryIndex {
118    entries: Option<FileBlockHandle>,
119    compression_type: CompressionType,
120    entry_offset: u32,
121    entry_len: u32,
122}
123
124impl From<&EntryIndex> for ThinEntryIndex {
125    fn from(e: &EntryIndex) -> Self {
126        Self {
127            entries: e.entries,
128            compression_type: e.compression_type,
129            entry_offset: e.entry_offset,
130            entry_len: e.entry_len,
131        }
132    }
133}
134
135/// In-memory storage for Raft Groups.
136///
137/// Each Raft Group has its own `MemTable` to store all key value pairs and the
138/// file locations of all log entries.
139pub struct MemTable<A: AllocatorTrait> {
140    /// The ID of current Raft Group.
141    region_id: u64,
142
143    /// Container of entries. Incoming entries are pushed to the back with
144    /// ascending log indexes.
145    #[cfg(feature = "swap")]
146    entry_indexes: VecDeque<ThinEntryIndex, A>,
147    #[cfg(not(feature = "swap"))]
148    entry_indexes: VecDeque<ThinEntryIndex>,
149    /// The log index of the first entry.
150    first_index: u64,
151    /// The number of rewritten entries. Rewritten entries are the oldest
152    /// entries and stored at the front of the container.
153    rewrite_count: usize,
154
155    /// A map of active key value pairs.
156    kvs: BTreeMap<Vec<u8>, (Vec<u8>, FileId)>,
157
158    /// (start_seq, end_seq).
159    /// If there's an active entry stored before end_seq, it possibly belongs to
160    /// an atomic group. In order not to lose this entry, we cannot delete any
161    /// other entries in that group.
162    /// Only applies to the rewrite queue. Each Raft Group has at most one atomic
163    /// group at a time, because atomic groups are only used for rewrite-rewrite
164    /// operations and such a group always contains all the rewrite entries of a
165    /// Raft Group.
166    atomic_group: Option<(FileSeq, FileSeq)>,
167
168    /// Shared statistics.
169    global_stats: Arc<GlobalStats>,
170
171    _phantom: PhantomData<A>,
172}
173
174impl MemTable<VacantAllocator> {
175    #[allow(dead_code)]
176    fn new(region_id: u64, global_stats: Arc<GlobalStats>) -> MemTable<VacantAllocator> {
177        Self::with_allocator(region_id, global_stats, &new_vacant_allocator())
178    }
179}
180
181impl<A: AllocatorTrait> MemTable<A> {
182    fn with_allocator(
183        region_id: u64,
184        global_stats: Arc<GlobalStats>,
185        _allocator: &A,
186    ) -> MemTable<A> {
187        MemTable {
188            region_id,
189            #[cfg(feature = "swap")]
190            entry_indexes: VecDeque::with_capacity_in(CAPACITY_INIT, _allocator.clone()),
191            #[cfg(not(feature = "swap"))]
192            entry_indexes: VecDeque::with_capacity(CAPACITY_INIT),
193            first_index: 0,
194            rewrite_count: 0,
195            kvs: BTreeMap::default(),
196            atomic_group: None,
197            global_stats,
198            _phantom: PhantomData,
199        }
200    }
201
202    /// Merges with a newer neighbor [`MemTable`].
203    ///
204    /// This method is only used for recovery.
205    pub fn merge_newer_neighbor(&mut self, rhs: &mut Self) {
206        debug_assert_eq!(self.region_id, rhs.region_id);
207        if let Some((rhs_first, _)) = rhs.span() {
208            self.prepare_append(
209                rhs_first,
210                // Rewrite -> Compact Append -> Rewrite.
211                // TODO: add test case.
212                rhs.rewrite_count > 0, /* allow_hole */
213                // Always true, because `self` might not have all entries in
214                // history.
215                true, /* allow_overwrite */
216            );
217            self.global_stats.add(
218                rhs.entry_indexes[0].entries.unwrap().id.queue,
219                rhs.entry_indexes.len(),
220            );
221            self.rewrite_count += rhs.rewrite_count;
222            self.entry_indexes.append(&mut rhs.entry_indexes);
223            rhs.rewrite_count = 0;
224        }
225
226        for (key, (value, file_id)) in rhs.kvs.iter() {
227            self.put(key.clone(), value.clone(), *file_id);
228        }
229
230        if let Some(g) = rhs.atomic_group.take() {
231            assert!(self.atomic_group.map_or(true, |(_, end)| end <= g.0));
232            self.atomic_group = Some(g);
233        }
234
235        let deleted = rhs.global_stats.deleted_rewrite_entries();
236        self.global_stats.add(LogQueue::Rewrite, deleted);
237        self.global_stats.delete(LogQueue::Rewrite, deleted);
238    }
239
240    /// Merges with a [`MemTable`] that contains only append data. Assumes
241    /// `self` contains all rewritten data of the same region.
242    ///
243    /// This method is only used for recovery.
244    pub fn merge_append_table(&mut self, rhs: &mut Self) {
245        debug_assert_eq!(self.region_id, rhs.region_id);
246        debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
247        debug_assert_eq!(rhs.rewrite_count, 0);
248
249        if let Some((first, _)) = rhs.span() {
250            self.prepare_append(
251                first,
252                // FIXME: It's possibly okay to set it to false. Any compact
253                // command applied to append queue will also be applied to
254                // rewrite queue.
255                true, /* allow_hole */
256                // Compact -> Rewrite -> Data loss of the compact command.
257                true, /* allow_overwrite */
258            );
259            self.global_stats.add(
260                rhs.entry_indexes[0].entries.unwrap().id.queue,
261                rhs.entry_indexes.len(),
262            );
263            self.entry_indexes.append(&mut rhs.entry_indexes);
264        }
265
266        for (key, (value, file_id)) in rhs.kvs.iter() {
267            self.put(key.clone(), value.clone(), *file_id);
268        }
269
270        assert!(rhs.atomic_group.is_none());
271
272        let deleted = rhs.global_stats.deleted_rewrite_entries();
273        self.global_stats.add(LogQueue::Rewrite, deleted);
274        self.global_stats.delete(LogQueue::Rewrite, deleted);
275    }
276
277    /// Returns the value for a given key.
278    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
279        self.kvs.get(key).map(|v| v.0.clone())
280    }
281
282    /// Iterates over the [start_key, end_key) range and yields all key value pairs
283    /// as bytes.
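    ///
    /// A minimal usage sketch (`table` and the keys are hypothetical; not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let mut collected = Vec::new();
    /// table.scan(Some(&b"k1"[..]), Some(&b"k9"[..]), false /* reverse */, |k, v| {
    ///     collected.push((k.to_vec(), v.to_vec()));
    ///     true // return false to stop early
    /// })?;
    /// ```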
284    pub fn scan<F>(
285        &self,
286        start_key: Option<&[u8]>,
287        end_key: Option<&[u8]>,
288        reverse: bool,
289        mut f: F,
290    ) -> Result<()>
291    where
292        F: FnMut(&[u8], &[u8]) -> bool,
293    {
294        let lower = start_key.map(Bound::Included).unwrap_or(Bound::Unbounded);
295        let upper = end_key.map(Bound::Excluded).unwrap_or(Bound::Unbounded);
296        let iter = self.kvs.range::<[u8], _>((lower, upper));
297        if reverse {
298            for (key, (value, _)) in iter.rev() {
299                if !f(key, value) {
300                    break;
301                }
302            }
303        } else {
304            for (key, (value, _)) in iter {
305                if !f(key, value) {
306                    break;
307                }
308            }
309        }
310        Ok(())
311    }
312
313    /// Deletes a key value pair.
314    pub fn delete(&mut self, key: &[u8]) {
315        if let Some(value) = self.kvs.remove(key) {
316            self.global_stats.delete(value.1.queue, 1);
317        }
318    }
319
320    /// Puts a key value pair that has been written to the specified file. The
321    /// old value for this key will be deleted if it exists.
322    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>, file_id: FileId) {
323        if let Some(origin) = self.kvs.insert(key, (value, file_id)) {
324            self.global_stats.delete(origin.1.queue, 1);
325        }
326        self.global_stats.add(file_id.queue, 1);
327    }
328
329    /// Rewrites a key by redirecting its location to the `seq`-th log file in the
330    /// rewrite queue. No-op if the key does not exist.
331    ///
332    /// When `gate` is present, only append data no newer than it will be
333    /// rewritten.
334    pub fn rewrite_key(&mut self, key: Vec<u8>, gate: Option<FileSeq>, seq: FileSeq) {
335        self.global_stats.add(LogQueue::Rewrite, 1);
336        if let Some(origin) = self.kvs.get_mut(&key) {
337            if origin.1.queue == LogQueue::Append {
338                if let Some(gate) = gate {
339                    if origin.1.seq <= gate {
340                        origin.1 = FileId {
341                            queue: LogQueue::Rewrite,
342                            seq,
343                        };
344                        self.global_stats.delete(LogQueue::Append, 1);
345                        return;
346                    }
347                }
348            } else {
349                assert!(origin.1.seq <= seq);
350                origin.1.seq = seq;
351            }
352        }
353        self.global_stats.delete(LogQueue::Rewrite, 1);
354    }
355
356    /// Returns the log entry location for a given logical log index.
357    pub fn get_entry(&self, index: u64) -> Option<EntryIndex> {
358        if let Some((first, last)) = self.span() {
359            if index < first || index > last {
360                return None;
361            }
362
363            let ioffset = (index - first) as usize;
364            let entry_index = self.entry_indexes[ioffset];
365            Some(EntryIndex::from_thin(index, entry_index))
366        } else {
367            None
368        }
369    }
370
371    /// Appends some log entries from the append queue. Existing entries newer than
372    /// any of the incoming entries will be deleted silently. Assumes the
373    /// provided entries have consecutive logical indexes.
374    ///
375    /// # Panics
376    ///
377    /// Panics if index of the first entry in `entry_indexes` is greater than
378    /// largest existing index + 1 (hole).
379    ///
380    /// Panics if incoming entries contain indexes that might have been
381    /// compacted before (overwriting history).
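    ///
    /// A minimal sketch (`entry_indexes_for` is a hypothetical helper producing
    /// consecutive `EntryIndex`es; not compiled as a doctest):
    ///
    /// ```ignore
    /// table.append(entry_indexes_for(10..=12)); // logical indexes 10, 11, 12
    /// assert_eq!(table.last_index(), Some(12));
    /// ```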
382    pub fn append(&mut self, entry_indexes: Vec<EntryIndex>) {
383        let len = entry_indexes.len();
384        if len > 0 {
385            self.prepare_append(
386                entry_indexes[0].index,
387                false, /* allow_hole */
388                false, /* allow_overwrite */
389            );
390            self.global_stats.add(LogQueue::Append, len);
391            for ei in &entry_indexes {
392                self.entry_indexes.push_back(ei.into());
393            }
394        }
395    }
396
397    /// Appends some entries from the append queue. Assumes this table has no
398    /// rewrite data.
399    ///
400    /// This method is only used for recovery.
401    pub fn replay_append(&mut self, entry_indexes: Vec<EntryIndex>) {
402        let len = entry_indexes.len();
403        if len > 0 {
404            debug_assert_eq!(self.rewrite_count, 0);
405            self.prepare_append(
406                entry_indexes[0].index,
407                false, /* allow_hole */
408                // Refer to case in `merge_newer_neighbor`.
409                true, /* allow_overwrite */
410            );
411            self.global_stats.add(LogQueue::Append, len);
412            for ei in &entry_indexes {
413                debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append);
414                self.entry_indexes.push_back(ei.into());
415            }
416        }
417    }
418
419    /// Rewrites some entries by modifying their location.
420    ///
421    /// When `gate` is present, only append data no newer than it will be
422    /// rewritten.
423    ///
424    /// # Panics
425    ///
426    /// Panics if index of the first entry in `rewrite_indexes` is greater than
427    /// largest existing rewritten index + 1 (hole).
428    pub fn rewrite(&mut self, rewrite_indexes: Vec<EntryIndex>, gate: Option<FileSeq>) {
429        if rewrite_indexes.is_empty() {
430            return;
431        }
432        self.global_stats
433            .add(LogQueue::Rewrite, rewrite_indexes.len());
434
435        let len = self.entry_indexes.len();
436        if len == 0 {
437            self.global_stats
438                .delete(LogQueue::Rewrite, rewrite_indexes.len());
439            return;
440        }
441
442        let first = self.first_index;
443        let last = self.first_index + len as u64 - 1;
444        let rewrite_first = std::cmp::max(rewrite_indexes[0].index, first);
445        let rewrite_last = std::cmp::min(rewrite_indexes[rewrite_indexes.len() - 1].index, last);
446        let mut rewrite_len = (rewrite_last + 1).saturating_sub(rewrite_first) as usize;
447        if rewrite_len == 0 {
448            self.global_stats
449                .delete(LogQueue::Rewrite, rewrite_indexes.len());
450            return;
451        }
452
453        let pos = (rewrite_first - first) as usize;
454        // No normal log entry is mixed into the rewritten entries at the front.
455        assert!(
456            pos == 0 || self.entry_indexes[pos - 1].entries.unwrap().id.queue == LogQueue::Rewrite
457        );
458        let rewrite_pos = (rewrite_first - rewrite_indexes[0].index) as usize;
459
460        for (i, rindex) in rewrite_indexes[rewrite_pos..rewrite_pos + rewrite_len]
461            .iter()
462            .enumerate()
463        {
464            let index = &mut self.entry_indexes[i + pos];
465            if let Some(gate) = gate {
466                debug_assert_eq!(index.entries.unwrap().id.queue, LogQueue::Append);
467                if index.entries.unwrap().id.seq > gate {
468                    // Some entries are overwritten by new appends.
469                    rewrite_len = i;
470                    break;
471                }
472            } else if index.entries.unwrap().id.queue == LogQueue::Append {
473                // Squeeze operation encounters a new append.
474                rewrite_len = i;
475                break;
476            }
477
478            *index = rindex.into();
479        }
480
481        if gate.is_none() {
482            // We either replaced some old rewrite entries, or discarded some of the
483            // incoming entries.
484            self.global_stats
485                .delete(LogQueue::Rewrite, rewrite_indexes.len());
486            // rewrite-rewrite could partially renew rewrite entries due to batch splitting.
487            self.rewrite_count = std::cmp::max(self.rewrite_count, pos + rewrite_len);
488        } else {
489            self.global_stats.delete(LogQueue::Append, rewrite_len);
490            self.global_stats
491                .delete(LogQueue::Rewrite, rewrite_indexes.len() - rewrite_len);
492            // rewrite-append always pushes forward.
493            assert!(pos + rewrite_len >= self.rewrite_count);
494            self.rewrite_count = pos + rewrite_len;
495        }
496    }
497
498    /// Appends some entries from the rewrite queue. Assumes this table has no
499    /// append data.
500    ///
501    /// This method is only used for recovery.
502    pub fn replay_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
503        let len = entry_indexes.len();
504        if len > 0 {
505            debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
506            self.prepare_append(
507                entry_indexes[0].index,
508                // Rewrite -> Compact Append -> Rewrite.
509                true, /* allow_hole */
510                // Refer to case in `merge_append_table`. They can be adapted
511                // to attack this path via a global rewrite without deleting
512                // obsolete rewrite files.
513                true, /* allow_overwrite */
514            );
515            self.global_stats.add(LogQueue::Rewrite, len);
516            for ei in &entry_indexes {
517                self.entry_indexes.push_back(ei.into());
518            }
519            self.rewrite_count = self.entry_indexes.len();
520        }
521    }
522
523    /// Removes all entries with index smaller than `index`. Returns the number
524    /// of deleted entries.
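    ///
    /// A sketch of the expected effect (indexes are hypothetical; not compiled
    /// as a doctest):
    ///
    /// ```ignore
    /// // With entries [10, 20) present:
    /// assert_eq!(table.compact_to(15), 5);       // entries [10, 15) are removed
    /// assert_eq!(table.first_index(), Some(15)); // [15, 20) remain
    /// ```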
525    pub fn compact_to(&mut self, index: u64) -> u64 {
526        if self.entry_indexes.is_empty() {
527            return 0;
528        }
529        let first = self.first_index;
530        if index <= first {
531            return 0;
532        }
533        let count = std::cmp::min((index - first) as usize, self.entry_indexes.len());
534        self.first_index = index;
535        self.entry_indexes.drain(..count);
536        self.maybe_shrink_entry_indexes();
537
538        let compacted_rewrite = std::cmp::min(count, self.rewrite_count);
539        self.rewrite_count -= compacted_rewrite;
540        self.global_stats
541            .delete(LogQueue::Rewrite, compacted_rewrite);
542        self.global_stats
543            .delete(LogQueue::Append, count - compacted_rewrite);
544        count as u64
545    }
546
547    pub fn apply_rewrite_atomic_group(&mut self, start: FileSeq, end: FileSeq) {
548        assert!(self.atomic_group.map_or(true, |(_, b)| b <= start));
549        self.atomic_group = Some((start, end));
550    }
551
552    /// Removes all entry indexes with index greater than or equal to `index`.
553    /// Assumes `index` <= `last`.
554    ///
555    /// Returns the number of deleted entries.
556    fn unsafe_truncate_back(&mut self, first: u64, index: u64, last: u64) -> usize {
557        debug_assert!(index <= last);
558        let len = self.entry_indexes.len();
559        debug_assert_eq!(len as u64, last - first + 1);
560        self.entry_indexes
561            .truncate(index.saturating_sub(first) as usize);
562        let new_len = self.entry_indexes.len();
563        let truncated = len - new_len;
564
565        if self.rewrite_count > new_len {
566            let truncated_rewrite = self.rewrite_count - new_len;
567            self.rewrite_count = new_len;
568            self.global_stats
569                .delete(LogQueue::Rewrite, truncated_rewrite);
570            self.global_stats
571                .delete(LogQueue::Append, truncated - truncated_rewrite);
572        } else {
573            self.global_stats.delete(LogQueue::Append, truncated);
574        }
575        truncated
576    }
577
578    /// Prepares to append entries with indexes starting at
579    /// `first_index_to_add`. After preparation, those entries can be directly
580    /// appended to the internal container.
581    ///
582    /// When `allow_hole` is set, existing entries will be removed if a hole is
583    /// detected. Otherwise, this panics.
584    ///
585    /// When `allow_overwrite_compacted` is set, existing entries will be
586    /// removed if incoming entries attempt to overwrite compacted slots.
587    /// Otherwise, this panics.
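    ///
    /// In short, the cases handled below are: overwriting compacted slots
    /// (`first_index_to_add < first`), a hole (`first_index_to_add > last + 1`),
    /// and an ordinary tail truncation before appending.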
588    #[inline]
589    fn prepare_append(
590        &mut self,
591        first_index_to_add: u64,
592        allow_hole: bool,
593        allow_overwrite_compacted: bool,
594    ) {
595        if let Some((first, last)) = self.span() {
596            if first_index_to_add < first {
597                if allow_overwrite_compacted {
598                    self.unsafe_truncate_back(first, 0, last);
599                } else {
600                    panic!(
601                        "attempt to overwrite compacted entries in {}",
602                        self.region_id
603                    );
604                }
605                self.first_index = first_index_to_add;
606            } else if last + 1 < first_index_to_add {
607                if allow_hole {
608                    self.unsafe_truncate_back(first, 0, last);
609                } else {
610                    panic!("memtable {} has a hole", self.region_id);
611                }
612                self.first_index = first_index_to_add;
613            } else if first_index_to_add != last + 1 {
614                self.unsafe_truncate_back(first, first_index_to_add, last);
615            }
616        } else {
617            self.first_index = first_index_to_add;
618        }
619    }
620
621    #[inline]
622    fn maybe_shrink_entry_indexes(&mut self) {
623        if self.entry_indexes.capacity() >= CAPACITY_SHRINK_THRESHOLD {
624            self.entry_indexes.shrink_to_fit();
625        }
626    }
627
628    /// Pulls all entries between log index `begin` and `end` to the given
629    /// buffer. Returns an error if any entry is missing.
630    ///
631    /// When `max_size` is present, stops pulling entries when the total size
632    /// reaches it.
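    ///
    /// A minimal usage sketch (hypothetical indexes and size limit; not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let mut ents = Vec::new();
    /// // Fetch entries [12, 18), stopping early once roughly 1 MiB is collected.
    /// table.fetch_entries_to(12, 18, Some(1024 * 1024), &mut ents)?;
    /// ```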
633    pub fn fetch_entries_to(
634        &self,
635        begin: u64,
636        end: u64,
637        max_size: Option<usize>,
638        vec_idx: &mut Vec<EntryIndex>,
639    ) -> Result<()> {
640        if end <= begin {
641            return Ok(());
642        }
643        let len = self.entry_indexes.len();
644        if len == 0 {
645            return Err(Error::EntryNotFound);
646        }
647        let first = self.first_index;
648        if begin < first {
649            return Err(Error::EntryCompacted);
650        }
651        if end > self.first_index + len as u64 {
652            return Err(Error::EntryNotFound);
653        }
654
655        let start_pos = (begin - first) as usize;
656        let end_pos = (end - begin) as usize + start_pos;
657
658        let mut total_size = 0;
659        let mut index = begin;
660        for idx in self.entry_indexes.range(start_pos..end_pos) {
661            total_size += idx.entry_len;
662            // Regardless of `max_size`, fetch at least one entry.
663            if let Some(max_size) = max_size {
664                if total_size as usize > max_size && total_size > idx.entry_len {
665                    break;
666                }
667            }
668            vec_idx.push(EntryIndex::from_thin(index, *idx));
669            index += 1;
670        }
671        Ok(())
672    }
673
674    /// Pulls all append entries older than or equal to `gate` into the provided
675    /// buffer.
676    pub fn fetch_entry_indexes_before(
677        &self,
678        gate: FileSeq,
679        vec_idx: &mut Vec<EntryIndex>,
680    ) -> Result<()> {
681        if let Some((first, last)) = self.span() {
682            let mut i = self.rewrite_count;
683            while first + i as u64 <= last && self.entry_indexes[i].entries.unwrap().id.seq <= gate
684            {
685                vec_idx.push(EntryIndex::from_thin(
686                    first + i as u64,
687                    self.entry_indexes[i],
688                ));
689                i += 1;
690            }
691        }
692        Ok(())
693    }
694
695    /// Pulls all rewrite entries to the provided buffer.
696    pub fn fetch_rewritten_entry_indexes(&self, vec_idx: &mut Vec<EntryIndex>) -> Result<()> {
697        if self.rewrite_count > 0 {
698            let first = self.first_index;
699            let end = self.first_index + self.rewrite_count as u64;
700            self.fetch_entries_to(first, end, None, vec_idx)
701        } else {
702            Ok(())
703        }
704    }
705
706    /// Pulls all key value pairs older than or equal to `gate` into the provided
707    /// buffer.
708    pub fn fetch_kvs_before(&self, gate: FileSeq, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
709        for (key, (value, file_id)) in &self.kvs {
710            if file_id.queue == LogQueue::Append && file_id.seq <= gate {
711                vec.push((key.clone(), value.clone()));
712            }
713        }
714    }
715
716    /// Pulls all rewrite key value pairs to the provided buffer.
717    pub fn fetch_rewritten_kvs(&self, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
718        for (key, (value, file_id)) in &self.kvs {
719            if file_id.queue == LogQueue::Rewrite {
720                vec.push((key.clone(), value.clone()));
721            }
722        }
723    }
724
725    /// Returns the smallest file sequence number of entries or key value pairs
726    /// in this table.
727    pub fn min_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
728        let entry = match queue {
729            LogQueue::Append => self.entry_indexes.get(self.rewrite_count),
730            LogQueue::Rewrite if self.rewrite_count == 0 => None,
731            LogQueue::Rewrite => self.entry_indexes.front(),
732        };
733        let ents_min = entry.map(|e| e.entries.unwrap().id.seq);
734        let kvs_min = self
735            .kvs
736            .values()
737            .filter(|v| v.1.queue == queue)
738            .fold(None, |min, v| {
739                if let Some(min) = min {
740                    Some(std::cmp::min(min, v.1.seq))
741                } else {
742                    Some(v.1.seq)
743                }
744            });
745        let res = match (ents_min, kvs_min) {
746            (Some(ents_min), Some(kvs_min)) => std::cmp::min(kvs_min, ents_min),
747            (Some(ents_min), None) => ents_min,
748            (None, Some(kvs_min)) => kvs_min,
749            (None, None) => return None,
750        };
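        // An atomic group must not be split by purging: if the smallest live
        // rewrite sequence still falls inside the recorded group, hold the result
        // back to the group's start.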
751        if queue == LogQueue::Rewrite {
752            if let Some((start, end)) = self.atomic_group {
753                if res <= end {
754                    return Some(std::cmp::min(start, res));
755                }
756            }
757        }
758        Some(res)
759    }
760
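    /// Returns `true` if this table has at least `count` entries and the
    /// `count`-th entry (counted from the front, rewritten entries included)
    /// resides in a file no newer than `gate`.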
761    #[inline]
762    pub fn has_at_least_some_entries_before(&self, gate: FileId, count: usize) -> bool {
763        debug_assert!(count > 0);
764        self.entry_indexes
765            .get(count - 1)
766            .map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
767    }
768
769    /// Returns the region ID.
770    pub fn region_id(&self) -> u64 {
771        self.region_id
772    }
773
774    pub(crate) fn rewrite_count(&self) -> usize {
775        self.rewrite_count
776    }
777
778    /// Returns the log index of the first log entry.
779    pub fn first_index(&self) -> Option<u64> {
780        self.span().map(|s| s.0)
781    }
782
783    /// Returns the log index of the last log entry.
784    pub fn last_index(&self) -> Option<u64> {
785        self.span().map(|s| s.1)
786    }
787
788    #[allow(dead_code)]
789    fn heap_size(&self) -> usize {
790        // FIXME: cover the map of kvs.
791        self.entry_indexes.capacity() * std::mem::size_of::<EntryIndex>()
792    }
793
794    /// Returns the first and last log index of the entries in this table.
795    #[inline]
796    fn span(&self) -> Option<(u64, u64)> {
797        let len = self.entry_indexes.len();
798        if len > 0 {
799            Some((self.first_index, self.first_index + len as u64 - 1))
800        } else {
801            None
802        }
803    }
804
805    #[cfg(test)]
806    fn consistency_check(&self) {
807        let mut seen_append = false;
808        for idx in self.entry_indexes.iter() {
809            // rewrites are at the front.
810            let queue = idx.entries.unwrap().id.queue;
811            if queue == LogQueue::Append {
812                seen_append = true;
813            }
814            assert_eq!(
815                queue,
816                if seen_append {
817                    LogQueue::Append
818                } else {
819                    LogQueue::Rewrite
820                }
821            );
822        }
823    }
824}
825
826impl<A: AllocatorTrait> Drop for MemTable<A> {
827    fn drop(&mut self) {
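        // On drop, this table's live entries and key values must stop counting
        // toward the global statistics, so subtract them per queue.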
828        let mut append_kvs = 0;
829        let mut rewrite_kvs = 0;
830        for (_v, id) in self.kvs.values() {
831            match id.queue {
832                LogQueue::Rewrite => rewrite_kvs += 1,
833                LogQueue::Append => append_kvs += 1,
834            }
835        }
836
837        self.global_stats
838            .delete(LogQueue::Rewrite, self.rewrite_count + rewrite_kvs);
839        self.global_stats.delete(
840            LogQueue::Append,
841            self.entry_indexes.len() - self.rewrite_count + append_kvs,
842        );
843    }
844}
845
846type MemTableMap<A> = HashMap<u64, Arc<RwLock<MemTable<A>>>>;
847pub type MemTableHandle = Arc<RwLock<MemTable<SelectedAllocator>>>;
848pub type MemTables = MemTableAccessor<SelectedAllocator>;
849
850/// A collection of [`MemTable`]s.
851///
852/// Internally, they are stored in multiple [`HashMap`]s, which are indexed by
853/// hashed region IDs.
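///
/// A minimal usage sketch (`stats` is a hypothetical `Arc<GlobalStats>` and
/// `file_id` a hypothetical `FileId`; not compiled as a doctest):
///
/// ```ignore
/// let accessor = MemTableAccessor::new(stats);
/// let table = accessor.get_or_insert(7 /* raft_group_id */);
/// table.write().put(b"key".to_vec(), b"value".to_vec(), file_id);
/// assert!(accessor.get(7).is_some());
/// ```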
854#[derive(Clone)]
855pub struct MemTableAccessor<A: AllocatorTrait> {
856    global_stats: Arc<GlobalStats>,
857    allocator: A,
858
859    /// A fixed-size array of maps of [`MemTable`]s.
860    slots: Vec<Arc<RwLock<MemTableMap<A>>>>,
861    /// Deleted [`MemTable`]s that are not yet rewritten.
862    removed_memtables: Arc<Mutex<VecDeque<u64>>>,
863}
864
865impl MemTableAccessor<VacantAllocator> {
866    pub fn new(global_stats: Arc<GlobalStats>) -> MemTableAccessor<VacantAllocator> {
867        let mut slots = Vec::with_capacity(MEMTABLE_SLOT_COUNT);
868        for _ in 0..MEMTABLE_SLOT_COUNT {
869            slots.push(Arc::new(RwLock::new(MemTableMap::default())));
870        }
871        MemTableAccessor {
872            global_stats,
873            allocator: new_vacant_allocator(),
874            slots,
875            removed_memtables: Default::default(),
876        }
877    }
878}
879
880impl MemTableAccessor<SelectedAllocator> {
881    pub fn memory_usage(&self) -> usize {
882        #[cfg(not(feature = "swap"))]
883        {
884            let mut total = 0;
885            for tables in &self.slots {
886                tables.read().values().for_each(|t| {
887                    total += t.read().heap_size();
888                });
889            }
890            total
891        }
892        #[cfg(feature = "swap")]
893        {
894            self.allocator.memory_usage()
895        }
896    }
897
898    pub(crate) fn flush_metrics(&self) {
899        MEMORY_USAGE.set(self.memory_usage() as i64);
900    }
901}
902
903impl<A: AllocatorTrait> MemTableAccessor<A> {
904    pub fn new_with_allocator(global_stats: Arc<GlobalStats>, allocator: A) -> MemTableAccessor<A> {
905        let mut slots = Vec::with_capacity(MEMTABLE_SLOT_COUNT);
906        for _ in 0..MEMTABLE_SLOT_COUNT {
907            slots.push(Arc::new(RwLock::new(MemTableMap::default())));
908        }
909        MemTableAccessor {
910            global_stats,
911            allocator,
912            slots,
913            removed_memtables: Default::default(),
914        }
915    }
916
917    pub fn get_or_insert(&self, raft_group_id: u64) -> Arc<RwLock<MemTable<A>>> {
918        let global_stats = self.global_stats.clone();
919        let mut memtables = self.slots[Self::slot_index(raft_group_id)].write();
920        let memtable = memtables.entry(raft_group_id).or_insert_with(|| {
921            let memtable =
922                MemTable::with_allocator(raft_group_id, global_stats.clone(), &self.allocator);
923            Arc::new(RwLock::new(memtable))
924        });
925        memtable.clone()
926    }
927
928    pub fn get(&self, raft_group_id: u64) -> Option<Arc<RwLock<MemTable<A>>>> {
929        self.slots[Self::slot_index(raft_group_id)]
930            .read()
931            .get(&raft_group_id)
932            .cloned()
933    }
934
935    pub fn insert(&self, raft_group_id: u64, memtable: Arc<RwLock<MemTable<A>>>) {
936        self.slots[Self::slot_index(raft_group_id)]
937            .write()
938            .insert(raft_group_id, memtable);
939    }
940
941    pub fn remove(&self, raft_group_id: u64, record_tombstone: bool) {
942        self.slots[Self::slot_index(raft_group_id)]
943            .write()
944            .remove(&raft_group_id);
945        if record_tombstone {
946            let mut removed_memtables = self.removed_memtables.lock();
947            removed_memtables.push_back(raft_group_id);
948        }
949    }
950
951    pub fn fold<B, F: Fn(B, &MemTable<A>) -> B>(&self, mut init: B, fold: F) -> B {
952        for tables in &self.slots {
953            for memtable in tables.read().values() {
954                init = fold(init, &*memtable.read());
955            }
956        }
957        init
958    }
959
960    pub fn collect<F: FnMut(&MemTable<A>) -> bool>(
961        &self,
962        mut condition: F,
963    ) -> Vec<Arc<RwLock<MemTable<A>>>> {
964        let mut memtables = Vec::new();
965        for tables in &self.slots {
966            memtables.extend(tables.read().values().filter_map(|t| {
967                if condition(&*t.read()) {
968                    return Some(t.clone());
969                }
970                None
971            }));
972        }
973        memtables
974    }
975
976    /// Returns a [`LogBatch`] containing `Command::Clean`s of all deleted
977    /// [`MemTable`]s. The records for these tables will be cleaned up
978    /// afterwards.
979    pub fn take_cleaned_region_logs(&self) -> LogBatch {
980        let mut log_batch = LogBatch::default();
981        let mut removed_memtables = self.removed_memtables.lock();
982        for id in removed_memtables.drain(..) {
983            log_batch.add_command(id, Command::Clean);
984        }
985        log_batch
986    }
987
988    /// Returns a [`HashSet`] containing region IDs of all deleted
989    /// [`MemTable`]s.
990    ///
991    /// This method is only used for recovery.
992    #[cfg(test)]
993    pub fn cleaned_region_ids(&self) -> HashSet<u64> {
994        let mut ids = HashSet::default();
995        let removed_memtables = self.removed_memtables.lock();
996        for raft_id in removed_memtables.iter() {
997            ids.insert(*raft_id);
998        }
999        ids
1000    }
1001
1002    /// Returns `true` if it does not contain any memtable.
1003    pub fn is_empty(&self) -> bool {
1004        for i in 0..MEMTABLE_SLOT_COUNT {
1005            if !self.slots[i].read().is_empty() {
1006                return false;
1007            }
1008        }
1009        true
1010    }
1011
1012    /// Merges with a newer neighbor [`MemTableAccessor`].
1013    ///
1014    /// This method is only used for recovery.
1015    pub fn merge_newer_neighbor(&self, mut rhs: Self) {
1016        for slot in rhs.slots.iter_mut() {
1017            for (raft_group_id, memtable) in slot.write().drain() {
1018                self.get_or_insert(raft_group_id)
1019                    .write()
1020                    .merge_newer_neighbor(memtable.write().borrow_mut());
1021            }
1022        }
1023        // Discard the neighbor's tombstones; they will be applied by
1024        // `MemTableRecoverContext`.
1025    }
1026
1027    /// Merges with a [`MemTableAccessor`] that contains only append data.
1028    /// Assumes `self` contains all rewritten data.
1029    ///
1030    /// This method is only used for recovery.
1031    pub fn merge_append_table(&self, mut rhs: Self) {
1032        for slot in rhs.slots.iter_mut() {
1033            for (id, memtable) in std::mem::take(&mut *slot.write()) {
1034                if let Some(existing_memtable) = self.get(id) {
1035                    existing_memtable
1036                        .write()
1037                        .merge_append_table(&mut *memtable.write());
1038                } else {
1039                    self.insert(id, memtable);
1040                }
1041            }
1042        }
1043        // Tombstones from both tables are identical.
1044        debug_assert_eq!(
1045            self.removed_memtables.lock().len(),
1046            rhs.removed_memtables.lock().len()
1047        );
1048    }
1049
1050    /// Applies changes from log items that have been written to the append queue.
1051    pub fn apply_append_writes(&self, log_items: impl Iterator<Item = LogItem>) {
1052        for item in log_items {
1053            if has_internal_key(&item) {
1054                continue;
1055            }
1056            let raft = item.raft_group_id;
1057            let memtable = self.get_or_insert(raft);
1058            fail_point!(
1059                "memtable_accessor::apply_append_writes::region_3",
1060                raft == 3,
1061                |_| {}
1062            );
1063            match item.content {
1064                LogItemContent::EntryIndexes(entries_to_add) => {
1065                    memtable.write().append(entries_to_add.0);
1066                }
1067                LogItemContent::Command(Command::Clean) => {
1068                    self.remove(raft, true /* record_tombstone */);
1069                }
1070                LogItemContent::Command(Command::Compact { index }) => {
1071                    memtable.write().compact_to(index);
1072                }
1073                LogItemContent::Kv(kv) => match kv.op_type {
1074                    OpType::Put => {
1075                        let value = kv.value.unwrap();
1076                        memtable.write().put(kv.key, value, kv.file_id.unwrap());
1077                    }
1078                    OpType::Del => {
1079                        let key = kv.key;
1080                        memtable.write().delete(key.as_slice());
1081                    }
1082                },
1083            }
1084        }
1085    }
1086
1087    /// Applies changes from log items that are replayed from an append queue.
1088    /// Assumes it hasn't applied any rewrite data.
1089    ///
1090    /// This method is only used for recovery.
1091    pub fn replay_append_writes(&self, log_items: impl Iterator<Item = LogItem>) {
1092        for item in log_items {
1093            if has_internal_key(&item) {
1094                continue;
1095            }
1096            let raft = item.raft_group_id;
1097            let memtable = self.get_or_insert(raft);
1098            match item.content {
1099                LogItemContent::EntryIndexes(entries_to_add) => {
1100                    memtable.write().replay_append(entries_to_add.0);
1101                }
1102                LogItemContent::Command(Command::Clean) => {
1103                    self.remove(raft, true /* record_tombstone */);
1104                }
1105                LogItemContent::Command(Command::Compact { index }) => {
1106                    memtable.write().compact_to(index);
1107                }
1108                LogItemContent::Kv(kv) => match kv.op_type {
1109                    OpType::Put => {
1110                        let value = kv.value.unwrap();
1111                        memtable.write().put(kv.key, value, kv.file_id.unwrap());
1112                    }
1113                    OpType::Del => {
1114                        let key = kv.key;
1115                        memtable.write().delete(key.as_slice());
1116                    }
1117                },
1118            }
1119        }
1120    }
1121
1122    /// Applies changes from log items that have been written to the rewrite queue.
1123    pub fn apply_rewrite_writes(
1124        &self,
1125        log_items: impl Iterator<Item = LogItem>,
1126        watermark: Option<FileSeq>,
1127        new_file: FileSeq,
1128    ) {
1129        for item in log_items {
1130            if has_internal_key(&item) {
1131                continue;
1132            }
1133            let raft = item.raft_group_id;
1134            let memtable = self.get_or_insert(raft);
1135            match item.content {
1136                LogItemContent::EntryIndexes(entries_to_add) => {
1137                    memtable.write().rewrite(entries_to_add.0, watermark);
1138                }
1139                LogItemContent::Kv(kv) => match kv.op_type {
1140                    OpType::Put => {
1141                        let key = kv.key;
1142                        memtable.write().rewrite_key(key, watermark, new_file);
1143                    }
1144                    _ => unreachable!(),
1145                },
1146                LogItemContent::Command(Command::Clean) => {}
1147                _ => unreachable!(),
1148            }
1149        }
1150    }
1151
1152    /// Applies changes from log items that are replayed from a rewrite queue.
1153    /// Assumes it hasn't applied any append data.
1154    ///
1155    /// This method is only used for recovery.
1156    pub fn replay_rewrite_writes(&self, log_items: impl Iterator<Item = LogItem>) {
1157        for item in log_items {
1158            if has_internal_key(&item) {
1159                continue;
1160            }
1161            let raft = item.raft_group_id;
1162            let memtable = self.get_or_insert(raft);
1163            match item.content {
1164                LogItemContent::EntryIndexes(entries_to_add) => {
1165                    memtable.write().replay_rewrite(entries_to_add.0);
1166                }
1167                LogItemContent::Command(Command::Clean) => {
1168                    // Only append-queue tombstones need to be recorded.
1169                    self.remove(raft, false /* record_tombstone */);
1170                }
1171                LogItemContent::Command(Command::Compact { index }) => {
1172                    memtable.write().compact_to(index);
1173                }
1174                LogItemContent::Kv(kv) => match kv.op_type {
1175                    OpType::Put => {
1176                        let value = kv.value.unwrap();
1177                        memtable.write().put(kv.key, value, kv.file_id.unwrap());
1178                    }
1179                    OpType::Del => {
1180                        let key = kv.key;
1181                        memtable.write().delete(key.as_slice());
1182                    }
1183                },
1184            }
1185        }
1186    }
1187
1188    pub fn apply_rewrite_atomic_group(&self, raft: u64, start: FileSeq, end: FileSeq) {
1189        let memtable = self.get_or_insert(raft);
1190        memtable.write().apply_rewrite_atomic_group(start, end);
1191    }
1192
1193    #[inline]
1194    fn slot_index(id: u64) -> usize {
1195        debug_assert!(MEMTABLE_SLOT_COUNT.is_power_of_two());
1196        hash_u64(id) as usize & (MEMTABLE_SLOT_COUNT - 1)
1197    }
1198}
1199
1200#[inline]
1201fn has_internal_key(item: &LogItem) -> bool {
1202    matches!(&item.content, LogItemContent::Kv(KeyValue { key, .. }) if crate::is_internal_key(key, None))
1203}
1204
1205struct PendingAtomicGroup {
1206    status: AtomicGroupStatus,
1207    items: Vec<LogItem>,
1208    tombstone_items: Vec<LogItem>,
1209    start: FileSeq,
1210    end: FileSeq,
1211}
1212
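/// A [`ReplayMachine`] that rebuilds [`MemTable`]s from replayed log items
/// during recovery, buffering rewrite-queue atomic groups until they are
/// complete.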
1213pub struct MemTableRecoverContext<A: AllocatorTrait> {
1214    stats: Arc<GlobalStats>,
1215    // Tombstones that need to be transmitted to the other context.
1216    tombstone_items: Vec<LogItem>,
1217    memtables: MemTableAccessor<A>,
1218
1219    // All atomic groups that are not yet completed.
1220    // Each id maps to a list of groups. Each list contains at least one and at most two groups.
1221    pending_atomic_groups: HashMap<u64, Vec<PendingAtomicGroup>>,
1222}
1223
1224impl MemTableRecoverContext<VacantAllocator> {
1225    fn new() -> Self {
1226        let stats = Arc::new(GlobalStats::default());
1227        Self {
1228            stats: stats.clone(),
1229            tombstone_items: Vec::new(),
1230            memtables: MemTableAccessor::new(stats),
1231            pending_atomic_groups: HashMap::new(),
1232        }
1233    }
1234}
1235
1236impl<A: AllocatorTrait> MemTableRecoverContext<A> {
1237    fn new_with_allocator(allocator: A) -> Self {
1238        let stats = Arc::new(GlobalStats::default());
1239        Self {
1240            stats: stats.clone(),
1241            tombstone_items: Vec::new(),
1242            memtables: MemTableAccessor::new_with_allocator(stats, allocator),
1243            pending_atomic_groups: HashMap::new(),
1244        }
1245    }
1246
1247    pub fn finish(self) -> (MemTableAccessor<A>, Arc<GlobalStats>) {
1248        (self.memtables, self.stats)
1249    }
1250
1251    pub fn merge_append_context(&self, append: MemTableRecoverContext<A>) {
1252        self.memtables
1253            .apply_append_writes(append.tombstone_items.into_iter());
1254        self.memtables.merge_append_table(append.memtables);
1255    }
1256
1257    #[inline]
1258    fn is_tombstone(item: &LogItem) -> bool {
1259        match &item.content {
1260            LogItemContent::Command(Command::Clean)
1261            | LogItemContent::Command(Command::Compact { .. }) => true,
1262            LogItemContent::Kv(KeyValue { op_type, .. }) if *op_type == OpType::Del => true,
1263            _ => false,
1264        }
1265    }
1266
1267    fn accept_new_group(&mut self, queue: LogQueue, id: u64, mut new_group: PendingAtomicGroup) {
1268        assert_eq!(queue, LogQueue::Rewrite);
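        // Fold the incoming group into any pending group with the same id, based
        // on the (current, incoming) status pair below. Only a well-formed
        // Begin..End sequence is replayed into the memtables; malformed sequences
        // are discarded or left pending.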
1269        if let Some(groups) = self.pending_atomic_groups.get_mut(&id) {
1270            let group = groups.last_mut().unwrap();
1271            match (group.status, new_group.status) {
1272                (AtomicGroupStatus::End, AtomicGroupStatus::Begin) => {
1273                    groups.push(new_group);
1274                }
1275                // (begin, begin), (middle, begin)
1276                (_, AtomicGroupStatus::Begin) => {
1277                    warn!(
1278                        "discard old atomic group, status: {:?}, raft_group_id: {:?}",
1279                        group.status,
1280                        group.items.first().map(|item| item.raft_group_id)
1281                    );
1282                    *group = new_group;
1283                }
1284                // (end, middle), (end, end)
1285                (AtomicGroupStatus::End, _) => {
1286                    warn!(
1287                        "discard new atomic group, status: {:?}, raft_group_id: {:?}",
1288                        new_group.status,
1289                        new_group.items.first().map(|item| item.raft_group_id)
1290                    );
1291                }
1292                (AtomicGroupStatus::Begin, AtomicGroupStatus::Middle)
1293                | (AtomicGroupStatus::Middle, AtomicGroupStatus::Middle) => {
1294                    group.items.append(&mut new_group.items);
1295                    group.tombstone_items.append(&mut new_group.tombstone_items);
1296                    assert!(group.end <= new_group.start);
1297                    group.end = new_group.end;
1298                }
1299                (AtomicGroupStatus::Middle, AtomicGroupStatus::End) => {
1300                    group.items.append(&mut new_group.items);
1301                    group.tombstone_items.append(&mut new_group.tombstone_items);
1302                    group.status = new_group.status;
1303                    assert!(group.end <= new_group.start);
1304                    group.end = new_group.end;
1305                }
1306                (AtomicGroupStatus::Begin, AtomicGroupStatus::End) => {
1307                    let mut group = groups.pop().unwrap();
1308                    let mut rids = HashSet::with_capacity(1);
1309                    for item in group
1310                        .items
1311                        .iter()
1312                        .chain(group.tombstone_items.iter())
1313                        .chain(new_group.items.iter())
1314                        .chain(new_group.tombstone_items.iter())
1315                    {
1316                        rids.insert(item.raft_group_id);
1317                    }
1318                    self.tombstone_items.append(&mut group.tombstone_items);
1319                    self.tombstone_items.append(&mut new_group.tombstone_items);
1320                    self.memtables
1321                        .replay_rewrite_writes(group.items.into_iter());
1322                    self.memtables
1323                        .replay_rewrite_writes(new_group.items.into_iter());
1324                    assert!(group.end <= new_group.start);
1325                    for rid in rids {
1326                        self.memtables
1327                            .apply_rewrite_atomic_group(rid, group.start, new_group.end);
1328                    }
1329                }
1330            }
1331            if groups.is_empty() {
1332                self.pending_atomic_groups.remove(&id);
1333            }
1334        } else {
1335            self.pending_atomic_groups.insert(id, vec![new_group]);
1336        }
1337    }
1338}
1339
1340impl Default for MemTableRecoverContext<VacantAllocator> {
1341    fn default() -> Self {
1342        Self::new()
1343    }
1344}
1345
1346impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
1347    fn replay(&mut self, mut item_batch: LogItemBatch, file_id: FileId) -> Result<()> {
1348        if file_id.queue == LogQueue::Append {
1349            let mut new_tombstones = Vec::new();
1350            self.memtables
1351                .replay_append_writes(item_batch.drain().filter(|item| {
1352                    if Self::is_tombstone(item) {
1353                        new_tombstones.push(item.clone());
1354                    }
1355                    true
1356                }));
1357            self.tombstone_items.append(&mut new_tombstones);
1358        } else {
1359            let mut new_tombstones = Vec::new();
1360            let mut is_group = None;
1361            let items = item_batch
1362                .drain()
1363                .filter(|item| {
1364                    if let Some(g) = AtomicGroupStatus::parse(item) {
1365                        if is_group.is_none() {
1366                            is_group = Some(g);
1367                        } else {
1368                            let msg = format!("skipped an atomic group: {g:?}");
1369                            error!("{msg}");
1370                            debug_assert!(false, "{}", msg);
1371                        }
1372                        return false;
1373                    }
1374                    if Self::is_tombstone(item) {
1375                        new_tombstones.push(item.clone());
1376                    }
1377                    true
1378                })
1379                .collect();
1380            if let Some((id, status)) = is_group {
1381                self.accept_new_group(
1382                    file_id.queue,
1383                    id,
1384                    PendingAtomicGroup {
1385                        status,
1386                        items,
1387                        tombstone_items: new_tombstones,
1388                        start: file_id.seq,
1389                        end: file_id.seq,
1390                    },
1391                );
1392            } else {
1393                self.tombstone_items.append(&mut new_tombstones);
1394                self.memtables.replay_rewrite_writes(items.into_iter());
1395            }
1396        }
1397        Ok(())
1398    }
1399
1400    fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> {
1401        self.tombstone_items
1402            .append(&mut rhs.tombstone_items.clone());
1403        for (id, groups) in rhs.pending_atomic_groups.drain() {
1404            for group in groups {
1405                self.accept_new_group(queue, id, group);
1406            }
1407        }
1408        match queue {
1409            LogQueue::Append => self
1410                .memtables
1411                .replay_append_writes(rhs.tombstone_items.into_iter()),
1412            LogQueue::Rewrite => self
1413                .memtables
1414                .replay_rewrite_writes(rhs.tombstone_items.into_iter()),
1415        }
1416        self.memtables.merge_newer_neighbor(rhs.memtables);
1417        Ok(())
1418    }
1419}
1420
1421pub struct MemTableRecoverContextFactory {
1422    allocator: SelectedAllocator,
1423}
1424
1425impl MemTableRecoverContextFactory {
1426    pub fn new(cfg: &Config) -> Self {
1427        Self {
1428            allocator: new_allocator(cfg),
1429        }
1430    }
1431}
1432
1433impl Factory<MemTableRecoverContext<SelectedAllocator>> for MemTableRecoverContextFactory {
1434    fn new_target(&self) -> MemTableRecoverContext<SelectedAllocator> {
1435        MemTableRecoverContext::new_with_allocator(self.allocator.clone())
1436    }
1437}
1438
1439#[cfg(test)]
1440mod tests {
1441    use super::*;
1442    use crate::test_util::{catch_unwind_silent, generate_entry_indexes};
1443
1444    impl<A: AllocatorTrait> MemTable<A> {
1445        fn max_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
1446            let entry = match queue {
1447                LogQueue::Append if self.rewrite_count == self.entry_indexes.len() => None,
1448                LogQueue::Append => self.entry_indexes.back(),
1449                LogQueue::Rewrite if self.rewrite_count == 0 => None,
1450                LogQueue::Rewrite => self.entry_indexes.get(self.rewrite_count - 1),
1451            };
1452            let ents_max = entry.map(|e| e.entries.unwrap().id.seq);
1453
1454            let kvs_max = self.kvs_max_file_seq(queue);
1455            match (ents_max, kvs_max) {
1456                (Some(ents_max), Some(kvs_max)) => Some(FileSeq::max(kvs_max, ents_max)),
1457                (Some(ents_max), None) => Some(ents_max),
1458                (None, Some(kvs_max)) => Some(kvs_max),
1459                (None, None) => None,
1460            }
1461        }
1462
1463        pub fn kvs_max_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
1464            self.kvs
1465                .values()
1466                .filter(|v| v.1.queue == queue)
1467                .fold(None, |max, v| {
1468                    if let Some(max) = max {
1469                        Some(std::cmp::max(max, v.1.seq))
1470                    } else {
1471                        Some(v.1.seq)
1472                    }
1473                })
1474        }
1475
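        /// Fetches every entry index currently stored in the table into `vec_idx`.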
1476        pub fn fetch_all(&self, vec_idx: &mut Vec<EntryIndex>) {
1477            if let Some((first, last)) = self.span() {
1478                self.fetch_entries_to(first, last + 1, None, vec_idx)
1479                    .unwrap();
1480            }
1481        }
1482
1483        fn entries_size(&self) -> usize {
1484            self.entry_indexes
1485                .iter()
1486                .fold(0, |acc, e| acc + e.entry_len) as usize
1487        }
1488    }
1489
1490    #[test]
1491    fn test_memtable_append() {
1492        let region_id = 8;
1493        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
1494
1495        // Append entries [10, 20) file_num = 1.
1496        // After appending:
1497        // [10, 20) file_num = 1
1498        memtable.append(generate_entry_indexes(
1499            10,
1500            20,
1501            FileId::new(LogQueue::Append, 1),
1502        ));
1503        assert_eq!(memtable.entries_size(), 10);
1504        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
1505        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 1);
1506        memtable.consistency_check();
1507
1508        // Empty.
1509        memtable.append(Vec::new());
1510
1511        // Hole.
1512        assert!(
1513            catch_unwind_silent(|| memtable.append(generate_entry_indexes(
1514                21,
1515                22,
1516                FileId::dummy(LogQueue::Append)
1517            )))
1518            .is_err()
1519        );
1520        memtable.consistency_check();
1521
1522        // Append entries [20, 30) file_num = 2.
1523        // After appending:
1524        // [10, 20) file_num = 1
1525        // [20, 30) file_num = 2
1526        memtable.append(generate_entry_indexes(
1527            20,
1528            30,
1529            FileId::new(LogQueue::Append, 2),
1530        ));
1531        assert_eq!(memtable.entries_size(), 20);
1532        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
1533        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 2);
1534        memtable.consistency_check();
1535        assert_eq!(
1536            memtable.global_stats.live_entries(LogQueue::Append),
1537            memtable.entries_size()
1538        );
1539
1540        // Partially overlapping append.
1541        // Append entries [25, 35) file_num = 3.
1542        // After appending:
1543        // [10, 20) file_num = 1
1544        // [20, 25) file_num = 2
1545        // [25, 35) file_num = 3
1546        memtable.append(generate_entry_indexes(
1547            25,
1548            35,
1549            FileId::new(LogQueue::Append, 3),
1550        ));
1551        assert_eq!(memtable.entries_size(), 25);
1552        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
1553        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
1554        memtable.consistency_check();
1555        assert_eq!(
1556            memtable.global_stats.live_entries(LogQueue::Append),
1557            memtable.entries_size()
1558        );
1559
1560        // Fully overlapping append.
1561        // Append entries [10, 40) file_num = 4.
1562        // After appending:
1563        // [10, 40) file_num = 4
1564        memtable.append(generate_entry_indexes(
1565            10,
1566            40,
1567            FileId::new(LogQueue::Append, 4),
1568        ));
1569        assert_eq!(memtable.entries_size(), 30);
1570        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 4);
1571        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
1572        memtable.consistency_check();
1573        assert_eq!(
1574            memtable.global_stats.live_entries(LogQueue::Append),
1575            memtable.entries_size()
1576        );
1577
1578        let global_stats = Arc::clone(&memtable.global_stats);
1579        drop(memtable);
1580        assert_eq!(global_stats.live_entries(LogQueue::Append), 0);
1581    }
1582
1583    #[test]
1584    fn test_memtable_compact() {
1585        let region_id = 8;
1586        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
1587
1588        // After appending:
1589        // [0, 10) file_num = 1
1590        // [10, 20) file_num = 2
1591        // [20, 25) file_num = 3
1592        memtable.append(generate_entry_indexes(
1593            0,
1594            10,
1595            FileId::new(LogQueue::Append, 1),
1596        ));
1597        memtable.append(generate_entry_indexes(
1598            10,
1599            15,
1600            FileId::new(LogQueue::Append, 2),
1601        ));
1602        memtable.append(generate_entry_indexes(
1603            15,
1604            20,
1605            FileId::new(LogQueue::Append, 2),
1606        ));
1607        memtable.append(generate_entry_indexes(
1608            20,
1609            25,
1610            FileId::new(LogQueue::Append, 3),
1611        ));
1612
1613        assert_eq!(memtable.entries_size(), 25);
1614        assert_eq!(memtable.first_index().unwrap(), 0);
1615        assert_eq!(memtable.last_index().unwrap(), 24);
1616        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
1617        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
1618        assert_eq!(
1619            memtable.global_stats.live_entries(LogQueue::Append),
1620            memtable.entries_size()
1621        );
1622        memtable.consistency_check();
1623
1624        // Compact to 5.
1625        // Only the logical index is needed to compact.
1626        assert_eq!(memtable.compact_to(5), 5);
1627        assert_eq!(memtable.entries_size(), 20);
1628        assert_eq!(memtable.first_index().unwrap(), 5);
1629        assert_eq!(memtable.last_index().unwrap(), 24);
1630        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
1631        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
1632        assert_eq!(
1633            memtable.global_stats.live_entries(LogQueue::Append),
1634            memtable.entries_size()
1635        );
1636        // Can't overwrite compacted entries.
1637        assert!(
1638            catch_unwind_silent(|| memtable.append(generate_entry_indexes(
1639                4,
1640                5,
1641                FileId::dummy(LogQueue::Append)
1642            )))
1643            .is_err()
1644        );
1645        memtable.consistency_check();
1646
1647        // Compact to 20.
1648        assert_eq!(memtable.compact_to(20), 15);
1649        assert_eq!(memtable.entries_size(), 5);
1650        assert_eq!(memtable.first_index().unwrap(), 20);
1651        assert_eq!(memtable.last_index().unwrap(), 24);
1652        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 3);
1653        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
1654        assert_eq!(
1655            memtable.global_stats.live_entries(LogQueue::Append),
1656            memtable.entries_size()
1657        );
1658        memtable.consistency_check();
1659
1660        // Compacting to 20 or a smaller index does nothing.
1661        assert_eq!(memtable.compact_to(20), 0);
1662        assert_eq!(memtable.compact_to(15), 0);
1663        assert_eq!(memtable.entries_size(), 5);
1664        assert_eq!(memtable.first_index().unwrap(), 20);
1665        assert_eq!(memtable.last_index().unwrap(), 24);
1666        memtable.consistency_check();
1667    }
1668
1669    #[test]
1670    fn test_memtable_fetch() {
1671        let region_id = 8;
1672        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
1673
1674        let mut ents_idx = vec![];
1675
1676        // Fetch empty.
1677        memtable.fetch_all(&mut ents_idx);
1678        assert!(ents_idx.is_empty());
1679        memtable
1680            .fetch_entries_to(0, 0, None, &mut ents_idx)
1681            .unwrap();
1682        assert!(matches!(
1683            memtable
1684                .fetch_entries_to(0, 1, None, &mut ents_idx)
1685                .unwrap_err(),
1686            Error::EntryNotFound
1687        ));
1688
1689        // After appending:
1690        // [0, 10) file_num = 1
1691        // [10, 15) file_num = 2
1692        // [15, 20) file_num = 2
1693        // [20, 25) file_num = 3
1694        memtable.append(generate_entry_indexes(
1695            0,
1696            10,
1697            FileId::new(LogQueue::Append, 1),
1698        ));
1699        memtable.append(generate_entry_indexes(
1700            10,
1701            20,
1702            FileId::new(LogQueue::Append, 2),
1703        ));
1704        memtable.append(generate_entry_indexes(
1705            20,
1706            25,
1707            FileId::new(LogQueue::Append, 3),
1708        ));
1709
1710        // Fetching all
1711        memtable.fetch_all(&mut ents_idx);
1712        assert_eq!(ents_idx.len(), 25);
1713        assert_eq!(ents_idx[0].index, 0);
1714        assert_eq!(ents_idx[24].index, 24);
1715
1716        // After compact:
1717        // [10, 15) file_num = 2
1718        // [15, 20) file_num = 2
1719        // [20, 25) file_num = 3
1720        assert_eq!(memtable.compact_to(10), 10);
1721
1722        // Out of range fetching.
1723        ents_idx.clear();
1724        assert!(matches!(
1725            memtable
1726                .fetch_entries_to(5, 15, None, &mut ents_idx)
1727                .unwrap_err(),
1728            Error::EntryCompacted
1729        ));
1730
1731        // Out of range fetching.
1732        ents_idx.clear();
1733        assert!(matches!(
1734            memtable
1735                .fetch_entries_to(20, 30, None, &mut ents_idx)
1736                .unwrap_err(),
1737            Error::EntryNotFound
1738        ));
1739
1740        ents_idx.clear();
1741        memtable
1742            .fetch_entries_to(20, 25, None, &mut ents_idx)
1743            .unwrap();
1744        assert_eq!(ents_idx.len(), 5);
1745        assert_eq!(ents_idx[0].index, 20);
1746        assert_eq!(ents_idx[4].index, 24);
1747
1748        ents_idx.clear();
1749        memtable
1750            .fetch_entries_to(10, 15, None, &mut ents_idx)
1751            .unwrap();
1752        assert_eq!(ents_idx.len(), 5);
1753        assert_eq!(ents_idx[0].index, 10);
1754        assert_eq!(ents_idx[4].index, 14);
1755
1756        ents_idx.clear();
1757        memtable
1758            .fetch_entries_to(10, 25, None, &mut ents_idx)
1759            .unwrap();
1760        assert_eq!(ents_idx.len(), 15);
1761        assert_eq!(ents_idx[0].index, 10);
1762        assert_eq!(ents_idx[14].index, 24);
1763
1764        // Range fetching with a max size limit.
1765        // Only [10, 20) can be fetched because of the size limit.
1766        ents_idx.clear();
1767        let max_size = Some(10);
1768        memtable
1769            .fetch_entries_to(10, 25, max_size, &mut ents_idx)
1770            .unwrap();
1771        assert_eq!(ents_idx.len(), 10);
1772        assert_eq!(ents_idx[0].index, 10);
1773        assert_eq!(ents_idx[9].index, 19);
1774
1775        // Even if the max size limit is 0, at least one entry is fetched.
1776        ents_idx.clear();
1777        memtable
1778            .fetch_entries_to(20, 25, Some(0), &mut ents_idx)
1779            .unwrap();
1780        assert_eq!(ents_idx.len(), 1);
1781        assert_eq!(ents_idx[0].index, 20);
1782    }
1783
1784    #[test]
1785    fn test_memtable_fetch_rewrite() {
1786        let region_id = 8;
1787        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
1788        let (k1, v1) = (b"key1", b"value1");
1789        let (k2, v2) = (b"key2", b"value2");
1790        let (k3, v3) = (b"key3", b"value3");
1791
1792        // After appending:
1793        // [0, 10) file_num = 1
1794        // [10, 20) file_num = 2
1795        // [20, 25) file_num = 3
1796        memtable.append(generate_entry_indexes(
1797            0,
1798            10,
1799            FileId::new(LogQueue::Append, 1),
1800        ));
1801        memtable.put(k1.to_vec(), v1.to_vec(), FileId::new(LogQueue::Append, 1));
1802        memtable.append(generate_entry_indexes(
1803            10,
1804            20,
1805            FileId::new(LogQueue::Append, 2),
1806        ));
1807        memtable.put(k2.to_vec(), v2.to_vec(), FileId::new(LogQueue::Append, 2));
1808        memtable.append(generate_entry_indexes(
1809            20,
1810            25,
1811            FileId::new(LogQueue::Append, 3),
1812        ));
1813        memtable.put(k3.to_vec(), v3.to_vec(), FileId::new(LogQueue::Append, 3));
1814        memtable.consistency_check();
1815
1816        // Rewrite k1.
1817        memtable.rewrite_key(k1.to_vec(), Some(1), 50);
1818        let mut kvs = Vec::new();
1819        memtable.fetch_kvs_before(1, &mut kvs);
1820        assert!(kvs.is_empty());
1821        memtable.fetch_rewritten_kvs(&mut kvs);
1822        assert_eq!(kvs.len(), 1);
1823        assert_eq!(kvs.pop().unwrap(), (k1.to_vec(), v1.to_vec()));
1824        // Rewrite deleted k1.
1825        memtable.delete(k1.as_ref());
1826        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 1);
1827        memtable.rewrite_key(k1.to_vec(), Some(1), 50);
1828        assert_eq!(memtable.get(k1.as_ref()), None);
1829        memtable.fetch_rewritten_kvs(&mut kvs);
1830        assert!(kvs.is_empty());
1831        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 2);
1832        // Rewrite newer append k2/k3.
1833        memtable.rewrite_key(k2.to_vec(), Some(1), 50);
1834        memtable.fetch_rewritten_kvs(&mut kvs);
1835        assert!(kvs.is_empty());
1836        memtable.rewrite_key(k3.to_vec(), None, 50); // Rewrite encounters newer append.
1837        memtable.fetch_rewritten_kvs(&mut kvs);
1838        assert!(kvs.is_empty());
1839        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 4);
1840        // Rewrite k3 multiple times.
1841        memtable.rewrite_key(k3.to_vec(), Some(10), 50);
1842        memtable.rewrite_key(k3.to_vec(), None, 51);
1843        memtable.rewrite_key(k3.to_vec(), Some(11), 52);
1844        memtable.fetch_rewritten_kvs(&mut kvs);
1845        assert_eq!(kvs.len(), 1);
1846        assert_eq!(kvs.pop().unwrap(), (k3.to_vec(), v3.to_vec()));
1847
1848        // Rewrite indexes:
1849        // [0, 10) queue = rewrite, file_num = 1,
1850        // [10, 20) file_num = 2
1851        // [20, 25) file_num = 3
1852        let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1));
1853        memtable.rewrite(ents_idx, Some(1));
1854        assert_eq!(memtable.entries_size(), 25);
1855        memtable.consistency_check();
1856
1857        let mut ents_idx = vec![];
1858        assert!(memtable
1859            .fetch_entry_indexes_before(2, &mut ents_idx)
1860            .is_ok());
1861        assert_eq!(ents_idx.len(), 10);
1862        assert_eq!(ents_idx.last().unwrap().index, 19);
1863        ents_idx.clear();
1864        assert!(memtable
1865            .fetch_entry_indexes_before(1, &mut ents_idx)
1866            .is_ok());
1867        assert!(ents_idx.is_empty());
1868
1869        ents_idx.clear();
1870        assert!(memtable
1871            .fetch_rewritten_entry_indexes(&mut ents_idx)
1872            .is_ok());
1873        assert_eq!(ents_idx.len(), 10);
1874        assert_eq!(ents_idx.first().unwrap().index, 0);
1875        assert_eq!(ents_idx.last().unwrap().index, 9);
1876    }
1877
1878    #[test]
1879    fn test_memtable_kv_operations() {
1880        fn key(i: u64) -> Vec<u8> {
1881            format!("k{i}").into_bytes()
1882        }
1883        fn value(i: u64) -> Vec<u8> {
1884            format!("v{i}").into_bytes()
1885        }
1886
1887        let region_id = 8;
1888        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
1889
1890        memtable.put(key(1), value(1), FileId::new(LogQueue::Append, 1));
1891        memtable.put(key(5), value(5), FileId::new(LogQueue::Append, 5));
1892        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
1893        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 5);
1894        assert_eq!(memtable.get(&key(1)), Some(value(1)));
1895        assert_eq!(memtable.get(&key(5)), Some(value(5)));
1896
1897        let mut res = Vec::new();
1898        memtable
1899            .scan(None, None, false, |k, v| {
1900                res.push((k.to_vec(), v.to_vec()));
1901                false
1902            })
1903            .unwrap();
1904        assert_eq!(res, vec![(key(1), value(1))]);
1905        res.clear();
1906        memtable
1907            .scan(None, None, true, |k, v| {
1908                res.push((k.to_vec(), v.to_vec()));
1909                false
1910            })
1911            .unwrap();
1912        assert_eq!(res, vec![(key(5), value(5))]);
1913        res.clear();
1914        memtable
1915            .scan(Some(&key(5)), None, false, |key, value| {
1916                res.push((key.to_vec(), value.to_vec()));
1917                true
1918            })
1919            .unwrap();
1920        assert_eq!(res, vec![(key(5), value(5))]);
1921        res.clear();
1922        memtable
1923            .scan(Some(&key(1)), Some(&key(5)), false, |key, value| {
1924                res.push((key.to_vec(), value.to_vec()));
1925                true
1926            })
1927            .unwrap();
1928        assert_eq!(res, vec![(key(1), value(1))]);
1929
1930        memtable.delete(&key(5));
1931        assert_eq!(memtable.get(&key(5)), None);
1932        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
1933        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 1);
1934
1935        memtable.put(key(1), value(1), FileId::new(LogQueue::Rewrite, 2));
1936        memtable.put(key(5), value(5), FileId::new(LogQueue::Rewrite, 3));
1937        assert_eq!(memtable.min_file_seq(LogQueue::Append), None);
1938        assert_eq!(memtable.max_file_seq(LogQueue::Append), None);
1939        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 2);
1940        assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 3);
1941        assert_eq!(memtable.global_stats.rewrite_entries(), 2);
1942
1943        memtable.delete(&key(1));
1944        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 3);
1945        assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 3);
1946        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 1);
1947
1948        memtable.put(key(5), value(5), FileId::new(LogQueue::Append, 7));
1949        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite), None);
1950        assert_eq!(memtable.max_file_seq(LogQueue::Rewrite), None);
1951        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 7);
1952        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 7);
1953        assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 2);
1954    }
1955
1956    #[test]
1957    fn test_memtable_get_entry() {
1958        let region_id = 8;
1959        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
1960
1961        assert_eq!(memtable.get_entry(0), None);
1962
1963        // [5, 10) file_num = 1
1964        // [10, 20) file_num = 2
1965        memtable.append(generate_entry_indexes(
1966            5,
1967            10,
1968            FileId::new(LogQueue::Append, 1),
1969        ));
1970        memtable.append(generate_entry_indexes(
1971            10,
1972            20,
1973            FileId::new(LogQueue::Append, 2),
1974        ));
1975
1976        // Not in range.
1977        assert_eq!(memtable.get_entry(2), None);
1978        assert_eq!(memtable.get_entry(25), None);
1979
1980        let entry_idx = memtable.get_entry(5);
1981        assert_eq!(entry_idx.unwrap().index, 5);
1982    }
1983
1984    #[test]
1985    fn test_memtable_rewrite() {
1986        let region_id = 8;
1987        let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
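        // Expected global-stats counters, updated in step with each operation below.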
1988        let mut expected_append = 0;
1989        let mut expected_rewrite = 0;
1990        let mut expected_deleted_rewrite = 0;
1991
1992        // Rewrite to empty table.
1993        let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1));
1994        memtable.rewrite(ents_idx, Some(1));
1995        expected_rewrite += 10;
1996        expected_deleted_rewrite += 10;
1997        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite), None);
1998        assert_eq!(
1999            memtable.global_stats.live_entries(LogQueue::Append),
2000            expected_append
2001        );
2002        assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2003        assert_eq!(
2004            memtable.global_stats.deleted_rewrite_entries(),
2005            expected_deleted_rewrite
2006        );
2007
2008        // Append and compact:
2009        // [10, 20) file_num = 2
2010        // [20, 30) file_num = 3
2011        // [30, 40) file_num = 4
2012        // kk1 -> 2, kk2 -> 3, kk3 -> 4
2013        memtable.append(generate_entry_indexes(
2014            0,
2015            10,
2016            FileId::new(LogQueue::Append, 1),
2017        ));
2018        memtable.append(generate_entry_indexes(
2019            10,
2020            20,
2021            FileId::new(LogQueue::Append, 2),
2022        ));
2023        memtable.put(
2024            b"kk1".to_vec(),
2025            b"vv1".to_vec(),
2026            FileId::new(LogQueue::Append, 2),
2027        );
2028        memtable.append(generate_entry_indexes(
2029            20,
2030            30,
2031            FileId::new(LogQueue::Append, 3),
2032        ));
2033        memtable.put(
2034            b"kk2".to_vec(),
2035            b"vv2".to_vec(),
2036            FileId::new(LogQueue::Append, 3),
2037        );
2038        memtable.append(generate_entry_indexes(
2039            30,
2040            40,
2041            FileId::new(LogQueue::Append, 4),
2042        ));
2043        memtable.put(
2044            b"kk3".to_vec(),
2045            b"vv3".to_vec(),
2046            FileId::new(LogQueue::Append, 4),
2047        );
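        // 4 appends of 10 entries each, plus 3 key-values.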
2048        expected_append += 4 * 10 + 3;
2049        memtable.compact_to(10);
2050        expected_append -= 10;
2051        assert_eq!(memtable.entries_size(), 30);
2052        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 2);
2053        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
2054        assert_eq!(
2055            memtable.global_stats.live_entries(LogQueue::Append),
2056            expected_append
2057        );
2058        memtable.consistency_check();
2059
2060        // Rewrite compacted entries.
2061        // [10, 20) file_num = 2
2062        // [20, 30) file_num = 3
2063        // [30, 40) file_num = 4
2064        // kk1 -> 2, kk2 -> 3, kk3 -> 4
2065        let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 50));
2066        memtable.rewrite(ents_idx, Some(1));
2067        memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50);
2068        expected_rewrite += 10 + 1;
2069        expected_deleted_rewrite += 10 + 1;
2070        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 2);
2071        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
2072        assert!(memtable.min_file_seq(LogQueue::Rewrite).is_none());
2073        assert!(memtable.max_file_seq(LogQueue::Rewrite).is_none());
2074        assert_eq!(memtable.rewrite_count, 0);
2075        assert_eq!(memtable.get(b"kk0"), None);
2076        assert_eq!(
2077            memtable.global_stats.live_entries(LogQueue::Append),
2078            expected_append
2079        );
2080        assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2081        assert_eq!(
2082            memtable.global_stats.deleted_rewrite_entries(),
2083            expected_deleted_rewrite
2084        );
2085        memtable.consistency_check();
2086
2087        // Mixed rewrite.
2088        // [10, 20) file_num = 100(r)
2089        // [20, 30) file_num = 101(r)
2090        // [30, 40) file_num = 4
2091        // kk1 -> 100(r), kk2 -> 101(r), kk3 -> 4
2092        let ents_idx = generate_entry_indexes(0, 20, FileId::new(LogQueue::Rewrite, 100));
2093        memtable.rewrite(ents_idx, Some(2));
2094        memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50);
2095        memtable.rewrite_key(b"kk1".to_vec(), Some(2), 100);
2096        expected_append -= 10 + 1;
2097        expected_rewrite += 20 + 2;
2098        expected_deleted_rewrite += 10 + 1;
2099        let ents_idx = generate_entry_indexes(20, 30, FileId::new(LogQueue::Rewrite, 101));
2100        memtable.rewrite(ents_idx, Some(3));
2101        memtable.rewrite_key(b"kk2".to_vec(), Some(3), 101);
2102        expected_append -= 10 + 1;
2103        expected_rewrite += 10 + 1;
2104        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 4);
2105        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
2106        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 100);
2107        assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 101);
2108        assert_eq!(memtable.rewrite_count, 20);
2109        assert_eq!(memtable.get(b"kk1"), Some(b"vv1".to_vec()));
2110        assert_eq!(
2111            memtable.global_stats.live_entries(LogQueue::Append),
2112            expected_append
2113        );
2114        assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2115        assert_eq!(
2116            memtable.global_stats.deleted_rewrite_entries(),
2117            expected_deleted_rewrite
2118        );
2119        memtable.consistency_check();
2120
2121        // Put some entries overwriting entries in file 4, then try to rewrite.
2122        // [10, 20) file_num = 100(r)
2123        // [20, 30) file_num = 101(r)
2124        // [30, 35) file_num = 4 -> 102(r)
2125        // 35 file_num = 5
2126        // kk1 -> 100(r), kk2 -> 101(r), kk3 -> 5
2127        memtable.append(generate_entry_indexes(
2128            35,
2129            36,
2130            FileId::new(LogQueue::Append, 5),
2131        ));
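        // Appending index 35 truncates [35, 40): 5 append entries are replaced by 1.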
2132        expected_append -= 4;
2133        memtable.put(
2134            b"kk3".to_vec(),
2135            b"vv33".to_vec(),
2136            FileId::new(LogQueue::Append, 5),
2137        );
2138        assert_eq!(memtable.last_index().unwrap(), 35);
2139        memtable.consistency_check();
2140        let ents_idx = generate_entry_indexes(30, 40, FileId::new(LogQueue::Rewrite, 102));
2141        memtable.rewrite(ents_idx, Some(4));
2142        expected_append -= 5;
2143        expected_rewrite += 10;
2144        expected_deleted_rewrite += 5;
2145        assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 5);
2146        assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 5);
2147        assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 100);
2148        assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 102);
2149        assert_eq!(memtable.rewrite_count, 25);
2150        assert_eq!(memtable.get(b"kk3"), Some(b"vv33".to_vec()));
2151        assert_eq!(
2152            memtable.global_stats.live_entries(LogQueue::Append),
2153            expected_append
2154        );
2155        assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2156        assert_eq!(
2157            memtable.global_stats.deleted_rewrite_entries(),
2158            expected_deleted_rewrite
2159        );
2160        memtable.consistency_check();
2161
2162        // Compact after rewrite.
2163        // [30, 35) file_num = 102(r)
2164        // [35, 50) file_num = 6
2165        // kk1 -> 100(r), kk2 -> 101(r), kk3 -> 5
2166        memtable.append(generate_entry_indexes(
2167            35,
2168            50,
2169            FileId::new(LogQueue::Append, 6),
2170        ));
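        // [35, 50) adds 15 entries; the old append entry at index 35 is truncated.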
2171        expected_append += 15 - 1;
2172        memtable.compact_to(30);
2173        expected_deleted_rewrite += 20;
2174        assert_eq!(memtable.last_index().unwrap(), 49);
2175        assert_eq!(memtable.rewrite_count, 5);
2176        assert_eq!(
2177            memtable.global_stats.live_entries(LogQueue::Append),
2178            expected_append
2179        );
2180        assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2181        assert_eq!(
2182            memtable.global_stats.deleted_rewrite_entries(),
2183            expected_deleted_rewrite
2184        );
2185        memtable.consistency_check();
2186
2187        // Squeeze the rewrite queue: rewrite already-rewritten entries into file 103.
2188        // [30, 35) file_num = 103(r)
2189        // [35, 50) file_num = 6
2190        // kk1 -> 100(r), kk2 -> 101(r), kk3 -> 5
2191        let ents_idx = generate_entry_indexes(10, 60, FileId::new(LogQueue::Rewrite, 103));
2192        memtable.rewrite(ents_idx, None);
2193        expected_rewrite += 50;
2194        expected_deleted_rewrite += 50;
2195        assert_eq!(memtable.first_index().unwrap(), 30);
2196        assert_eq!(memtable.rewrite_count, 5);
2197        assert_eq!(
2198            memtable.global_stats.live_entries(LogQueue::Append),
2199            expected_append
2200        );
2201        assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
2202        assert_eq!(
2203            memtable.global_stats.deleted_rewrite_entries(),
2204            expected_deleted_rewrite
2205        );
2206        memtable.consistency_check();
2207
2208        let global_stats = Arc::clone(&memtable.global_stats);
2209        drop(memtable);
2210        assert_eq!(global_stats.live_entries(LogQueue::Append), 0);
2211        assert_eq!(global_stats.live_entries(LogQueue::Rewrite), 0);
2212    }
2213
2214    #[test]
2215    fn test_memtable_merge_append() {
2216        type TestMemTable = MemTable<VacantAllocator>;
2217        fn empty_table(id: u64) -> TestMemTable {
2218            MemTable::new(id, Arc::new(GlobalStats::default()))
2219        }
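        // Each case builds the same logical state three ways: `None` applies the full
        // history, `Some(LogQueue::Append)` only the append-queue part, and
        // `Some(LogQueue::Rewrite)` only the rewrite-queue part. Merging the two
        // partial tables must reproduce the full one.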
2220        let cases = [
2221            |mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
2222                match on {
2223                    None => {
2224                        memtable.append(generate_entry_indexes(
2225                            0,
2226                            10,
2227                            FileId::new(LogQueue::Append, 1),
2228                        ));
2229                        memtable.append(generate_entry_indexes(
2230                            7,
2231                            15,
2232                            FileId::new(LogQueue::Append, 2),
2233                        ));
2234                        memtable.rewrite(
2235                            generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
2236                            Some(1),
2237                        );
2238                    }
2239                    Some(LogQueue::Append) => {
2240                        memtable.append(generate_entry_indexes(
2241                            0,
2242                            10,
2243                            FileId::new(LogQueue::Append, 1),
2244                        ));
2245                        memtable.append(generate_entry_indexes(
2246                            7,
2247                            15,
2248                            FileId::new(LogQueue::Append, 2),
2249                        ));
2250                        memtable.compact_to(7);
2251                    }
2252                    Some(LogQueue::Rewrite) => {
2253                        memtable.replay_rewrite(generate_entry_indexes(
2254                            0,
2255                            7,
2256                            FileId::new(LogQueue::Rewrite, 1),
2257                        ));
2258                        memtable.replay_rewrite(Vec::new());
2259                    }
2260                }
2261                memtable
2262            },
2263            |mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
2264                match on {
2265                    None => {
2266                        memtable.append(generate_entry_indexes(
2267                            0,
2268                            10,
2269                            FileId::new(LogQueue::Append, 1),
2270                        ));
2271                        memtable.append(generate_entry_indexes(
2272                            7,
2273                            15,
2274                            FileId::new(LogQueue::Append, 2),
2275                        ));
2276                        memtable.rewrite(
2277                            generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
2278                            Some(1),
2279                        );
2280                        memtable.compact_to(10);
2281                    }
2282                    Some(LogQueue::Append) => {
2283                        memtable.append(generate_entry_indexes(
2284                            0,
2285                            10,
2286                            FileId::new(LogQueue::Append, 1),
2287                        ));
2288                        memtable.append(generate_entry_indexes(
2289                            7,
2290                            15,
2291                            FileId::new(LogQueue::Append, 2),
2292                        ));
2293                        memtable.compact_to(10);
2294                    }
2295                    Some(LogQueue::Rewrite) => {
2296                        memtable.replay_rewrite(generate_entry_indexes(
2297                            0,
2298                            7,
2299                            FileId::new(LogQueue::Rewrite, 1),
2300                        ));
2301                        // By MemTableRecoverContext.
2302                        memtable.compact_to(10);
2303                    }
2304                }
2305                memtable
2306            },
2307            |mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
2308                match on {
2309                    None => {
2310                        memtable.append(generate_entry_indexes(
2311                            0,
2312                            10,
2313                            FileId::new(LogQueue::Append, 1),
2314                        ));
2315                        memtable.rewrite(
2316                            generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
2317                            Some(1),
2318                        );
2319                        memtable.append(generate_entry_indexes(
2320                            10,
2321                            15,
2322                            FileId::new(LogQueue::Append, 2),
2323                        ));
2324                        memtable.append(generate_entry_indexes(
2325                            5,
2326                            10,
2327                            FileId::new(LogQueue::Append, 2),
2328                        ));
2329                    }
2330                    Some(LogQueue::Append) => {
2331                        let mut m1 = empty_table(memtable.region_id);
2332                        m1.append(generate_entry_indexes(
2333                            10,
2334                            15,
2335                            FileId::new(LogQueue::Append, 2),
2336                        ));
2337                        let mut m2 = empty_table(memtable.region_id);
2338                        m2.append(generate_entry_indexes(
2339                            5,
2340                            10,
2341                            FileId::new(LogQueue::Append, 2),
2342                        ));
2343                        m1.merge_newer_neighbor(&mut m2);
2344                        memtable.merge_newer_neighbor(&mut m1);
2345                    }
2346                    Some(LogQueue::Rewrite) => {
2347                        memtable.replay_rewrite(generate_entry_indexes(
2348                            0,
2349                            10,
2350                            FileId::new(LogQueue::Rewrite, 1),
2351                        ));
2352                    }
2353                }
2354                memtable
2355            },
2356        ];
2357
2358        // Merge against an empty table.
2359        for (i, case) in cases.iter().enumerate() {
2360            let region_id = i as u64;
2361            let mut append = empty_table(region_id);
2362            let mut rewrite = case(empty_table(region_id), Some(LogQueue::Rewrite));
2363            rewrite.merge_append_table(&mut append);
2364            assert_eq!(
2365                rewrite.entry_indexes,
2366                case(empty_table(region_id), Some(LogQueue::Rewrite)).entry_indexes,
2367            );
2368            assert!(append.entry_indexes.is_empty());
2369
2370            let mut append = case(empty_table(region_id), Some(LogQueue::Append));
2371            let mut rewrite = empty_table(region_id);
2372            rewrite.merge_append_table(&mut append);
2373            assert_eq!(
2374                rewrite.entry_indexes,
2375                case(empty_table(region_id), Some(LogQueue::Append)).entry_indexes
2376            );
2377            assert!(append.entry_indexes.is_empty());
2378        }
2379
2380        for (i, case) in cases.iter().enumerate() {
2381            let region_id = i as u64;
2382            let mut append = case(empty_table(region_id), Some(LogQueue::Append));
2383            let mut rewrite = case(empty_table(region_id), Some(LogQueue::Rewrite));
2384            rewrite.merge_append_table(&mut append);
2385            let expected = case(empty_table(region_id), None);
2386            assert_eq!(
2387                rewrite.global_stats.live_entries(LogQueue::Append),
2388                expected.global_stats.live_entries(LogQueue::Append)
2389            );
2390            assert_eq!(
2391                rewrite.global_stats.live_entries(LogQueue::Rewrite),
2392                expected.global_stats.live_entries(LogQueue::Rewrite)
2393            );
2394            assert_eq!(rewrite.entry_indexes, expected.entry_indexes);
2395            assert!(append.entry_indexes.is_empty());
2396        }
2397    }
2398
2399    #[test]
2400    fn test_memtables_merge_append_neighbor() {
2401        let first_rid = 17;
2402        let mut last_rid = first_rid;
2403
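        // Build three batches of writes, then check that replaying them with pair-wise
        // context merging produces the same memtables and stats as applying them
        // sequentially.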
2404        let mut batches = vec![
2405            LogItemBatch::with_capacity(0),
2406            LogItemBatch::with_capacity(0),
2407            LogItemBatch::with_capacity(0),
2408        ];
2409        let files: Vec<_> = (0..batches.len())
2410            .map(|i| FileId::new(LogQueue::Append, 10 + i as u64))
2411            .collect();
2412
2413        // put (key1, v1) => del (key1) => put (key1, v2)
2414        batches[0].put(last_rid, b"key1".to_vec(), b"val1".to_vec());
2415        batches[1].delete(last_rid, b"key1".to_vec());
2416        batches[2].put(last_rid, b"key1".to_vec(), b"val2".to_vec());
2417
2418        // put (k, _) => cleanup
2419        last_rid += 1;
2420        batches[0].put(last_rid, b"key".to_vec(), b"ANYTHING".to_vec());
2421        batches[1].add_command(last_rid, Command::Clean);
2422
2423        // entries [1, 10] => compact 5 => entries [11, 20]
2424        last_rid += 1;
2425        batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
2426        batches[1].add_command(last_rid, Command::Compact { index: 5 });
2427        batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2]));
2428
2429        // entries [1, 10] => entries [11, 20][5, 10] => compact 8
2430        last_rid += 1;
2431        batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
2432        batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1]));
2433        batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1]));
2434        batches[2].add_command(last_rid, Command::Compact { index: 8 });
2435
2436        for b in batches.iter_mut() {
2437            b.finish_write(FileBlockHandle::dummy(LogQueue::Append));
2438        }
2439
2440        // Reverse merge: repeatedly merge the newest context into the one before it.
2441        let mut ctxs = VecDeque::default();
2442        for (batch, file_id) in batches.clone().into_iter().zip(files) {
2443            let mut ctx = MemTableRecoverContext::default();
2444            ctx.replay(batch, file_id).unwrap();
2445            ctxs.push_back(ctx);
2446        }
2447        while ctxs.len() > 1 {
2448            let (y, mut x) = (ctxs.pop_back().unwrap(), ctxs.pop_back().unwrap());
2449            x.merge(y, LogQueue::Append).unwrap();
2450            ctxs.push_back(x);
2451        }
2452        let (merged_memtables, merged_global_stats) = ctxs.pop_front().unwrap().finish();
2453
2454        // Sequentially apply the same batches for comparison.
2455        let sequential_global_stats = Arc::new(GlobalStats::default());
2456        let sequential_memtables = MemTableAccessor::new(sequential_global_stats.clone());
2457        for mut batch in batches.clone() {
2458            sequential_memtables.apply_append_writes(batch.drain());
2459        }
2460
2461        for rid in first_rid..=last_rid {
2462            let m = merged_memtables.get(rid);
2463            let s = sequential_memtables.get(rid);
2464            if m.is_none() {
2465                assert!(s.is_none());
2466                continue;
2467            }
2468            let merged = m.as_ref().unwrap().read();
2469            let sequential = s.as_ref().unwrap().read();
2470            let mut merged_vec = Vec::new();
2471            let mut sequential_vec = Vec::new();
2472            merged
2473                .fetch_entry_indexes_before(u64::MAX, &mut merged_vec)
2474                .unwrap();
2475            sequential
2476                .fetch_entry_indexes_before(u64::MAX, &mut sequential_vec)
2477                .unwrap();
2478            assert_eq!(merged_vec, sequential_vec);
2479            merged_vec.clear();
2480            sequential_vec.clear();
2481            merged
2482                .fetch_rewritten_entry_indexes(&mut merged_vec)
2483                .unwrap();
2484            sequential
2485                .fetch_rewritten_entry_indexes(&mut sequential_vec)
2486                .unwrap();
2487            assert_eq!(merged_vec, sequential_vec);
2488            let mut merged_vec = Vec::new();
2489            let mut sequential_vec = Vec::new();
2490            merged.fetch_kvs_before(u64::MAX, &mut merged_vec);
2491            sequential.fetch_kvs_before(u64::MAX, &mut sequential_vec);
2492            assert_eq!(merged_vec, sequential_vec);
2493            merged_vec.clear();
2494            sequential_vec.clear();
2495            merged.fetch_rewritten_kvs(&mut merged_vec);
2496            sequential.fetch_rewritten_kvs(&mut sequential_vec);
2497            assert_eq!(merged_vec, sequential_vec);
2498        }
2499        assert_eq!(
2500            merged_global_stats.live_entries(LogQueue::Append),
2501            sequential_global_stats.live_entries(LogQueue::Append),
2502        );
2503        assert_eq!(
2504            merged_global_stats.rewrite_entries(),
2505            sequential_global_stats.rewrite_entries(),
2506        );
2507        assert_eq!(
2508            merged_global_stats.deleted_rewrite_entries(),
2509            sequential_global_stats.deleted_rewrite_entries(),
2510        );
2511    }
2512
2513    #[cfg(feature = "nightly")]
2514    #[bench]
2515    fn bench_memtable_single_put(b: &mut test::Bencher) {
2516        let mut memtable = MemTable::new(0, Arc::new(GlobalStats::default()));
2517        let key = b"some_key".to_vec();
2518        let value = vec![7; 12];
2519        b.iter(move || {
2520            memtable.put(key.clone(), value.clone(), FileId::dummy(LogQueue::Append));
2521        });
2522    }
2523
2524    #[cfg(feature = "nightly")]
2525    #[bench]
2526    fn bench_memtable_triple_puts(b: &mut test::Bencher) {
2527        let mut memtable = MemTable::new(0, Arc::new(GlobalStats::default()));
2528        let key0 = b"some_key0".to_vec();
2529        let key1 = b"some_key1".to_vec();
2530        let key2 = b"some_key2".to_vec();
2531        let value = vec![7; 12];
2532        b.iter(move || {
2533            memtable.put(key0.clone(), value.clone(), FileId::dummy(LogQueue::Append));
2534            memtable.put(key1.clone(), value.clone(), FileId::dummy(LogQueue::Append));
2535            memtable.put(key2.clone(), value.clone(), FileId::dummy(LogQueue::Append));
2536        });
2537    }
2538}