use std::borrow::BorrowMut;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;
use fail::fail_point;
use hashbrown::HashMap;
use parking_lot::{Mutex, RwLock};
use crate::config::Config;
use crate::file_pipe_log::ReplayMachine;
use crate::log_batch::{
Command, CompressionType, KeyValue, LogBatch, LogItemBatch, LogItemContent, LogItemDrain,
OpType,
};
use crate::metrics::MEMORY_USAGE;
use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue};
use crate::util::{hash_u64, Factory};
use crate::{Error, GlobalStats, Result};
#[cfg(feature = "swap")]
mod swap_conditional_imports {
    // With the `swap` feature enabled, memtables allocate their entry-index
    // deques through `SwappyAllocator`, which can spill memory to disk files
    // when a configured budget is exceeded.
    use crate::swappy_allocator::SwappyAllocator;
    use std::convert::TryFrom;
    use std::path::Path;

    /// Marker trait for allocators usable by `MemTable`. Requires the
    /// (unstable) `std::alloc::Allocator` API plus `Clone + Send + Sync`.
    pub trait AllocatorTrait: std::alloc::Allocator + Clone + Send + Sync {}
    impl<T: std::alloc::Allocator + Clone + Send + Sync> AllocatorTrait for T {}

    /// Allocator used before a `Config` is available (plain global heap).
    pub type VacantAllocator = std::alloc::Global;
    /// Allocator selected once a `Config` is known.
    pub type SelectedAllocator = SwappyAllocator<std::alloc::Global>;

    pub fn new_vacant_allocator() -> VacantAllocator {
        std::alloc::Global
    }

    /// Builds a swappy allocator rooted at `<cfg.dir>/swap`, budgeted by
    /// `cfg.memory_limit` (effectively unlimited when unset).
    pub fn new_allocator(cfg: &crate::Config) -> SelectedAllocator {
        let memory_limit =
            usize::try_from(cfg.memory_limit.map_or(u64::MAX, |l| l.0)).unwrap_or(usize::MAX);
        let path = Path::new(&cfg.dir).join("swap");
        SwappyAllocator::new(&path, memory_limit)
    }
}
#[cfg(not(feature = "swap"))]
mod swap_conditional_imports {
    // Without the `swap` feature, allocators are inert placeholders; the
    // entry-index deques use the default global allocator.
    pub trait AllocatorTrait: Clone + Send + Sync {}

    /// No-op stand-in so the allocator-generic code compiles unchanged.
    #[derive(Clone)]
    pub struct DummyAllocator;
    impl AllocatorTrait for DummyAllocator {}

    pub type VacantAllocator = DummyAllocator;
    pub type SelectedAllocator = DummyAllocator;

    pub fn new_vacant_allocator() -> VacantAllocator {
        DummyAllocator
    }

    pub fn new_allocator(_: &crate::Config) -> SelectedAllocator {
        DummyAllocator
    }
}
use swap_conditional_imports::*;
// Capacity above which a drained entry-index deque is shrunk back down.
// NOTE(review): the `- 1` offsets appear tuned to `VecDeque`'s internal
// capacity rounding — confirm before changing.
const CAPACITY_SHRINK_THRESHOLD: usize = 1024 - 1;
// Initial capacity of a fresh memtable's entry-index deque.
const CAPACITY_INIT: usize = 32 - 1;
// Number of shards in `MemTableAccessor`. Must be a power of two: `slot_index`
// masks with `MEMTABLE_SLOT_COUNT - 1` (see its debug_assert).
const MEMTABLE_SLOT_COUNT: usize = 128;
/// Location metadata of one raft log entry: which file block holds it and
/// where the entry sits inside that (possibly compressed) block.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct EntryIndex {
    // Raft log index of the entry.
    pub index: u64,
    // File block containing the serialized batch of entries; `None` until the
    // batch has been written to a log file.
    pub entries: Option<FileBlockHandle>,
    // Compression applied to the containing block.
    pub compression_type: CompressionType,
    // Byte offset of this entry within the (uncompressed) block.
    pub entry_offset: u32,
    // Byte length of this entry within the block.
    pub entry_len: u32,
}
impl Default for EntryIndex {
    /// An empty index: raft index 0, no file location, no compression.
    fn default() -> EntryIndex {
        Self {
            index: 0,
            entries: None,
            compression_type: CompressionType::None,
            entry_offset: 0,
            entry_len: 0,
        }
    }
}
impl EntryIndex {
fn from_thin(index: u64, e: ThinEntryIndex) -> Self {
Self {
index,
entries: e.entries,
compression_type: e.compression_type,
entry_offset: e.entry_offset,
entry_len: e.entry_len,
}
}
}
/// `EntryIndex` minus the raft log index. Stored in the memtable's deque,
/// where the index is implicit (`first_index` + position), saving 8 bytes
/// per entry.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ThinEntryIndex {
    entries: Option<FileBlockHandle>,
    compression_type: CompressionType,
    entry_offset: u32,
    entry_len: u32,
}
impl From<&EntryIndex> for ThinEntryIndex {
fn from(e: &EntryIndex) -> Self {
Self {
entries: e.entries,
compression_type: e.compression_type,
entry_offset: e.entry_offset,
entry_len: e.entry_len,
}
}
}
/// In-memory index of one raft group's data: a contiguous run of entry
/// locations plus a key-value map. Entries rewritten to the rewrite queue
/// always form a prefix of the deque (`rewrite_count` of them).
pub struct MemTable<A: AllocatorTrait> {
    // Raft group this table belongs to.
    region_id: u64,
    // Entry locations, ordered by raft index; allocator-parameterized only
    // when the `swap` feature (and thus the allocator API) is available.
    #[cfg(feature = "swap")]
    entry_indexes: VecDeque<ThinEntryIndex, A>,
    #[cfg(not(feature = "swap"))]
    entry_indexes: VecDeque<ThinEntryIndex>,
    // Raft index of `entry_indexes[0]`; meaningless when the deque is empty.
    first_index: u64,
    // Number of leading entries that live in the rewrite queue.
    rewrite_count: usize,
    // Key-value pairs, each tagged with the file that last wrote it.
    kvs: BTreeMap<Vec<u8>, (Vec<u8>, FileId)>,
    // Shared live-entry counters, updated on every add/remove.
    global_stats: Arc<GlobalStats>,
    // Ties `A` to the type even when the deque is not allocator-generic.
    _phantom: PhantomData<A>,
}
impl MemTable<VacantAllocator> {
    /// Creates a memtable backed by the default (vacant) allocator.
    /// Test-oriented convenience; production code goes through
    /// `with_allocator`.
    #[allow(dead_code)]
    fn new(region_id: u64, global_stats: Arc<GlobalStats>) -> MemTable<VacantAllocator> {
        Self::with_allocator(region_id, global_stats, &new_vacant_allocator())
    }
}
impl<A: AllocatorTrait> MemTable<A> {
/// Creates an empty memtable for `region_id`. The allocator is only used
/// when the `swap` feature is enabled; otherwise it is ignored.
fn with_allocator(
    region_id: u64,
    global_stats: Arc<GlobalStats>,
    _allocator: &A,
) -> MemTable<A> {
    MemTable {
        region_id,
        #[cfg(feature = "swap")]
        entry_indexes: VecDeque::with_capacity_in(CAPACITY_INIT, _allocator.clone()),
        #[cfg(not(feature = "swap"))]
        entry_indexes: VecDeque::with_capacity(CAPACITY_INIT),
        first_index: 0,
        rewrite_count: 0,
        kvs: BTreeMap::default(),
        global_stats,
        _phantom: PhantomData,
    }
}
/// Merges a newer memtable of the same region (produced by a later file
/// span during parallel recovery) into `self`. `rhs` is drained.
///
/// NOTE(review): `self` and `rhs` may carry *different* `GlobalStats`
/// instances during recovery, which is presumably why entries moved out of
/// `rhs` are re-added to `self.global_stats` and rewrite deletions are
/// transferred at the end — confirm against the recovery callers.
pub fn merge_newer_neighbor(&mut self, rhs: &mut Self) {
    debug_assert_eq!(self.region_id, rhs.region_id);
    if let Some((rhs_first, _)) = rhs.span() {
        self.prepare_append(
            rhs_first,
            // A hole is tolerated when rhs has rewritten entries
            // (rewrite -> compact append -> rewrite pattern).
            rhs.rewrite_count > 0,
            // Overwriting compacted entries is always allowed here.
            true,
        );
        // Count the moved entries under self's stats; queue is taken from
        // the first entry of the batch.
        self.global_stats.add(
            rhs.entry_indexes[0].entries.unwrap().id.queue,
            rhs.entry_indexes.len(),
        );
        self.rewrite_count += rhs.rewrite_count;
        self.entry_indexes.append(&mut rhs.entry_indexes);
        rhs.rewrite_count = 0;
    }
    // Newer key-values overwrite self's; `put` maintains stats.
    for (key, (value, file_id)) in rhs.kvs.iter() {
        self.put(key.clone(), value.clone(), *file_id);
    }
    // Migrate rhs's pending rewrite deletions into self's stats.
    let deleted = rhs.global_stats.deleted_rewrite_entries();
    self.global_stats.add(LogQueue::Rewrite, deleted);
    self.global_stats.delete(LogQueue::Rewrite, deleted);
}

/// Merges an append-queue memtable into `self`, which must contain only
/// rewritten entries (rewrite recovery runs before append recovery).
pub fn merge_append_table(&mut self, rhs: &mut Self) {
    debug_assert_eq!(self.region_id, rhs.region_id);
    // self holds only rewritten entries; rhs holds only append entries.
    debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
    debug_assert_eq!(rhs.rewrite_count, 0);
    if let Some((first, _)) = rhs.span() {
        self.prepare_append(
            first,
            // Both holes and overwrites of compacted entries are allowed
            // when stitching the two queues together.
            true,
            true,
        );
        self.global_stats.add(
            rhs.entry_indexes[0].entries.unwrap().id.queue,
            rhs.entry_indexes.len(),
        );
        self.entry_indexes.append(&mut rhs.entry_indexes);
    }
    for (key, (value, file_id)) in rhs.kvs.iter() {
        self.put(key.clone(), value.clone(), *file_id);
    }
    // Migrate rhs's pending rewrite deletions into self's stats.
    let deleted = rhs.global_stats.deleted_rewrite_entries();
    self.global_stats.add(LogQueue::Rewrite, deleted);
    self.global_stats.delete(LogQueue::Rewrite, deleted);
}
/// Returns a copy of the value stored under `key`, if any.
pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
    self.kvs.get(key).map(|(value, _)| value.clone())
}
/// Visits key-values in `[start_key, end_key)` (unbounded when `None`),
/// in reverse order if requested. Stops early when `f` returns false.
pub fn scan<F>(
    &self,
    start_key: Option<&[u8]>,
    end_key: Option<&[u8]>,
    reverse: bool,
    mut f: F,
) -> Result<()>
where
    F: FnMut(&[u8], &[u8]) -> bool,
{
    let lower = match start_key {
        Some(k) => Bound::Included(k),
        None => Bound::Unbounded,
    };
    let upper = match end_key {
        Some(k) => Bound::Excluded(k),
        None => Bound::Unbounded,
    };
    let mut iter = self.kvs.range::<[u8], _>((lower, upper));
    loop {
        // Pull from the appropriate end depending on scan direction.
        let next = if reverse { iter.next_back() } else { iter.next() };
        match next {
            Some((key, (value, _))) if f(key, value) => continue,
            _ => break,
        }
    }
    Ok(())
}
/// Removes `key`, decrementing the live count of the file that wrote it.
pub fn delete(&mut self, key: &[u8]) {
    if let Some((_, file_id)) = self.kvs.remove(key) {
        self.global_stats.delete(file_id.queue, 1);
    }
}

/// Inserts `key -> value` attributed to `file_id`; a displaced old value
/// is un-counted before the new one is counted.
pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>, file_id: FileId) {
    let evicted = self.kvs.insert(key, (value, file_id));
    if let Some((_, old_id)) = evicted {
        self.global_stats.delete(old_id.queue, 1);
    }
    self.global_stats.add(file_id.queue, 1);
}
/// Marks `key` as rewritten into rewrite-queue file `seq`.
///
/// A rewrite entity is optimistically added up front; every path that does
/// NOT actually migrate an append-queue key cancels it with the trailing
/// `delete(Rewrite, 1)`, so the net stats change is zero in those cases.
pub fn rewrite_key(&mut self, key: Vec<u8>, gate: Option<FileSeq>, seq: FileSeq) {
    self.global_stats.add(LogQueue::Rewrite, 1);
    if let Some(origin) = self.kvs.get_mut(&key) {
        if origin.1.queue == LogQueue::Append {
            if let Some(gate) = gate {
                if origin.1.seq <= gate {
                    // Key migrates from append to rewrite queue; early
                    // return keeps the optimistic rewrite add.
                    origin.1 = FileId {
                        queue: LogQueue::Rewrite,
                        seq,
                    };
                    self.global_stats.delete(LogQueue::Append, 1);
                    return;
                }
            }
            // Append key newer than the gate (or no gate): leave it.
        } else {
            // Already rewritten: only advance its file sequence.
            assert!(origin.1.seq <= seq);
            origin.1.seq = seq;
        }
    }
    // Cancel the optimistic add (key missing, not migrated, or merely
    // re-sequenced).
    self.global_stats.delete(LogQueue::Rewrite, 1);
}
/// Returns the location of the entry at raft `index`, or `None` when it is
/// outside the currently held span.
pub fn get_entry(&self, index: u64) -> Option<EntryIndex> {
    let (first, last) = self.span()?;
    if !(first..=last).contains(&index) {
        return None;
    }
    let thin = self.entry_indexes[(index - first) as usize];
    Some(EntryIndex::from_thin(index, thin))
}
pub fn append(&mut self, entry_indexes: Vec<EntryIndex>) {
let len = entry_indexes.len();
if len > 0 {
self.prepare_append(
entry_indexes[0].index,
false,
false,
);
self.global_stats.add(LogQueue::Append, len);
for ei in &entry_indexes {
self.entry_indexes.push_back(ei.into());
}
}
}
/// Appends entries during append-queue recovery. Unlike `append`, holes are
/// tolerated (the tail is truncated) because recovery may see partial data.
pub fn replay_append(&mut self, entry_indexes: Vec<EntryIndex>) {
    let len = entry_indexes.len();
    if len > 0 {
        // No rewritten entries can exist yet during append replay.
        debug_assert_eq!(self.rewrite_count, 0);
        self.prepare_append(
            entry_indexes[0].index,
            false,
            // allow_hole-like truncation is off, but overwriting compacted
            // entries is allowed during replay.
            true,
        );
        self.global_stats.add(LogQueue::Append, len);
        for ei in &entry_indexes {
            // Replayed entries must come from the append queue.
            debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append);
            self.entry_indexes.push_back(ei.into());
        }
    }
}
/// Applies rewritten locations over existing entries.
///
/// `gate`: when `Some(seq)`, only entries currently in append files with
/// sequence <= `seq` are replaced (a live rewrite); when `None`, the
/// entries being replaced are expected to already be rewritten (a
/// rewrite-of-rewrite), and any append entry stops the process.
/// Stats are reconciled so that every input rewrite entity is either
/// counted live or immediately deleted.
pub fn rewrite(&mut self, rewrite_indexes: Vec<EntryIndex>, gate: Option<FileSeq>) {
    if rewrite_indexes.is_empty() {
        return;
    }
    // Optimistically count all incoming rewrite entities.
    self.global_stats
        .add(LogQueue::Rewrite, rewrite_indexes.len());
    let len = self.entry_indexes.len();
    if len == 0 {
        // Nothing to overwrite: cancel the optimistic add entirely.
        self.global_stats
            .delete(LogQueue::Rewrite, rewrite_indexes.len());
        return;
    }
    // Clamp the rewrite range to the span we still hold.
    let first = self.first_index;
    let last = self.first_index + len as u64 - 1;
    let rewrite_first = std::cmp::max(rewrite_indexes[0].index, first);
    let rewrite_last = std::cmp::min(rewrite_indexes[rewrite_indexes.len() - 1].index, last);
    let mut rewrite_len = (rewrite_last + 1).saturating_sub(rewrite_first) as usize;
    if rewrite_len == 0 {
        self.global_stats
            .delete(LogQueue::Rewrite, rewrite_indexes.len());
        return;
    }
    let pos = (rewrite_first - first) as usize;
    // Rewritten entries must stay a contiguous prefix: anything before the
    // splice point must already be in the rewrite queue.
    assert!(
        pos == 0 || self.entry_indexes[pos - 1].entries.unwrap().id.queue == LogQueue::Rewrite
    );
    let rewrite_pos = (rewrite_first - rewrite_indexes[0].index) as usize;
    for (i, rindex) in rewrite_indexes[rewrite_pos..rewrite_pos + rewrite_len]
        .iter()
        .enumerate()
    {
        let index = &mut self.entry_indexes[i + pos];
        if let Some(gate) = gate {
            debug_assert_eq!(index.entries.unwrap().id.queue, LogQueue::Append);
            if index.entries.unwrap().id.seq > gate {
                // Entry was written after the gate; it (and everything
                // following) must not be rewritten.
                rewrite_len = i;
                break;
            }
        } else if index.entries.unwrap().id.queue == LogQueue::Append {
            // Rewrite-of-rewrite hit a fresher append entry: stop.
            rewrite_len = i;
            break;
        }
        *index = rindex.into();
    }
    if gate.is_none() {
        // Replaced entries were already counted as rewrite entities, so
        // the whole optimistic add is cancelled.
        self.global_stats
            .delete(LogQueue::Rewrite, rewrite_indexes.len());
    } else {
        // `rewrite_len` append entries became rewrite entries; the unused
        // remainder of the optimistic add is cancelled.
        self.global_stats.delete(LogQueue::Append, rewrite_len);
        self.global_stats
            .delete(LogQueue::Rewrite, rewrite_indexes.len() - rewrite_len);
    }
    self.rewrite_count = pos + rewrite_len;
}
/// Appends entries during rewrite-queue recovery. The table must contain
/// only rewritten entries at this point; afterwards the whole deque is
/// marked as rewritten.
pub fn replay_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
    let len = entry_indexes.len();
    if len > 0 {
        // Invariant: rewrite replay happens before any append replay.
        debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
        self.prepare_append(
            entry_indexes[0].index,
            // Both holes and compacted-entry overwrites are tolerated
            // during rewrite replay.
            true,
            true,
        );
        self.global_stats.add(LogQueue::Rewrite, len);
        for ei in &entry_indexes {
            self.entry_indexes.push_back(ei.into());
        }
        self.rewrite_count = self.entry_indexes.len();
    }
}
/// Drops all entries with raft index < `index`. Returns how many entries
/// were removed. Stats are adjusted per queue: compacted entries that were
/// part of the rewritten prefix count against the rewrite queue.
pub fn compact_to(&mut self, index: u64) -> u64 {
    if self.entry_indexes.is_empty() {
        return 0;
    }
    let first = self.first_index;
    if index <= first {
        return 0;
    }
    // `index` may exceed the span; clamp the removal count to what we hold.
    let count = std::cmp::min((index - first) as usize, self.entry_indexes.len());
    self.first_index = index;
    self.entry_indexes.drain(..count);
    self.maybe_shrink_entry_indexes();
    // The rewritten prefix shrinks first.
    let compacted_rewrite = std::cmp::min(count, self.rewrite_count);
    self.rewrite_count -= compacted_rewrite;
    self.global_stats
        .delete(LogQueue::Rewrite, compacted_rewrite);
    self.global_stats
        .delete(LogQueue::Append, count - compacted_rewrite);
    count as u64
}
/// Truncates the deque so that only entries with raft index < `index`
/// remain. `first`/`last` must describe the current span (callers pass
/// cached values to avoid recomputing). Returns the number of entries
/// removed. "unsafe" here means it does not validate its arguments, not
/// Rust `unsafe`.
fn unsafe_truncate_back(&mut self, first: u64, index: u64, last: u64) -> usize {
    debug_assert!(index <= last);
    let len = self.entry_indexes.len();
    debug_assert_eq!(len as u64, last - first + 1);
    // `index < first` truncates everything (saturating to length 0).
    self.entry_indexes
        .truncate(index.saturating_sub(first) as usize);
    let new_len = self.entry_indexes.len();
    let truncated = len - new_len;
    if self.rewrite_count > new_len {
        // Truncation cut into the rewritten prefix.
        let truncated_rewrite = self.rewrite_count - new_len;
        self.rewrite_count = new_len;
        self.global_stats
            .delete(LogQueue::Rewrite, truncated_rewrite);
        self.global_stats
            .delete(LogQueue::Append, truncated - truncated_rewrite);
    } else {
        self.global_stats.delete(LogQueue::Append, truncated);
    }
    truncated
}
/// Reconciles the deque with an incoming append starting at
/// `first_index_to_add`, truncating or panicking as appropriate:
/// - starts before our first index: requires `allow_overwrite_compacted`,
///   everything is dropped and restarted;
/// - starts beyond our last index + 1 (a hole): requires `allow_hole`,
///   everything is dropped and restarted;
/// - overlaps our tail: the overlap is truncated away;
/// - exactly contiguous: nothing to do.
#[inline]
fn prepare_append(
    &mut self,
    first_index_to_add: u64,
    allow_hole: bool,
    allow_overwrite_compacted: bool,
) {
    if let Some((first, last)) = self.span() {
        if first_index_to_add < first {
            if allow_overwrite_compacted {
                // Drop everything; index 0 is below `first`, so this
                // truncates to empty.
                self.unsafe_truncate_back(first, 0, last);
            } else {
                panic!(
                    "attempt to overwrite compacted entries in {}",
                    self.region_id
                );
            }
            self.first_index = first_index_to_add;
        } else if last + 1 < first_index_to_add {
            if allow_hole {
                // Drop everything rather than keep a gap in the span.
                self.unsafe_truncate_back(first, 0, last);
            } else {
                panic!("memtable {} has a hole", self.region_id);
            }
            self.first_index = first_index_to_add;
        } else if first_index_to_add != last + 1 {
            // Overlapping append: cut our tail back to the splice point.
            self.unsafe_truncate_back(first, first_index_to_add, last);
        }
    } else {
        self.first_index = first_index_to_add;
    }
}
/// Releases excess deque capacity after large drains, so long-lived tables
/// do not pin peak-sized allocations.
#[inline]
fn maybe_shrink_entry_indexes(&mut self) {
    if self.entry_indexes.capacity() >= CAPACITY_SHRINK_THRESHOLD {
        self.entry_indexes.shrink_to_fit();
    }
}
/// Pushes the locations of entries in `[begin, end)` onto `vec_idx`.
///
/// Errors: `EntryCompacted` if `begin` precedes the held span,
/// `EntryNotFound` if the table is empty or `end` exceeds the span.
/// When `max_size` is given, stops once the accumulated `entry_len`
/// exceeds it — but always yields at least one entry.
pub fn fetch_entries_to(
    &self,
    begin: u64,
    end: u64,
    max_size: Option<usize>,
    vec_idx: &mut Vec<EntryIndex>,
) -> Result<()> {
    if end <= begin {
        return Ok(());
    }
    let len = self.entry_indexes.len();
    if len == 0 {
        return Err(Error::EntryNotFound);
    }
    let first = self.first_index;
    if begin < first {
        return Err(Error::EntryCompacted);
    }
    if end > self.first_index + len as u64 {
        return Err(Error::EntryNotFound);
    }
    let start_pos = (begin - first) as usize;
    let end_pos = (end - begin) as usize + start_pos;
    let mut total_size = 0;
    let mut index = begin;
    for idx in self.entry_indexes.range(start_pos..end_pos) {
        total_size += idx.entry_len;
        if let Some(max_size) = max_size {
            // The second clause guarantees progress: a single oversized
            // entry is still returned.
            if total_size as usize > max_size && total_size > idx.entry_len {
                break;
            }
        }
        vec_idx.push(EntryIndex::from_thin(index, *idx));
        index += 1;
    }
    Ok(())
}
/// Collects append-queue entries (those after the rewritten prefix) whose
/// file sequence is <= `gate`, stopping at the first newer entry.
pub fn fetch_entry_indexes_before(
    &self,
    gate: FileSeq,
    vec_idx: &mut Vec<EntryIndex>,
) -> Result<()> {
    // Raft index of the first non-rewritten entry.
    let base = self.first_index + self.rewrite_count as u64;
    for (offset, thin) in self.entry_indexes.range(self.rewrite_count..).enumerate() {
        if thin.entries.unwrap().id.seq > gate {
            break;
        }
        vec_idx.push(EntryIndex::from_thin(base + offset as u64, *thin));
    }
    Ok(())
}
/// Collects the locations of all entries in the rewritten prefix.
pub fn fetch_rewritten_entry_indexes(&self, vec_idx: &mut Vec<EntryIndex>) -> Result<()> {
    if self.rewrite_count == 0 {
        return Ok(());
    }
    let begin = self.first_index;
    let end = begin + self.rewrite_count as u64;
    self.fetch_entries_to(begin, end, None, vec_idx)
}
/// Collects key-values last written to append files with sequence <= `gate`.
pub fn fetch_kvs_before(&self, gate: FileSeq, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
    vec.extend(
        self.kvs
            .iter()
            .filter(|(_, (_, fid))| fid.queue == LogQueue::Append && fid.seq <= gate)
            .map(|(k, (v, _))| (k.clone(), v.clone())),
    );
}

/// Collects key-values that currently live in the rewrite queue.
pub fn fetch_rewritten_kvs(&self, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
    vec.extend(
        self.kvs
            .iter()
            .filter(|(_, (_, fid))| fid.queue == LogQueue::Rewrite)
            .map(|(k, (v, _))| (k.clone(), v.clone())),
    );
}
/// Smallest file sequence in `queue` still referenced by this table
/// (entries or key-values), or `None` if nothing references that queue.
pub fn min_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
    // Oldest entry in the queue: for Append that is the first entry past
    // the rewritten prefix; for Rewrite it is the very front (if any).
    let ents_min = match queue {
        LogQueue::Append => self.entry_indexes.get(self.rewrite_count),
        LogQueue::Rewrite if self.rewrite_count == 0 => None,
        LogQueue::Rewrite => self.entry_indexes.front(),
    }
    .map(|e| e.entries.unwrap().id.seq);
    let kvs_min = self
        .kvs
        .values()
        .filter(|v| v.1.queue == queue)
        .map(|v| v.1.seq)
        .min();
    match (ents_min, kvs_min) {
        (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
        (a, b) => a.or(b),
    }
}
/// Whether at least `count` entries were written to files no newer than
/// `gate` (only `gate.seq` is compared).
#[inline]
pub fn has_at_least_some_entries_before(&self, gate: FileId, count: usize) -> bool {
    debug_assert!(count > 0);
    match self.entry_indexes.get(count - 1) {
        Some(ei) => ei.entries.unwrap().id.seq <= gate.seq,
        None => false,
    }
}
/// Raft group id this table serves.
pub fn region_id(&self) -> u64 {
    self.region_id
}

/// Number of leading entries that live in the rewrite queue.
pub(crate) fn rewrite_count(&self) -> usize {
    self.rewrite_count
}

/// First held raft index, or `None` when empty.
pub fn first_index(&self) -> Option<u64> {
    self.span().map(|s| s.0)
}

/// Last held raft index, or `None` when empty.
pub fn last_index(&self) -> Option<u64> {
    self.span().map(|s| s.1)
}
#[allow(dead_code)]
fn heap_size(&self) -> usize {
self.entry_indexes.capacity() * std::mem::size_of::<EntryIndex>()
}
/// Inclusive raft-index range currently held, or `None` when empty.
#[inline]
fn span(&self) -> Option<(u64, u64)> {
    match self.entry_indexes.len() as u64 {
        0 => None,
        len => Some((self.first_index, self.first_index + len - 1)),
    }
}
/// Test-only invariant check: rewritten entries must form a contiguous
/// prefix, i.e. no Rewrite entry may follow an Append entry.
#[cfg(test)]
fn consistency_check(&self) {
    let mut seen_append = false;
    for idx in self.entry_indexes.iter() {
        let queue = idx.entries.unwrap().id.queue;
        if queue == LogQueue::Append {
            seen_append = true;
        }
        // Once an Append entry appears, everything after must be Append.
        assert_eq!(
            queue,
            if seen_append {
                LogQueue::Append
            } else {
                LogQueue::Rewrite
            }
        );
    }
}
}
impl<A: AllocatorTrait> Drop for MemTable<A> {
    /// Un-counts everything this table still references from the shared
    /// stats, split per queue: the rewritten prefix and rewrite-queue KVs
    /// against Rewrite, the rest against Append.
    fn drop(&mut self) {
        let mut append_kvs = 0;
        let mut rewrite_kvs = 0;
        for (_v, id) in self.kvs.values() {
            match id.queue {
                LogQueue::Rewrite => rewrite_kvs += 1,
                LogQueue::Append => append_kvs += 1,
            }
        }
        self.global_stats
            .delete(LogQueue::Rewrite, self.rewrite_count + rewrite_kvs);
        self.global_stats.delete(
            LogQueue::Append,
            self.entry_indexes.len() - self.rewrite_count + append_kvs,
        );
    }
}
// One shard of the accessor: region id -> shared memtable.
type MemTableMap<A> = HashMap<u64, Arc<RwLock<MemTable<A>>>>;
/// Shared handle to a single region's memtable.
pub type MemTableHandle = Arc<RwLock<MemTable<SelectedAllocator>>>;
/// Accessor specialization used by the engine proper.
pub type MemTables = MemTableAccessor<SelectedAllocator>;
/// Sharded registry of all regions' memtables. Cloning shares the
/// underlying shards and tombstone queue (everything is behind `Arc`).
#[derive(Clone)]
pub struct MemTableAccessor<A: AllocatorTrait> {
    // Stats shared with every memtable created through this accessor.
    global_stats: Arc<GlobalStats>,
    // Allocator handed to newly created memtables.
    allocator: A,
    // `MEMTABLE_SLOT_COUNT` shards, selected by `slot_index`.
    slots: Vec<Arc<RwLock<MemTableMap<A>>>>,
    // Region ids removed with a tombstone, pending a `Command::Clean`.
    removed_memtables: Arc<Mutex<VecDeque<u64>>>,
}
impl MemTableAccessor<VacantAllocator> {
    /// Creates an accessor backed by the default (vacant) allocator, with
    /// all shards empty.
    pub fn new(global_stats: Arc<GlobalStats>) -> MemTableAccessor<VacantAllocator> {
        let slots = std::iter::repeat_with(|| Arc::new(RwLock::new(MemTableMap::default())))
            .take(MEMTABLE_SLOT_COUNT)
            .collect();
        MemTableAccessor {
            global_stats,
            allocator: new_vacant_allocator(),
            slots,
            removed_memtables: Default::default(),
        }
    }
}
impl MemTableAccessor<SelectedAllocator> {
    /// Total memory attributed to memtables. Without the `swap` feature this
    /// sums each table's deque capacity; with it, the swappy allocator
    /// reports usage directly.
    pub fn memory_usage(&self) -> usize {
        #[cfg(not(feature = "swap"))]
        {
            let mut total = 0;
            for tables in &self.slots {
                tables.read().values().for_each(|t| {
                    total += t.read().heap_size();
                });
            }
            total
        }
        #[cfg(feature = "swap")]
        {
            self.allocator.memory_usage()
        }
    }

    /// Publishes current memory usage to the `MEMORY_USAGE` gauge.
    pub(crate) fn flush_metrics(&self) {
        MEMORY_USAGE.set(self.memory_usage() as i64);
    }
}
impl<A: AllocatorTrait> MemTableAccessor<A> {
/// Creates an accessor whose memtables will use the given allocator, with
/// all shards empty.
pub fn new_with_allocator(global_stats: Arc<GlobalStats>, allocator: A) -> MemTableAccessor<A> {
    let slots = std::iter::repeat_with(|| Arc::new(RwLock::new(MemTableMap::default())))
        .take(MEMTABLE_SLOT_COUNT)
        .collect();
    MemTableAccessor {
        global_stats,
        allocator,
        slots,
        removed_memtables: Default::default(),
    }
}
/// Returns the memtable for `raft_group_id`, creating an empty one in its
/// shard if absent.
pub fn get_or_insert(&self, raft_group_id: u64) -> Arc<RwLock<MemTable<A>>> {
    let mut memtables = self.slots[Self::slot_index(raft_group_id)].write();
    memtables
        .entry(raft_group_id)
        .or_insert_with(|| {
            // Clone the stats handle only on the miss path; the previous
            // version cloned it unconditionally and then again inside the
            // closure, paying a redundant refcount bump on every lookup.
            Arc::new(RwLock::new(MemTable::with_allocator(
                raft_group_id,
                self.global_stats.clone(),
                &self.allocator,
            )))
        })
        .clone()
}
/// Returns the memtable for `raft_group_id`, if one exists.
pub fn get(&self, raft_group_id: u64) -> Option<Arc<RwLock<MemTable<A>>>> {
    let shard = self.slots[Self::slot_index(raft_group_id)].read();
    shard.get(&raft_group_id).map(Arc::clone)
}

/// Inserts (or replaces) the memtable for `raft_group_id`.
pub fn insert(&self, raft_group_id: u64, memtable: Arc<RwLock<MemTable<A>>>) {
    let mut shard = self.slots[Self::slot_index(raft_group_id)].write();
    shard.insert(raft_group_id, memtable);
}

/// Removes the memtable for `raft_group_id`. With `record_tombstone`, the
/// id is queued so a `Command::Clean` can be emitted later.
pub fn remove(&self, raft_group_id: u64, record_tombstone: bool) {
    self.slots[Self::slot_index(raft_group_id)]
        .write()
        .remove(&raft_group_id);
    if record_tombstone {
        self.removed_memtables.lock().push_back(raft_group_id);
    }
}
/// Folds `fold` over every memtable in every shard.
pub fn fold<B, F: Fn(B, &MemTable<A>) -> B>(&self, mut init: B, fold: F) -> B {
    for tables in &self.slots {
        let shard = tables.read();
        for memtable in shard.values() {
            init = fold(init, &memtable.read());
        }
    }
    init
}

/// Returns handles to every memtable for which `condition` holds.
pub fn collect<F: FnMut(&MemTable<A>) -> bool>(
    &self,
    mut condition: F,
) -> Vec<Arc<RwLock<MemTable<A>>>> {
    let mut selected = Vec::new();
    for tables in &self.slots {
        for t in tables.read().values() {
            if condition(&t.read()) {
                selected.push(t.clone());
            }
        }
    }
    selected
}
/// Drains the tombstone queue into a `LogBatch` of `Command::Clean` items.
pub fn take_cleaned_region_logs(&self) -> LogBatch {
    let mut log_batch = LogBatch::default();
    for id in self.removed_memtables.lock().drain(..) {
        log_batch.add_command(id, Command::Clean);
    }
    log_batch
}

/// Snapshot of region ids currently queued as tombstones.
#[allow(dead_code)]
pub fn cleaned_region_ids(&self) -> HashSet<u64> {
    self.removed_memtables.lock().iter().copied().collect()
}
pub fn is_empty(&self) -> bool {
for i in 0..MEMTABLE_SLOT_COUNT {
if !self.slots[i].read().is_empty() {
return false;
}
}
true
}
/// Merges a newer accessor (from a later recovery span) into `self`,
/// region by region; `rhs` is drained.
pub fn merge_newer_neighbor(&self, mut rhs: Self) {
    for slot in rhs.slots.iter_mut() {
        for (raft_group_id, memtable) in slot.write().drain() {
            self.get_or_insert(raft_group_id)
                .write()
                .merge_newer_neighbor(memtable.write().borrow_mut());
        }
    }
}

/// Merges an append-queue accessor into this (rewrite-queue) one: regions
/// present in both are merged table-wise, others are adopted wholesale.
pub fn merge_append_table(&self, mut rhs: Self) {
    for slot in rhs.slots.iter_mut() {
        for (id, memtable) in std::mem::take(&mut *slot.write()) {
            if let Some(existing_memtable) = self.get(id) {
                existing_memtable
                    .write()
                    .merge_append_table(&mut *memtable.write());
            } else {
                // No rewritten data for this region: take the table as-is.
                self.insert(id, memtable);
            }
        }
    }
    // Tombstones are expected to agree between the two sides.
    debug_assert_eq!(
        self.removed_memtables.lock().len(),
        rhs.removed_memtables.lock().len()
    );
}
/// Applies a drained batch of append-queue log items to the memtables:
/// entry appends, clean/compact commands, and key-value puts/deletes.
pub fn apply_append_writes(&self, log_items: LogItemDrain) {
    for item in log_items {
        let raft = item.raft_group_id;
        let memtable = self.get_or_insert(raft);
        // Failpoint hook for tests targeting region 3.
        fail_point!(
            "memtable_accessor::apply_append_writes::region_3",
            raft == 3,
            |_| {}
        );
        match item.content {
            LogItemContent::EntryIndexes(entries_to_add) => {
                memtable.write().append(entries_to_add.0);
            }
            LogItemContent::Command(Command::Clean) => {
                // Record a tombstone so the rewrite queue can be purged of
                // this region later.
                self.remove(raft, true /* record_tombstone */);
            }
            LogItemContent::Command(Command::Compact { index }) => {
                memtable.write().compact_to(index);
            }
            LogItemContent::Kv(kv) => match kv.op_type {
                OpType::Put => {
                    let value = kv.value.unwrap();
                    memtable.write().put(kv.key, value, kv.file_id.unwrap());
                }
                OpType::Del => {
                    let key = kv.key;
                    memtable.write().delete(key.as_slice());
                }
            },
        }
    }
}
/// Replays append-queue log items during recovery. Same shape as
/// `apply_append_writes` but routes entry batches through `replay_append`,
/// which tolerates recovery-specific conditions.
pub fn replay_append_writes(&self, log_items: LogItemDrain) {
    for item in log_items {
        let raft = item.raft_group_id;
        let memtable = self.get_or_insert(raft);
        match item.content {
            LogItemContent::EntryIndexes(entries_to_add) => {
                memtable.write().replay_append(entries_to_add.0);
            }
            LogItemContent::Command(Command::Clean) => {
                // Tombstone recorded: the rewrite queue may still hold data
                // for this region.
                self.remove(raft, true /* record_tombstone */);
            }
            LogItemContent::Command(Command::Compact { index }) => {
                memtable.write().compact_to(index);
            }
            LogItemContent::Kv(kv) => match kv.op_type {
                OpType::Put => {
                    let value = kv.value.unwrap();
                    memtable.write().put(kv.key, value, kv.file_id.unwrap());
                }
                OpType::Del => {
                    let key = kv.key;
                    memtable.write().delete(key.as_slice());
                }
            },
        }
    }
}
/// Applies the outcome of a live rewrite: entry batches go through
/// `MemTable::rewrite` gated by `watermark`, keys are re-attributed to
/// rewrite file `new_file`. Only entry indexes, KV puts and (ignored)
/// clean commands are legal here.
pub fn apply_rewrite_writes(
    &self,
    log_items: LogItemDrain,
    watermark: Option<FileSeq>,
    new_file: FileSeq,
) {
    for item in log_items {
        let raft = item.raft_group_id;
        let memtable = self.get_or_insert(raft);
        match item.content {
            LogItemContent::EntryIndexes(entries_to_add) => {
                memtable.write().rewrite(entries_to_add.0, watermark);
            }
            LogItemContent::Kv(kv) => match kv.op_type {
                OpType::Put => {
                    let key = kv.key;
                    memtable.write().rewrite_key(key, watermark, new_file);
                }
                _ => unreachable!(),
            },
            LogItemContent::Command(Command::Clean) => {}
            _ => unreachable!(),
        }
    }
}
/// Replays rewrite-queue log items during recovery. Entry batches go
/// through `replay_rewrite`; Clean removes without a tombstone because the
/// rewrite queue is replayed first and nothing precedes it to purge.
pub fn replay_rewrite_writes(&self, log_items: LogItemDrain) {
    for item in log_items {
        let raft = item.raft_group_id;
        let memtable = self.get_or_insert(raft);
        match item.content {
            LogItemContent::EntryIndexes(entries_to_add) => {
                memtable.write().replay_rewrite(entries_to_add.0);
            }
            LogItemContent::Command(Command::Clean) => {
                self.remove(raft, false /* record_tombstone */);
            }
            LogItemContent::Command(Command::Compact { index }) => {
                memtable.write().compact_to(index);
            }
            LogItemContent::Kv(kv) => match kv.op_type {
                OpType::Put => {
                    let value = kv.value.unwrap();
                    memtable.write().put(kv.key, value, kv.file_id.unwrap());
                }
                OpType::Del => {
                    let key = kv.key;
                    memtable.write().delete(key.as_slice());
                }
            },
        }
    }
}
/// Maps a region id to its shard. The mask is equivalent to a modulo
/// because `MEMTABLE_SLOT_COUNT` is a power of two (asserted below).
#[inline]
fn slot_index(id: u64) -> usize {
    debug_assert!(MEMTABLE_SLOT_COUNT.is_power_of_two());
    hash_u64(id) as usize & (MEMTABLE_SLOT_COUNT - 1)
}
}
/// Per-span recovery state: the memtables rebuilt so far, their stats, and
/// a side batch of destructive items (clean/compact/delete) retained so
/// they can be re-applied when spans are merged.
pub struct MemTableRecoverContext<A: AllocatorTrait> {
    stats: Arc<GlobalStats>,
    // Clean/Compact/Del items seen in this span (see `ReplayMachine::replay`).
    log_batch: LogItemBatch,
    memtables: MemTableAccessor<A>,
}
impl MemTableRecoverContext<VacantAllocator> {
    /// Creates an empty context with fresh stats and a vacant allocator.
    fn new() -> Self {
        let stats = Arc::new(GlobalStats::default());
        Self {
            stats: stats.clone(),
            log_batch: LogItemBatch::default(),
            memtables: MemTableAccessor::new(stats),
        }
    }
}
impl<A: AllocatorTrait> MemTableRecoverContext<A> {
    /// Creates an empty context whose memtables use `allocator`.
    fn new_with_allocator(allocator: A) -> Self {
        let stats = Arc::new(GlobalStats::default());
        Self {
            stats: stats.clone(),
            log_batch: LogItemBatch::default(),
            memtables: MemTableAccessor::new_with_allocator(stats, allocator),
        }
    }

    /// Consumes the context, yielding the rebuilt memtables and their stats.
    pub fn finish(self) -> (MemTableAccessor<A>, Arc<GlobalStats>) {
        (self.memtables, self.stats)
    }

    /// Merges an append-queue recovery context into this rewrite-queue one:
    /// the append side's retained destructive items are applied first, then
    /// the tables themselves are merged.
    pub fn merge_append_context(&self, append: MemTableRecoverContext<A>) {
        self.memtables
            .apply_append_writes(append.log_batch.clone().drain());
        self.memtables.merge_append_table(append.memtables);
    }
}
impl Default for MemTableRecoverContext<VacantAllocator> {
    /// Equivalent to `MemTableRecoverContext::new()`.
    fn default() -> Self {
        Self::new()
    }
}
impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
    /// Replays one item batch from `file_id` into the memtables.
    /// Destructive items (Clean, Compact, key deletes) are also copied into
    /// `self.log_batch` so they can be re-applied onto an earlier span's
    /// data during `merge_append_context`.
    fn replay(&mut self, mut item_batch: LogItemBatch, file_id: FileId) -> Result<()> {
        for item in item_batch.iter() {
            match &item.content {
                LogItemContent::Command(Command::Clean)
                | LogItemContent::Command(Command::Compact { .. }) => {
                    self.log_batch.push((*item).clone());
                }
                LogItemContent::Kv(KeyValue { op_type, .. }) if *op_type == OpType::Del => {
                    self.log_batch.push((*item).clone());
                }
                _ => {}
            }
        }
        match file_id.queue {
            LogQueue::Append => self.memtables.replay_append_writes(item_batch.drain()),
            LogQueue::Rewrite => self.memtables.replay_rewrite_writes(item_batch.drain()),
        }
        Ok(())
    }

    /// Merges a later span's context into this one: retained destructive
    /// items are combined (the clone keeps `rhs.log_batch` intact for the
    /// replay below), then rhs's items and tables are folded in.
    fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> {
        self.log_batch.merge(&mut rhs.log_batch.clone());
        match queue {
            LogQueue::Append => self.memtables.replay_append_writes(rhs.log_batch.drain()),
            LogQueue::Rewrite => self.memtables.replay_rewrite_writes(rhs.log_batch.drain()),
        }
        self.memtables.merge_newer_neighbor(rhs.memtables);
        Ok(())
    }
}
/// Builds recovery contexts that all share one allocator instance, so a
/// single (possibly swap-backed) budget covers every parallel context.
pub struct MemTableRecoverContextFactory {
    allocator: SelectedAllocator,
}

impl MemTableRecoverContextFactory {
    /// Creates the factory, deriving the allocator from `cfg`.
    pub fn new(cfg: &Config) -> Self {
        Self {
            allocator: new_allocator(cfg),
        }
    }
}

impl Factory<MemTableRecoverContext<SelectedAllocator>> for MemTableRecoverContextFactory {
    /// Produces a fresh context sharing the factory's allocator.
    fn new_target(&self) -> MemTableRecoverContext<SelectedAllocator> {
        MemTableRecoverContext::new_with_allocator(self.allocator.clone())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::{catch_unwind_silent, generate_entry_indexes};
// Test-only inspection helpers mirroring `min_file_seq`.
impl<A: AllocatorTrait> MemTable<A> {
    /// Largest file sequence in `queue` referenced by entries or KVs.
    fn max_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
        // Newest entry in the queue: for Append the deque's back (unless
        // everything is rewritten); for Rewrite the last rewritten entry.
        let entry = match queue {
            LogQueue::Append if self.rewrite_count == self.entry_indexes.len() => None,
            LogQueue::Append => self.entry_indexes.back(),
            LogQueue::Rewrite if self.rewrite_count == 0 => None,
            LogQueue::Rewrite => self.entry_indexes.get(self.rewrite_count - 1),
        };
        let ents_max = entry.map(|e| e.entries.unwrap().id.seq);
        let kvs_max = self.kvs_max_file_seq(queue);
        match (ents_max, kvs_max) {
            (Some(ents_max), Some(kvs_max)) => Some(FileSeq::max(kvs_max, ents_max)),
            (Some(ents_max), None) => Some(ents_max),
            (None, Some(kvs_max)) => Some(kvs_max),
            (None, None) => None,
        }
    }

    /// Largest file sequence in `queue` among key-values only.
    pub fn kvs_max_file_seq(&self, queue: LogQueue) -> Option<FileSeq> {
        self.kvs
            .values()
            .filter(|v| v.1.queue == queue)
            .fold(None, |max, v| {
                if let Some(max) = max {
                    Some(std::cmp::max(max, v.1.seq))
                } else {
                    Some(v.1.seq)
                }
            })
    }

    /// Fetches every held entry index into `vec_idx`.
    pub fn fetch_all(&self, vec_idx: &mut Vec<EntryIndex>) {
        if let Some((first, last)) = self.span() {
            self.fetch_entries_to(first, last + 1, None, vec_idx)
                .unwrap();
        }
    }

    /// Sum of all entries' `entry_len` (bytes of payload referenced).
    fn entries_size(&self) -> usize {
        self.entry_indexes
            .iter()
            .fold(0, |acc, e| acc + e.entry_len) as usize
    }
}
#[test]
fn test_memtable_append() {
    let region_id = 8;
    let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
    // Initial append [10, 20) from file 1.
    memtable.append(generate_entry_indexes(
        10,
        20,
        FileId::new(LogQueue::Append, 1),
    ));
    assert_eq!(memtable.entries_size(), 10);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 1);
    memtable.consistency_check();
    // Empty append is a no-op.
    memtable.append(Vec::new());
    // [21, 22) leaves a hole after last index 19 -> must panic.
    assert!(
        catch_unwind_silent(|| memtable.append(generate_entry_indexes(
            21,
            22,
            FileId::dummy(LogQueue::Append)
        )))
        .is_err()
    );
    memtable.consistency_check();
    // Contiguous append [20, 30) from file 2.
    memtable.append(generate_entry_indexes(
        20,
        30,
        FileId::new(LogQueue::Append, 2),
    ));
    assert_eq!(memtable.entries_size(), 20);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 2);
    memtable.consistency_check();
    assert_eq!(
        memtable.global_stats.live_entries(LogQueue::Append),
        memtable.entries_size()
    );
    // Overlapping append [25, 35) truncates the tail back to 25.
    memtable.append(generate_entry_indexes(
        25,
        35,
        FileId::new(LogQueue::Append, 3),
    ));
    assert_eq!(memtable.entries_size(), 25);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
    memtable.consistency_check();
    assert_eq!(
        memtable.global_stats.live_entries(LogQueue::Append),
        memtable.entries_size()
    );
    // Full overwrite [10, 40) replaces everything with file 4 data.
    memtable.append(generate_entry_indexes(
        10,
        40,
        FileId::new(LogQueue::Append, 4),
    ));
    assert_eq!(memtable.entries_size(), 30);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 4);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
    memtable.consistency_check();
    assert_eq!(
        memtable.global_stats.live_entries(LogQueue::Append),
        memtable.entries_size()
    );
    // Dropping the table must release all live-entry counts.
    let global_stats = Arc::clone(&memtable.global_stats);
    drop(memtable);
    assert_eq!(global_stats.live_entries(LogQueue::Append), 0);
}
#[test]
fn test_memtable_compact() {
    let region_id = 8;
    let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
    // Build [0, 25) across files 1, 2, 2, 3.
    memtable.append(generate_entry_indexes(
        0,
        10,
        FileId::new(LogQueue::Append, 1),
    ));
    memtable.append(generate_entry_indexes(
        10,
        15,
        FileId::new(LogQueue::Append, 2),
    ));
    memtable.append(generate_entry_indexes(
        15,
        20,
        FileId::new(LogQueue::Append, 2),
    ));
    memtable.append(generate_entry_indexes(
        20,
        25,
        FileId::new(LogQueue::Append, 3),
    ));
    assert_eq!(memtable.entries_size(), 25);
    assert_eq!(memtable.first_index().unwrap(), 0);
    assert_eq!(memtable.last_index().unwrap(), 24);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
    assert_eq!(
        memtable.global_stats.live_entries(LogQueue::Append),
        memtable.entries_size()
    );
    memtable.consistency_check();
    // Compact below 5: five entries drop, file 1 still referenced.
    assert_eq!(memtable.compact_to(5), 5);
    assert_eq!(memtable.entries_size(), 20);
    assert_eq!(memtable.first_index().unwrap(), 5);
    assert_eq!(memtable.last_index().unwrap(), 24);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
    assert_eq!(
        memtable.global_stats.live_entries(LogQueue::Append),
        memtable.entries_size()
    );
    // Appending before the compacted boundary must panic.
    assert!(
        catch_unwind_silent(|| memtable.append(generate_entry_indexes(
            4,
            5,
            FileId::dummy(LogQueue::Append)
        )))
        .is_err()
    );
    memtable.consistency_check();
    // Compact below 20: files 1 and 2 are no longer referenced.
    assert_eq!(memtable.compact_to(20), 15);
    assert_eq!(memtable.entries_size(), 5);
    assert_eq!(memtable.first_index().unwrap(), 20);
    assert_eq!(memtable.last_index().unwrap(), 24);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 3);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 3);
    assert_eq!(
        memtable.global_stats.live_entries(LogQueue::Append),
        memtable.entries_size()
    );
    memtable.consistency_check();
    // Compacting at or before the current first index removes nothing.
    assert_eq!(memtable.compact_to(20), 0);
    assert_eq!(memtable.compact_to(15), 0);
    assert_eq!(memtable.entries_size(), 5);
    assert_eq!(memtable.first_index().unwrap(), 20);
    assert_eq!(memtable.last_index().unwrap(), 24);
    memtable.consistency_check();
}
#[test]
fn test_memtable_fetch() {
    let region_id = 8;
    let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
    let mut ents_idx = vec![];
    // Empty table: fetch_all yields nothing, empty range is Ok, and a
    // non-empty range reports EntryNotFound.
    memtable.fetch_all(&mut ents_idx);
    assert!(ents_idx.is_empty());
    memtable
        .fetch_entries_to(0, 0, None, &mut ents_idx)
        .unwrap();
    assert!(matches!(
        memtable
            .fetch_entries_to(0, 1, None, &mut ents_idx)
            .unwrap_err(),
        Error::EntryNotFound
    ));
    // Build [0, 25) across files 1, 2, 3.
    memtable.append(generate_entry_indexes(
        0,
        10,
        FileId::new(LogQueue::Append, 1),
    ));
    memtable.append(generate_entry_indexes(
        10,
        20,
        FileId::new(LogQueue::Append, 2),
    ));
    memtable.append(generate_entry_indexes(
        20,
        25,
        FileId::new(LogQueue::Append, 3),
    ));
    memtable.fetch_all(&mut ents_idx);
    assert_eq!(ents_idx.len(), 25);
    assert_eq!(ents_idx[0].index, 0);
    assert_eq!(ents_idx[24].index, 24);
    // After compacting below 10, ranges touching [0, 10) are compacted
    // and ranges past 25 are not found.
    assert_eq!(memtable.compact_to(10), 10);
    ents_idx.clear();
    assert!(matches!(
        memtable
            .fetch_entries_to(5, 15, None, &mut ents_idx)
            .unwrap_err(),
        Error::EntryCompacted
    ));
    ents_idx.clear();
    assert!(matches!(
        memtable
            .fetch_entries_to(20, 30, None, &mut ents_idx)
            .unwrap_err(),
        Error::EntryNotFound
    ));
    // In-range fetches.
    ents_idx.clear();
    memtable
        .fetch_entries_to(20, 25, None, &mut ents_idx)
        .unwrap();
    assert_eq!(ents_idx.len(), 5);
    assert_eq!(ents_idx[0].index, 20);
    assert_eq!(ents_idx[4].index, 24);
    ents_idx.clear();
    memtable
        .fetch_entries_to(10, 15, None, &mut ents_idx)
        .unwrap();
    assert_eq!(ents_idx.len(), 5);
    assert_eq!(ents_idx[0].index, 10);
    assert_eq!(ents_idx[4].index, 14);
    ents_idx.clear();
    memtable
        .fetch_entries_to(10, 25, None, &mut ents_idx)
        .unwrap();
    assert_eq!(ents_idx.len(), 15);
    assert_eq!(ents_idx[0].index, 10);
    assert_eq!(ents_idx[14].index, 24);
    // max_size caps the result by accumulated entry length.
    ents_idx.clear();
    let max_size = Some(10);
    memtable
        .fetch_entries_to(10, 25, max_size, &mut ents_idx)
        .unwrap();
    assert_eq!(ents_idx.len(), 10);
    assert_eq!(ents_idx[0].index, 10);
    assert_eq!(ents_idx[9].index, 19);
    // Even max_size == 0 yields at least one entry.
    ents_idx.clear();
    memtable
        .fetch_entries_to(20, 25, Some(0), &mut ents_idx)
        .unwrap();
    assert_eq!(ents_idx.len(), 1);
    assert_eq!(ents_idx[0].index, 20);
}
// Covers kv rewrite bookkeeping: `rewrite_key` with current, stale and
// forced (None) watermarks, interaction with `delete`, and the rewritten
// kv/entry-index fetch APIs.
//
// Fix: the original fused two statements onto one line
// (`rewrite_key(k3, ...); fetch_rewritten_kvs(...)`), violating the
// one-statement-per-line formatting used everywhere else in these tests.
#[test]
fn test_memtable_fetch_rewrite() {
    let region_id = 8;
    let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
    let (k1, v1) = (b"key1", b"value1");
    let (k2, v2) = (b"key2", b"value2");
    let (k3, v3) = (b"key3", b"value3");
    // Three entry batches in append files 1..=3, each followed by one key
    // written to the same file.
    memtable.append(generate_entry_indexes(
        0,
        10,
        FileId::new(LogQueue::Append, 1),
    ));
    memtable.put(k1.to_vec(), v1.to_vec(), FileId::new(LogQueue::Append, 1));
    memtable.append(generate_entry_indexes(
        10,
        20,
        FileId::new(LogQueue::Append, 2),
    ));
    memtable.put(k2.to_vec(), v2.to_vec(), FileId::new(LogQueue::Append, 2));
    memtable.append(generate_entry_indexes(
        20,
        25,
        FileId::new(LogQueue::Append, 3),
    ));
    memtable.put(k3.to_vec(), v3.to_vec(), FileId::new(LogQueue::Append, 3));
    memtable.consistency_check();
    // k1 (in append file 1, watermark 1) moves to rewrite file 50: it is
    // no longer visible before seq 1 but shows up as a rewritten kv.
    memtable.rewrite_key(k1.to_vec(), Some(1), 50);
    let mut kvs = Vec::new();
    memtable.fetch_kvs_before(1, &mut kvs);
    assert!(kvs.is_empty());
    memtable.fetch_rewritten_kvs(&mut kvs);
    assert_eq!(kvs.len(), 1);
    assert_eq!(kvs.pop().unwrap(), (k1.to_vec(), v1.to_vec()));
    // Deleting the rewritten key, then rewriting it again, leaves it
    // absent; both operations count as deleted rewrite entries.
    memtable.delete(k1.as_ref());
    assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 1);
    memtable.rewrite_key(k1.to_vec(), Some(1), 50);
    assert_eq!(memtable.get(k1.as_ref()), None);
    memtable.fetch_rewritten_kvs(&mut kvs);
    assert!(kvs.is_empty());
    assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 2);
    // A stale watermark (k2 lives in file 2, watermark 1) and a forced
    // rewrite (None) of k3 are both treated as obsolete.
    memtable.rewrite_key(k2.to_vec(), Some(1), 50);
    memtable.fetch_rewritten_kvs(&mut kvs);
    assert!(kvs.is_empty());
    memtable.rewrite_key(k3.to_vec(), None, 50);
    memtable.fetch_rewritten_kvs(&mut kvs);
    assert!(kvs.is_empty());
    assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 4);
    // With a current watermark the rewrite sticks; subsequent rewrites of
    // an already-rewritten key keep it in the rewrite queue.
    memtable.rewrite_key(k3.to_vec(), Some(10), 50);
    memtable.rewrite_key(k3.to_vec(), None, 51);
    memtable.rewrite_key(k3.to_vec(), Some(11), 52);
    memtable.fetch_rewritten_kvs(&mut kvs);
    assert_eq!(kvs.len(), 1);
    assert_eq!(kvs.pop().unwrap(), (k3.to_vec(), v3.to_vec()));
    // Rewrite entry range [0, 10); total entry count is unchanged.
    let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1));
    memtable.rewrite(ents_idx, Some(1));
    assert_eq!(memtable.entries_size(), 25);
    memtable.consistency_check();
    // Entry indexes before seq 2 are the 10 rewritten-or-older entries;
    // nothing precedes seq 1; rewritten indexes are exactly [0, 10).
    let mut ents_idx = vec![];
    assert!(memtable
        .fetch_entry_indexes_before(2, &mut ents_idx)
        .is_ok());
    assert_eq!(ents_idx.len(), 10);
    assert_eq!(ents_idx.last().unwrap().index, 19);
    ents_idx.clear();
    assert!(memtable
        .fetch_entry_indexes_before(1, &mut ents_idx)
        .is_ok());
    assert!(ents_idx.is_empty());
    ents_idx.clear();
    assert!(memtable
        .fetch_rewritten_entry_indexes(&mut ents_idx)
        .is_ok());
    assert_eq!(ents_idx.len(), 10);
    assert_eq!(ents_idx.first().unwrap().index, 0);
    assert_eq!(ents_idx.last().unwrap().index, 9);
}
// Covers the kv surface of the memtable: put/get/delete, forward/reverse
// and bounded `scan`, and how key ownership moves between the append and
// rewrite queues (tracked via min/max file seq and global stats).
//
// Fix: the last two scan closures named their parameters `key`/`value`,
// shadowing the local helper fns `key(i)`/`value(i)` and diverging from
// the `|k, v|` naming used by the first two closures; unified on `|k, v|`.
#[test]
fn test_memtable_kv_operations() {
    // Helpers producing the byte keys/values "k{i}" / "v{i}".
    fn key(i: u64) -> Vec<u8> {
        format!("k{}", i).into_bytes()
    }
    fn value(i: u64) -> Vec<u8> {
        format!("v{}", i).into_bytes()
    }
    let region_id = 8;
    let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
    // Two keys in different append files pin the min/max file seq range.
    memtable.put(key(1), value(1), FileId::new(LogQueue::Append, 1));
    memtable.put(key(5), value(5), FileId::new(LogQueue::Append, 5));
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 5);
    assert_eq!(memtable.get(&key(1)), Some(value(1)));
    assert_eq!(memtable.get(&key(5)), Some(value(5)));
    // Returning `false` from the callback stops the scan after the first
    // visited key; `reverse = true` visits the largest key first.
    let mut res = Vec::new();
    memtable
        .scan(None, None, false, |k, v| {
            res.push((k.to_vec(), v.to_vec()));
            false
        })
        .unwrap();
    assert_eq!(res, vec![(key(1), value(1))]);
    res.clear();
    memtable
        .scan(None, None, true, |k, v| {
            res.push((k.to_vec(), v.to_vec()));
            false
        })
        .unwrap();
    assert_eq!(res, vec![(key(5), value(5))]);
    res.clear();
    // Scan from an explicit start key.
    memtable
        .scan(Some(&key(5)), None, false, |k, v| {
            res.push((k.to_vec(), v.to_vec()));
            true
        })
        .unwrap();
    assert_eq!(res, vec![(key(5), value(5))]);
    res.clear();
    // The end bound is exclusive: only key(1) is visited.
    memtable
        .scan(Some(&key(1)), Some(&key(5)), false, |k, v| {
            res.push((k.to_vec(), v.to_vec()));
            true
        })
        .unwrap();
    assert_eq!(res, vec![(key(1), value(1))]);
    // Deleting the key in file 5 shrinks the append range back to file 1.
    memtable.delete(&key(5));
    assert_eq!(memtable.get(&key(5)), None);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 1);
    // Rewrite-queue puts supersede the append copies: ownership moves to
    // the rewrite queue and the append range empties.
    memtable.put(key(1), value(1), FileId::new(LogQueue::Rewrite, 2));
    memtable.put(key(5), value(5), FileId::new(LogQueue::Rewrite, 3));
    assert_eq!(memtable.min_file_seq(LogQueue::Append), None);
    assert_eq!(memtable.max_file_seq(LogQueue::Append), None);
    assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 2);
    assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 3);
    assert_eq!(memtable.global_stats.rewrite_entries(), 2);
    // Deleting a rewritten key counts as a deleted rewrite entry.
    memtable.delete(&key(1));
    assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 3);
    assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 3);
    assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 1);
    // An append-queue put over a rewritten key evicts it from the rewrite
    // queue entirely.
    memtable.put(key(5), value(5), FileId::new(LogQueue::Append, 7));
    assert_eq!(memtable.min_file_seq(LogQueue::Rewrite), None);
    assert_eq!(memtable.max_file_seq(LogQueue::Rewrite), None);
    assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 7);
    assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 7);
    assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 2);
}
// Point lookups with `get_entry`: an empty table, indexes below the first
// and past the last entry, and a hit on an existing index.
#[test]
fn test_memtable_get_entry() {
    let mut memtable = MemTable::new(8, Arc::new(GlobalStats::default()));
    // Nothing stored yet.
    assert_eq!(memtable.get_entry(0), None);
    // Fill [5, 20) across two append files.
    for (first, last, seq) in [(5, 10, 1), (10, 20, 2)] {
        memtable.append(generate_entry_indexes(
            first,
            last,
            FileId::new(LogQueue::Append, seq),
        ));
    }
    // Below the range and past the range both miss.
    assert_eq!(memtable.get_entry(2), None);
    assert_eq!(memtable.get_entry(25), None);
    // An in-range index resolves to its entry.
    assert_eq!(memtable.get_entry(5).map(|e| e.index), Some(5));
}
// End-to-end bookkeeping test for `rewrite`: tracks the expected live
// append entries, total rewrite entries and deleted (obsolete) rewrite
// entries through a sequence of appends, puts, compactions and rewrites,
// checking the global counters after every step.
#[test]
fn test_memtable_rewrite() {
let region_id = 8;
let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default()));
// Running expectations for the global counters, updated in lockstep with
// each operation below.
let mut expected_append = 0;
let mut expected_rewrite = 0;
let mut expected_deleted_rewrite = 0;
// Rewriting into an empty table: all 10 rewritten entries are obsolete
// on arrival (nothing in the append queue matches them), so the rewrite
// queue stays empty.
let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1));
memtable.rewrite(ents_idx, Some(1));
expected_rewrite += 10;
expected_deleted_rewrite += 10;
assert_eq!(memtable.min_file_seq(LogQueue::Rewrite), None);
assert_eq!(
memtable.global_stats.live_entries(LogQueue::Append),
expected_append
);
assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
assert_eq!(
memtable.global_stats.deleted_rewrite_entries(),
expected_deleted_rewrite
);
// Build [0, 40) across append files 1..=4, plus keys kk1..kk3 in files
// 2..=4.
memtable.append(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Append, 1),
));
memtable.append(generate_entry_indexes(
10,
20,
FileId::new(LogQueue::Append, 2),
));
memtable.put(
b"kk1".to_vec(),
b"vv1".to_vec(),
FileId::new(LogQueue::Append, 2),
);
memtable.append(generate_entry_indexes(
20,
30,
FileId::new(LogQueue::Append, 3),
));
memtable.put(
b"kk2".to_vec(),
b"vv2".to_vec(),
FileId::new(LogQueue::Append, 3),
);
memtable.append(generate_entry_indexes(
30,
40,
FileId::new(LogQueue::Append, 4),
));
memtable.put(
b"kk3".to_vec(),
b"vv3".to_vec(),
FileId::new(LogQueue::Append, 4),
);
// 40 entries + 3 kvs are live; compacting [0, 10) drops 10 of them.
expected_append += 4 * 10 + 3;
memtable.compact_to(10);
expected_append -= 10;
assert_eq!(memtable.entries_size(), 30);
assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 2);
assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
assert_eq!(
memtable.global_stats.live_entries(LogQueue::Append),
expected_append
);
memtable.consistency_check();
// Rewriting the already-compacted range [0, 10) and a non-existent key
// kk0 produces only obsolete rewrites: rewrite_count stays 0 and the
// rewrite queue remains empty.
let ents_idx = generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 50));
memtable.rewrite(ents_idx, Some(1));
memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50);
expected_rewrite += 10 + 1;
expected_deleted_rewrite += 10 + 1;
assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 2);
assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
assert!(memtable.min_file_seq(LogQueue::Rewrite).is_none());
assert!(memtable.max_file_seq(LogQueue::Rewrite).is_none());
assert_eq!(memtable.rewrite_count, 0);
assert_eq!(memtable.get(b"kk0"), None);
assert_eq!(
memtable.global_stats.live_entries(LogQueue::Append),
expected_append
);
assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
assert_eq!(
memtable.global_stats.deleted_rewrite_entries(),
expected_deleted_rewrite
);
memtable.consistency_check();
// Rewrite [0, 20) (append files up to seq 2) then [20, 30) (seq 3),
// plus kk1/kk2: the live portions ([10, 30) and both keys) move to the
// rewrite queue; the compacted prefix [0, 10) and stale kk0 are
// obsolete. The append queue now only owns file 4.
let ents_idx = generate_entry_indexes(0, 20, FileId::new(LogQueue::Rewrite, 100));
memtable.rewrite(ents_idx, Some(2));
memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50);
memtable.rewrite_key(b"kk1".to_vec(), Some(2), 100);
expected_append -= 10 + 1;
expected_rewrite += 20 + 2;
expected_deleted_rewrite += 10 + 1;
let ents_idx = generate_entry_indexes(20, 30, FileId::new(LogQueue::Rewrite, 101));
memtable.rewrite(ents_idx, Some(3));
memtable.rewrite_key(b"kk2".to_vec(), Some(3), 101);
expected_append -= 10 + 1;
expected_rewrite += 10 + 1;
assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 4);
assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 4);
assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 100);
assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 101);
assert_eq!(memtable.rewrite_count, 20);
assert_eq!(memtable.get(b"kk1"), Some(b"vv1".to_vec()));
assert_eq!(
memtable.global_stats.live_entries(LogQueue::Append),
expected_append
);
assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
assert_eq!(
memtable.global_stats.deleted_rewrite_entries(),
expected_deleted_rewrite
);
memtable.consistency_check();
// Appending at index 35 truncates the append tail [36, 40) (4 live
// entries) and kk3 is overwritten in file 5.
memtable.append(generate_entry_indexes(
35,
36,
FileId::new(LogQueue::Append, 5),
));
expected_append -= 4;
memtable.put(
b"kk3".to_vec(),
b"vv33".to_vec(),
FileId::new(LogQueue::Append, 5),
);
assert_eq!(memtable.last_index().unwrap(), 35);
memtable.consistency_check();
// Rewrite [30, 40): only [30, 36) is still live (5 append entries plus
// the one at 35), so 5 of the 10 rewritten entries are obsolete.
let ents_idx = generate_entry_indexes(30, 40, FileId::new(LogQueue::Rewrite, 102));
memtable.rewrite(ents_idx, Some(4));
expected_append -= 5;
expected_rewrite += 10;
expected_deleted_rewrite += 5;
assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 5);
assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 5);
assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 100);
assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 102);
assert_eq!(memtable.rewrite_count, 25);
assert_eq!(memtable.get(b"kk3"), Some(b"vv33".to_vec()));
assert_eq!(
memtable.global_stats.live_entries(LogQueue::Append),
expected_append
);
assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
assert_eq!(
memtable.global_stats.deleted_rewrite_entries(),
expected_deleted_rewrite
);
memtable.consistency_check();
// Appending [35, 50) replaces the rewritten entry at 35 (hence 15 - 1);
// compacting to 30 then drops 20 rewritten entries, leaving 5.
memtable.append(generate_entry_indexes(
35,
50,
FileId::new(LogQueue::Append, 6),
));
expected_append += 15 - 1;
memtable.compact_to(30);
expected_deleted_rewrite += 20;
assert_eq!(memtable.last_index().unwrap(), 49);
assert_eq!(memtable.rewrite_count, 5);
assert_eq!(
memtable.global_stats.live_entries(LogQueue::Append),
expected_append
);
assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
assert_eq!(
memtable.global_stats.deleted_rewrite_entries(),
expected_deleted_rewrite
);
memtable.consistency_check();
// A forced rewrite (None watermark) of [10, 60) leaves entries still
// owned by the append queue untouched: all 50 rewritten entries are
// obsolete, first_index and rewrite_count are unchanged.
let ents_idx = generate_entry_indexes(10, 60, FileId::new(LogQueue::Rewrite, 103));
memtable.rewrite(ents_idx, None);
expected_rewrite += 50;
expected_deleted_rewrite += 50;
assert_eq!(memtable.first_index().unwrap(), 30);
assert_eq!(memtable.rewrite_count, 5);
assert_eq!(
memtable.global_stats.live_entries(LogQueue::Append),
expected_append
);
assert_eq!(memtable.global_stats.rewrite_entries(), expected_rewrite);
assert_eq!(
memtable.global_stats.deleted_rewrite_entries(),
expected_deleted_rewrite
);
memtable.consistency_check();
// Dropping the memtable releases every remaining live entry.
let global_stats = Arc::clone(&memtable.global_stats);
drop(memtable);
assert_eq!(global_stats.live_entries(LogQueue::Append), 0);
assert_eq!(global_stats.live_entries(LogQueue::Rewrite), 0);
}
// Verifies `merge_append_table`: for each scenario, building the table in
// one pass (on = None) must be equivalent to building the rewrite part and
// the append part separately and then merging the append table into the
// rewrite table.
#[test]
fn test_memtable_merge_append() {
type TestMemTable = MemTable<VacantAllocator>;
fn empty_table(id: u64) -> TestMemTable {
MemTable::new(id, Arc::new(GlobalStats::default()))
}
// Each case builds the same logical table three ways, selected by `on`:
// - None: the full history applied to one table;
// - Some(Append): only the append-queue operations;
// - Some(Rewrite): only the rewrite-queue part, applied via
//   `replay_rewrite` as recovery would.
let cases = [
// Case 1: overlapping appends, then the surviving prefix rewritten.
|mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
match on {
None => {
memtable.append(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Append, 1),
));
memtable.append(generate_entry_indexes(
7,
15,
FileId::new(LogQueue::Append, 2),
));
memtable.rewrite(
generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
Some(1),
);
}
Some(LogQueue::Append) => {
memtable.append(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Append, 1),
));
memtable.append(generate_entry_indexes(
7,
15,
FileId::new(LogQueue::Append, 2),
));
memtable.compact_to(7);
}
Some(LogQueue::Rewrite) => {
memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
));
}
}
memtable
},
// Case 2: same as case 1, but everything up to 10 is compacted away
// afterwards on every path.
|mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
match on {
None => {
memtable.append(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Append, 1),
));
memtable.append(generate_entry_indexes(
7,
15,
FileId::new(LogQueue::Append, 2),
));
memtable.rewrite(
generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
Some(1),
);
memtable.compact_to(10);
}
Some(LogQueue::Append) => {
memtable.append(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Append, 1),
));
memtable.append(generate_entry_indexes(
7,
15,
FileId::new(LogQueue::Append, 2),
));
memtable.compact_to(10);
}
Some(LogQueue::Rewrite) => {
memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
));
memtable.compact_to(10);
}
}
memtable
},
// Case 3: rewrite first, then overlapping appends; the append side is
// additionally assembled from two neighbor tables merged together.
|mut memtable: TestMemTable, on: Option<LogQueue>| -> TestMemTable {
match on {
None => {
memtable.append(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Append, 1),
));
memtable.rewrite(
generate_entry_indexes(0, 10, FileId::new(LogQueue::Rewrite, 1)),
Some(1),
);
memtable.append(generate_entry_indexes(
10,
15,
FileId::new(LogQueue::Append, 2),
));
memtable.append(generate_entry_indexes(
5,
10,
FileId::new(LogQueue::Append, 2),
));
}
Some(LogQueue::Append) => {
// Build the append part from two neighbor tables to also
// exercise merge_newer_neighbor.
let mut m1 = empty_table(memtable.region_id);
m1.append(generate_entry_indexes(
10,
15,
FileId::new(LogQueue::Append, 2),
));
let mut m2 = empty_table(memtable.region_id);
m2.append(generate_entry_indexes(
5,
10,
FileId::new(LogQueue::Append, 2),
));
m1.merge_newer_neighbor(&mut m2);
memtable.merge_newer_neighbor(&mut m1);
}
Some(LogQueue::Rewrite) => {
memtable.replay_rewrite(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Rewrite, 1),
));
}
}
memtable
},
];
// Merging with an empty counterpart must be a no-op in either direction,
// and must always drain the append table.
for (i, case) in cases.iter().enumerate() {
let region_id = i as u64;
let mut append = empty_table(region_id);
let mut rewrite = case(empty_table(region_id), Some(LogQueue::Rewrite));
rewrite.merge_append_table(&mut append);
assert_eq!(
rewrite.entry_indexes,
case(empty_table(region_id), Some(LogQueue::Rewrite)).entry_indexes,
);
assert!(append.entry_indexes.is_empty());
let mut append = case(empty_table(region_id), Some(LogQueue::Append));
let mut rewrite = empty_table(region_id);
rewrite.merge_append_table(&mut append);
assert_eq!(
rewrite.entry_indexes,
case(empty_table(region_id), Some(LogQueue::Append)).entry_indexes
);
assert!(append.entry_indexes.is_empty());
}
// Merging the real append part into the real rewrite part must equal the
// single-pass table, both in entry indexes and in live-entry stats.
for (i, case) in cases.iter().enumerate() {
let region_id = i as u64;
let mut append = case(empty_table(region_id), Some(LogQueue::Append));
let mut rewrite = case(empty_table(region_id), Some(LogQueue::Rewrite));
rewrite.merge_append_table(&mut append);
let expected = case(empty_table(region_id), None);
assert_eq!(
rewrite.global_stats.live_entries(LogQueue::Append),
expected.global_stats.live_entries(LogQueue::Append)
);
assert_eq!(
rewrite.global_stats.live_entries(LogQueue::Rewrite),
expected.global_stats.live_entries(LogQueue::Rewrite)
);
assert_eq!(rewrite.entry_indexes, expected.entry_indexes);
assert!(append.entry_indexes.is_empty());
}
}
// Recovery-merge equivalence test: replay three log-item batches (one per
// consecutive append file) into separate MemTableRecoverContexts, merge the
// contexts pairwise, and check the result matches applying the same batches
// sequentially to a single MemTableAccessor — per-region entries, kvs,
// rewritten views and the global stats must all agree.
#[test]
fn test_memtables_merge_append_neighbor() {
let first_rid = 17;
let mut last_rid = first_rid;
let mut batches = vec![
LogItemBatch::with_capacity(0),
LogItemBatch::with_capacity(0),
LogItemBatch::with_capacity(0),
];
// One append file per batch: seqs 10, 11, 12.
let files: Vec<_> = (0..batches.len())
.map(|i| FileId::new(LogQueue::Append, 10 + i as u64))
.collect();
// Region 17: put, delete, then re-put the same key across the batches.
batches[0].put(last_rid, b"key1".to_vec(), b"val1".to_vec());
batches[1].delete(last_rid, b"key1".to_vec());
batches[2].put(last_rid, b"key1".to_vec(), b"val2".to_vec());
last_rid += 1;
// Region 18: a put wiped out by Command::Clean.
batches[0].put(last_rid, b"key".to_vec(), b"ANYTHING".to_vec());
batches[1].add_command(last_rid, Command::Clean);
last_rid += 1;
// Region 19: entries, a compaction, then more entries.
batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
batches[1].add_command(last_rid, Command::Compact { index: 5 });
batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2]));
last_rid += 1;
// Region 20: entries, an overlapping (truncating) append, then compact.
batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1]));
batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1]));
batches[2].add_command(last_rid, Command::Compact { index: 8 });
for b in batches.iter_mut() {
b.finish_write(FileBlockHandle::dummy(LogQueue::Append));
}
// Replay each batch into its own recovery context.
let mut ctxs = VecDeque::default();
for (batch, file_id) in batches.clone().into_iter().zip(files) {
let mut ctx = MemTableRecoverContext::default();
ctx.replay(batch, file_id).unwrap();
ctxs.push_back(ctx);
}
// Merge contexts pairwise from the tail (older absorbs newer) until one
// remains.
while ctxs.len() > 1 {
let (y, mut x) = (ctxs.pop_back().unwrap(), ctxs.pop_back().unwrap());
x.merge(y, LogQueue::Append).unwrap();
ctxs.push_back(x);
}
let (merged_memtables, merged_global_stats) = ctxs.pop_front().unwrap().finish();
// Reference result: apply the same batches in order to one accessor.
let sequential_global_stats = Arc::new(GlobalStats::default());
let sequential_memtables = MemTableAccessor::new(sequential_global_stats.clone());
for mut batch in batches.clone() {
sequential_memtables.apply_append_writes(batch.drain());
}
// Compare both results region by region.
for rid in first_rid..=last_rid {
let m = merged_memtables.get(rid);
let s = sequential_memtables.get(rid);
if m.is_none() {
assert!(s.is_none());
continue;
}
let merged = m.as_ref().unwrap().read();
let sequential = s.as_ref().unwrap().read();
// Entry indexes, both the full view and the rewritten view.
let mut merged_vec = Vec::new();
let mut sequential_vec = Vec::new();
merged
.fetch_entry_indexes_before(u64::MAX, &mut merged_vec)
.unwrap();
sequential
.fetch_entry_indexes_before(u64::MAX, &mut sequential_vec)
.unwrap();
assert_eq!(merged_vec, sequential_vec);
merged_vec.clear();
sequential_vec.clear();
merged
.fetch_rewritten_entry_indexes(&mut merged_vec)
.unwrap();
sequential
.fetch_rewritten_entry_indexes(&mut sequential_vec)
.unwrap();
assert_eq!(merged_vec, sequential_vec);
// Key-values, both the full view and the rewritten view.
let mut merged_vec = Vec::new();
let mut sequential_vec = Vec::new();
merged.fetch_kvs_before(u64::MAX, &mut merged_vec);
sequential.fetch_kvs_before(u64::MAX, &mut sequential_vec);
assert_eq!(merged_vec, sequential_vec);
merged_vec.clear();
sequential_vec.clear();
merged.fetch_rewritten_kvs(&mut merged_vec);
sequential.fetch_rewritten_kvs(&mut sequential_vec);
assert_eq!(merged_vec, sequential_vec);
}
// Global counters must also agree.
assert_eq!(
merged_global_stats.live_entries(LogQueue::Append),
sequential_global_stats.live_entries(LogQueue::Append),
);
assert_eq!(
merged_global_stats.rewrite_entries(),
sequential_global_stats.rewrite_entries(),
);
assert_eq!(
merged_global_stats.deleted_rewrite_entries(),
sequential_global_stats.deleted_rewrite_entries(),
);
}
// Benchmark: repeatedly overwrite a single key in one memtable.
#[cfg(feature = "nightly")]
#[bench]
fn bench_memtable_single_put(b: &mut test::Bencher) {
    let mut table = MemTable::new(0, Arc::new(GlobalStats::default()));
    let (k, v) = (b"some_key".to_vec(), vec![7; 12]);
    b.iter(move || table.put(k.clone(), v.clone(), FileId::dummy(LogQueue::Append)));
}
// Benchmark: overwrite three distinct keys per iteration, for comparison
// against the single-put benchmark.
#[cfg(feature = "nightly")]
#[bench]
fn bench_memtable_triple_puts(b: &mut test::Bencher) {
    let mut table = MemTable::new(0, Arc::new(GlobalStats::default()));
    let keys = [
        b"some_key0".to_vec(),
        b"some_key1".to_vec(),
        b"some_key2".to_vec(),
    ];
    let v = vec![7; 12];
    b.iter(move || {
        for k in &keys {
            table.put(k.clone(), v.clone(), FileId::dummy(LogQueue::Append));
        }
    });
}
}