use std::cell::{Cell, RefCell};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{Builder as ThreadBuilder, JoinHandle};
use std::time::{Duration, Instant};
use log::{error, info};
use protobuf::{parse_from_bytes, Message};
use crate::config::{Config, RecoveryMode};
use crate::consistency::ConsistencyChecker;
use crate::env::{DefaultFileSystem, FileSystem};
use crate::event_listener::EventListener;
use crate::file_pipe_log::debug::LogItemReader;
use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder};
use crate::log_batch::{Command, LogBatch, MessageExt};
use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables};
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::purge::{PurgeHook, PurgeManager};
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{perf_context, Error, GlobalStats, Result};
/// How long the background "re-metrics" thread waits between two metric flushes.
const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);
/// Maximum number of append attempts `Engine::write` makes for one `LogBatch`
/// before giving up on `Error::TryAgain`.
const MAX_WRITE_ATTEMPT: u64 = 2;
/// A log engine parameterized over the file system `F` and the pipe log `P`.
///
/// Writes go through `write_barrier` into `pipe_log` and are then applied to
/// `memtables`; reads consult `memtables` and, for entries, read the encoded
/// block back from `pipe_log`.
pub struct Engine<F = DefaultFileSystem, P = FilePipeLog<F>>
where
F: FileSystem,
P: PipeLog,
{
/// Configuration the engine runs with (`open_with` sanitizes it before use).
cfg: Arc<Config>,
/// Event listeners notified by the engine; `open_with` appends a `PurgeHook`
/// to the caller-supplied list.
listeners: Vec<Arc<dyn EventListener>>,
/// Global statistics produced by recovery; also shared with the purge
/// manager and the metrics flusher thread.
#[allow(dead_code)]
stats: Arc<GlobalStats>,
/// In-memory per-region tables, looked up by region id.
memtables: MemTables,
/// Underlying pipe log that stores the appended batches.
pipe_log: Arc<P>,
/// Drives `purge_expired_files` and log rewrites.
purge_manager: PurgeManager<P>,
/// Groups concurrent writers; each writer's output is the `FileBlockHandle`
/// returned by the append (or the error).
write_barrier: WriteBarrier<LogBatch, Result<FileBlockHandle>>,
/// Sender used by `Drop` to tell the metrics flusher thread to stop.
tx: Mutex<mpsc::Sender<()>>,
/// Join handle of the metrics flusher thread; taken and joined in `Drop`.
metrics_flusher: Option<JoinHandle<()>>,
/// Ties the otherwise unused file-system type parameter to the struct.
_phantom: PhantomData<F>,
}
impl Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>> {
    /// Opens an engine on the default file system without any caller-supplied
    /// event listeners.
    pub fn open(cfg: Config) -> Result<Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>>> {
        Self::open_with_listeners(cfg, Vec::new())
    }

    /// Opens an engine on the default file system, registering `listeners`
    /// in addition to the hooks `open_with` installs itself.
    pub fn open_with_listeners(
        cfg: Config,
        listeners: Vec<Arc<dyn EventListener>>,
    ) -> Result<Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>>> {
        let file_system = Arc::new(DefaultFileSystem);
        Self::open_with(cfg, file_system, listeners)
    }
}
impl<F> Engine<F, FilePipeLog<F>>
where
F: FileSystem,
{
/// Opens an engine on the given file system without any caller-supplied
/// event listeners.
pub fn open_with_file_system(
    cfg: Config,
    file_system: Arc<F>,
) -> Result<Engine<F, FilePipeLog<F>>> {
    let listeners = Vec::new();
    Self::open_with(cfg, file_system, listeners)
}
/// Opens an engine: sanitizes `cfg`, recovers the pipe log and memtables from
/// disk, and starts the background metrics flusher thread.
///
/// A `PurgeHook` is appended to `listeners` before they are handed to the
/// pipe log builder and the purge manager.
///
/// # Errors
///
/// Propagates failures from config sanitization, file scanning, recovery,
/// finishing the pipe log, and spawning the flusher thread.
pub fn open_with(
    mut cfg: Config,
    file_system: Arc<F>,
    mut listeners: Vec<Arc<dyn EventListener>>,
) -> Result<Engine<F, FilePipeLog<F>>> {
    cfg.sanitize()?;
    let purge_hook: Arc<dyn EventListener> = Arc::new(PurgeHook::default());
    listeners.push(purge_hook);

    // Recover the on-disk state into the pipe log and the memtables.
    let recovery_started = Instant::now();
    let mut builder = FilePipeLogBuilder::new(cfg.clone(), file_system, listeners.clone());
    builder.scan()?;
    let factory = MemTableRecoverContextFactory::new(&cfg);
    let (append_ctx, rewrite_ctx) = builder.recover(&factory)?;
    let pipe_log = Arc::new(builder.finish()?);
    rewrite_ctx.merge_append_context(append_ctx);
    let (memtables, stats) = rewrite_ctx.finish();
    info!("Recovering raft logs takes {:?}", recovery_started.elapsed());

    let cfg = Arc::new(cfg);
    let purge_manager = PurgeManager::new(
        cfg.clone(),
        memtables.clone(),
        pipe_log.clone(),
        stats.clone(),
        listeners.clone(),
    );

    // Background thread: flush metrics immediately, then once per interval
    // until a message arrives on the channel (sent from `Drop`).
    let (tx, rx) = mpsc::channel();
    let flusher_stats = stats.clone();
    let flusher_memtables = memtables.clone();
    let metrics_flusher = ThreadBuilder::new()
        .name("re-metrics".into())
        .spawn(move || loop {
            flusher_stats.flush_metrics();
            flusher_memtables.flush_metrics();
            match rx.recv_timeout(METRICS_FLUSH_INTERVAL) {
                Ok(_) => break,
                Err(_) => continue,
            }
        })?;

    Ok(Self {
        cfg,
        listeners,
        stats,
        memtables,
        pipe_log,
        purge_manager,
        write_barrier: Default::default(),
        tx: Mutex::new(tx),
        metrics_flusher: Some(metrics_flusher),
        _phantom: PhantomData,
    })
}
}
impl<F, P> Engine<F, P>
where
F: FileSystem,
P: PipeLog,
{
/// Appends `log_batch` to the append queue of the pipe log and applies it to
/// the memtables.
///
/// `sync` requests a sync of the append queue after the write group has been
/// appended; it is OR-ed with the `sync` flag of every writer in the group.
///
/// Returns the length reported by `LogBatch::finish_populate`, or `0` without
/// touching the pipe log when the batch is empty.
///
/// # Errors
///
/// Returns `Error::TryAgain` once `MAX_WRITE_ATTEMPT` attempts have failed with
/// `TryAgain`; any other error from populating or appending the batch is
/// returned as is.
///
/// # Panics
///
/// Panics if syncing the append queue fails.
pub fn write(&self, log_batch: &mut LogBatch, mut sync: bool) -> Result<usize> {
// Nothing to do for an empty batch: no append, no sync.
if log_batch.is_empty() {
return Ok(0);
}
let start = Instant::now();
let len = log_batch.finish_populate(
self.cfg.batch_compression_threshold.0 as usize,
self.cfg.compression_level,
)?;
debug_assert!(len > 0);
let mut attempt_count = 0_u64;
// Each iteration is one append attempt; the loop breaks with the block
// handle of the appended batch.
let block_handle = loop {
attempt_count += 1;
let mut writer = Writer::new(log_batch, sync);
// Take the current perf context so that the diff of this attempt can be
// added back on top of it below.
let mut perf_context = take_perf_context();
let before_enter = Instant::now();
// `enter` hands a group only to the caller that performs the appends for
// every writer in it (presumably the group leader, going by the metric
// name below; confirm against `WriteBarrier`).
if let Some(mut group) = self.write_barrier.enter(&mut writer) {
let now = Instant::now();
let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
for writer in group.iter_mut() {
writer.entered_time = Some(now);
sync |= writer.sync;
let log_batch = writer.mut_payload();
let res = self.pipe_log.append(LogQueue::Append, log_batch);
writer.set_output(res);
}
perf_context!(log_write_duration).observe_since(now);
if sync {
// A sync failure panics instead of being reported to the writers.
self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
}
// Hand the perf context diff of this group write to every writer.
let diff = get_perf_context();
for writer in group.iter_mut() {
writer.perf_context_diff = diff.clone();
}
}
// `entered_time` is set above for every writer of a processed group, so it
// is expected to be present here for this writer as well.
let entered_time = writer.entered_time.unwrap();
perf_context.write_wait_duration +=
entered_time.saturating_duration_since(before_enter);
debug_assert_eq!(writer.perf_context_diff.write_wait_duration, Duration::ZERO);
perf_context += &writer.perf_context_diff;
set_perf_context(perf_context);
match writer.finish() {
Ok(handle) => {
ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM
.observe(entered_time.saturating_duration_since(start).as_secs_f64());
break handle;
}
Err(Error::TryAgain(e)) => {
// `TryAgain` is retried by looping, until the attempt budget is used up.
if attempt_count >= MAX_WRITE_ATTEMPT {
return Err(Error::TryAgain(format!(
"Failed to write logbatch, exceed MAX_WRITE_ATTEMPT: ({MAX_WRITE_ATTEMPT}), err: {e}",
)));
}
info!("got err: {e}, try to write this LogBatch again");
}
Err(e) => {
return Err(e);
}
}
};
// The batch is on disk (as far as the pipe log is concerned); now make it
// visible through the memtables and notify the listeners.
let mut now = Instant::now();
log_batch.finish_write(block_handle);
self.memtables.apply_append_writes(log_batch.drain());
for listener in &self.listeners {
listener.post_apply_memtables(block_handle.id);
}
let end = Instant::now();
let apply_duration = end.saturating_duration_since(now);
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
perf_context!(apply_duration).observe(apply_duration);
now = end;
ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
Ok(len)
}
/// Issues a synchronous write of an empty `LogBatch`.
///
/// NOTE(review): `write` returns `Ok(0)` up front when the batch reports
/// `is_empty()`, so if a default `LogBatch` is empty this call never reaches
/// `pipe_log.sync` — confirm whether that is intended.
pub fn sync(&self) -> Result<()> {
    let mut empty_batch = LogBatch::default();
    self.write(&mut empty_batch, true).map(|_| ())
}
/// Looks up `key` in the memtable of `region_id` and decodes the stored bytes
/// as the protobuf message `S`.
///
/// Returns `Ok(None)` when the region or the key does not exist.
///
/// # Errors
///
/// Returns an error when the stored bytes cannot be parsed as `S`.
pub fn get_message<S: Message>(&self, region_id: u64, key: &[u8]) -> Result<Option<S>> {
    let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
    match self.memtables.get(region_id) {
        Some(memtable) => match memtable.read().get(key) {
            Some(raw) => Ok(Some(parse_from_bytes(&raw)?)),
            None => Ok(None),
        },
        None => Ok(None),
    }
}
/// Returns the raw bytes stored under `key` in the memtable of `region_id`,
/// or `None` when the region or the key does not exist.
pub fn get(&self, region_id: u64, key: &[u8]) -> Option<Vec<u8>> {
    let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
    match self.memtables.get(region_id) {
        Some(memtable) => memtable.read().get(key),
        None => None,
    }
}
/// Scans the key-values of `region_id` via `scan_raw_messages` and passes
/// every value that decodes as the protobuf message `S` to `callback`.
///
/// Values that fail to decode are skipped and the scan goes on. The value
/// returned by `callback` is forwarded to the raw scan unchanged.
pub fn scan_messages<S, C>(
    &self,
    region_id: u64,
    start_key: Option<&[u8]>,
    end_key: Option<&[u8]>,
    reverse: bool,
    mut callback: C,
) -> Result<()>
where
    S: Message,
    C: FnMut(&[u8], S) -> bool,
{
    let decode_and_forward = move |k: &[u8], raw_v: &[u8]| match parse_from_bytes::<S>(raw_v) {
        Ok(message) => callback(k, message),
        // Not an `S`: ignore this value and keep scanning.
        Err(_) => true,
    };
    self.scan_raw_messages(region_id, start_key, end_key, reverse, decode_and_forward)
}
/// Runs the memtable scan of `region_id` with the given key bounds,
/// direction and `callback`.
///
/// Does nothing and returns `Ok(())` when the region has no memtable.
pub fn scan_raw_messages<C>(
    &self,
    region_id: u64,
    start_key: Option<&[u8]>,
    end_key: Option<&[u8]>,
    reverse: bool,
    callback: C,
) -> Result<()>
where
    C: FnMut(&[u8], &[u8]) -> bool,
{
    let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
    let memtable = match self.memtables.get(region_id) {
        Some(memtable) => memtable,
        None => return Ok(()),
    };
    memtable
        .read()
        .scan(start_key, end_key, reverse, callback)?;
    Ok(())
}
/// Fetches the log entry at `log_idx` of `region_id`.
///
/// The memtable supplies the entry's location; the entry itself is read and
/// decoded through `read_entry_from_file`. Returns `Ok(None)` when the region
/// or the index is unknown to the memtables.
pub fn get_entry<M: MessageExt>(
    &self,
    region_id: u64,
    log_idx: u64,
) -> Result<Option<M::Entry>> {
    let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
    match self.memtables.get(region_id) {
        Some(memtable) => match memtable.read().get_entry(log_idx) {
            Some(entry_index) => {
                ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(1.0);
                let entry =
                    read_entry_from_file::<M, _>(self.pipe_log.as_ref(), &entry_index)?;
                Ok(Some(entry))
            }
            None => Ok(None),
        },
        None => Ok(None),
    }
}
/// Delegates to `PurgeManager::purge_expired_files`.
///
/// The returned ids are presumably the regions that should be compacted by
/// the caller (the tests in this file treat them as region ids) — confirm
/// against `PurgeManager`.
pub fn purge_expired_files(&self) -> Result<Vec<u64>> {
self.purge_manager.purge_expired_files()
}
/// Reads the entries of `region_id` selected by the memtable for the range
/// `begin..end` (optionally limited by `max_size`) and appends them to `vec`.
///
/// Returns the number of entries appended, or `0` when the region has no
/// memtable.
///
/// # Errors
///
/// Propagates errors from the memtable lookup and from reading or decoding
/// entries. Entries pushed to `vec` before a read error are left in place.
pub fn fetch_entries_to<M: MessageExt>(
    &self,
    region_id: u64,
    begin: u64,
    end: u64,
    max_size: Option<usize>,
    vec: &mut Vec<M::Entry>,
) -> Result<usize> {
    let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
    if let Some(memtable) = self.memtables.get(region_id) {
        // `end < begin` must not underflow here (debug panic / enormous
        // allocation in release); let the memtable judge the range instead.
        let capacity_hint = end.saturating_sub(begin) as usize;
        let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity(capacity_hint);
        memtable
            .read()
            .fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
        vec.reserve(ents_idx.len());
        for i in ents_idx.iter() {
            vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
        }
        ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
        return Ok(ents_idx.len());
    }
    Ok(0)
}
/// Returns the first index reported by the memtable of `region_id`, or `None`
/// when the region has no memtable.
pub fn first_index(&self, region_id: u64) -> Option<u64> {
    match self.memtables.get(region_id) {
        Some(memtable) => memtable.read().first_index(),
        None => None,
    }
}
/// Returns the last index reported by the memtable of `region_id`, or `None`
/// when the region has no memtable.
pub fn last_index(&self, region_id: u64) -> Option<u64> {
    match self.memtables.get(region_id) {
        Some(memtable) => memtable.read().last_index(),
        None => None,
    }
}
/// Writes a `Command::Compact { index }` for `region_id` and returns by how
/// much the region's first index advanced.
///
/// Returns `0` when the region has no first index to begin with. A failed
/// write is logged, not returned. If the region has no first index after the
/// write, `index` is used as the new first index; the difference saturates at
/// zero instead of underflowing when that value is below the old first index.
pub fn compact_to(&self, region_id: u64, index: u64) -> u64 {
    let old_first_index = match self.first_index(region_id) {
        Some(first_index) => first_index,
        None => return 0,
    };
    let mut log_batch = LogBatch::default();
    log_batch.add_command(region_id, Command::Compact { index });
    if let Err(e) = self.write(&mut log_batch, false) {
        error!("Failed to write Compact command: {e}");
    }
    self.first_index(region_id)
        .unwrap_or(index)
        .saturating_sub(old_first_index)
}
/// Collects the region id of every memtable currently held by the engine.
pub fn raft_groups(&self) -> Vec<u64> {
    self.memtables.fold(Vec::new(), |mut region_ids, memtable| {
        region_ids.push(memtable.region_id());
        region_ids
    })
}
/// Returns whether the engine's memtables report being empty.
pub fn is_empty(&self) -> bool {
self.memtables.is_empty()
}
/// Returns the file span of `queue` as reported by the pipe log.
///
/// Presumably the (first, last) file sequence numbers, both inclusive — the
/// test helper `file_count` computes `b - a + 1` from it; confirm against
/// `PipeLog::file_span`.
pub fn file_span(&self, queue: LogQueue) -> (u64, u64) {
self.pipe_log.file_span(queue)
}
/// Returns the combined total size the pipe log reports for the append and
/// the rewrite queue.
pub fn get_used_size(&self) -> usize {
    let append_size = self.pipe_log.total_size(LogQueue::Append);
    let rewrite_size = self.pipe_log.total_size(LogQueue::Rewrite);
    append_size + rewrite_size
}
/// Returns the directory configured for this engine (`Config::dir`).
pub fn path(&self) -> &str {
    &self.cfg.dir
}
/// Exposes the engine's purge manager; only compiled with the `internals`
/// feature.
#[cfg(feature = "internals")]
pub fn purge_manager(&self) -> &PurgeManager<P> {
&self.purge_manager
}
}
impl<F, P> Drop for Engine<F, P>
where
F: FileSystem,
P: PipeLog,
{
fn drop(&mut self) {
self.tx.lock().unwrap().send(()).unwrap();
if let Some(t) = self.metrics_flusher.take() {
t.join().unwrap();
}
}
}
impl Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>> {
    /// Runs `consistency_check_with_file_system` on the default file system.
    pub fn consistency_check(path: &Path) -> Result<Vec<(u64, u64)>> {
        let file_system = Arc::new(DefaultFileSystem);
        Self::consistency_check_with_file_system(path, file_system)
    }

    /// Runs `unsafe_repair_with_file_system` on the default file system.
    #[cfg(feature = "scripting")]
    pub fn unsafe_repair(path: &Path, queue: Option<LogQueue>, script: String) -> Result<()> {
        let file_system = Arc::new(DefaultFileSystem);
        Self::unsafe_repair_with_file_system(path, queue, script, file_system)
    }

    /// Runs `dump_with_file_system` on the default file system.
    pub fn dump(path: &Path) -> Result<LogItemReader<DefaultFileSystem>> {
        let file_system = Arc::new(DefaultFileSystem);
        Self::dump_with_file_system(path, file_system)
    }
}
impl<F> Engine<F, FilePipeLog<F>>
where
F: FileSystem,
{
pub fn consistency_check_with_file_system(
path: &Path,
file_system: Arc<F>,
) -> Result<Vec<(u64, u64)>> {
if !path.exists() {
return Err(Error::InvalidArgument(format!(
"raft-engine directory '{}' does not exist.",
path.to_str().unwrap()
)));
}
let cfg = Config {
dir: path.to_str().unwrap().to_owned(),
recovery_mode: RecoveryMode::TolerateAnyCorruption,
..Default::default()
};
let mut builder = FilePipeLogBuilder::new(cfg, file_system, Vec::new());
builder.scan()?;
let (append, rewrite) =
builder.recover(&DefaultMachineFactory::<ConsistencyChecker>::default())?;
let mut map = rewrite.finish();
for (id, index) in append.finish() {
map.entry(id).or_insert(index);
}
let mut list: Vec<(u64, u64)> = map.into_iter().collect();
list.sort_unstable();
Ok(list)
}
/// Replays the selected queue(s) under `path` through a Rhai filter built
/// from `script` and writes the outcome back via the filter machine's
/// `finish`.
///
/// `queue` selects what is repaired: `None` replays both queues (the rewrite
/// queue's machine is merged into the append queue's), `Some(queue)` replays
/// only that queue.
///
/// # Errors
///
/// Returns `Error::InvalidArgument` when `path` does not exist or is not
/// valid UTF-8, and propagates scan, recovery, merge and finish errors.
#[cfg(feature = "scripting")]
pub fn unsafe_repair_with_file_system(
    path: &Path,
    queue: Option<LogQueue>,
    script: String,
    file_system: Arc<F>,
) -> Result<()> {
    use crate::file_pipe_log::{RecoveryConfig, ReplayMachine};

    if !path.exists() {
        return Err(Error::InvalidArgument(format!(
            "raft-engine directory '{}' does not exist.",
            path.display()
        )));
    }
    // `Config::dir` is a `String`; reject paths it cannot represent instead
    // of panicking on them.
    let dir = path.to_str().ok_or_else(|| {
        Error::InvalidArgument(format!(
            "raft-engine directory '{}' is not valid UTF-8.",
            path.display()
        ))
    })?;
    let cfg = Config {
        dir: dir.to_owned(),
        recovery_mode: RecoveryMode::TolerateAnyCorruption,
        ..Default::default()
    };
    let recovery_mode = cfg.recovery_mode;
    let read_block_size = cfg.recovery_read_block_size.0;
    let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new());
    builder.scan()?;
    let factory = crate::filter::RhaiFilterMachineFactory::from_script(script);

    let repair_append = matches!(queue, None | Some(LogQueue::Append));
    let repair_rewrite = matches!(queue, None | Some(LogQueue::Rewrite));

    let mut machine = None;
    if repair_append {
        machine = Some(builder.recover_queue(
            file_system.clone(),
            RecoveryConfig {
                queue: LogQueue::Append,
                mode: recovery_mode,
                concurrency: 1,
                read_block_size,
            },
            &factory,
        )?);
    }
    if repair_rewrite {
        let rewrite_machine = builder.recover_queue(
            file_system.clone(),
            RecoveryConfig {
                queue: LogQueue::Rewrite,
                mode: recovery_mode,
                concurrency: 1,
                read_block_size,
            },
            &factory,
        )?;
        match machine.as_mut() {
            Some(append_machine) => {
                append_machine.merge(rewrite_machine, LogQueue::Rewrite)?;
            }
            // Only the rewrite queue was requested: keep its machine so that
            // it gets finished below instead of being silently dropped.
            None => machine = Some(rewrite_machine),
        }
    }
    if let Some(machine) = machine {
        machine.finish(file_system.as_ref(), path)?;
    }
    Ok(())
}
/// Creates a `LogItemReader` over `path`: a directory reader when `path` is a
/// directory, a single-file reader otherwise.
///
/// # Errors
///
/// Returns `Error::InvalidArgument` when `path` does not exist, and
/// propagates errors from constructing the reader.
pub fn dump_with_file_system(path: &Path, file_system: Arc<F>) -> Result<LogItemReader<F>> {
    if !path.exists() {
        // `display()` keeps this error path from panicking on non-UTF-8 paths.
        return Err(Error::InvalidArgument(format!(
            "raft-engine directory or file '{}' does not exist.",
            path.display()
        )));
    }
    if path.is_dir() {
        LogItemReader::new_directory_reader(file_system, path)
    } else {
        LogItemReader::new_file_reader(file_system, path)
    }
}
}
/// Single-slot cache for a decoded entries block, used by the
/// `read_entry*_from_file` functions through the thread-local `BLOCK_CACHE`.
struct BlockCache {
// Handle of the block whose decoded bytes are currently held in `block`.
key: Cell<FileBlockHandle>,
// Decoded bytes of the block identified by `key`.
block: RefCell<Vec<u8>>,
}
impl BlockCache {
    /// Creates a cache holding an empty block under a zero-length handle
    /// (append queue, sequence 0, offset 0).
    fn new() -> Self {
        let initial_key = FileBlockHandle {
            id: FileId::new(LogQueue::Append, 0),
            offset: 0,
            len: 0,
        };
        Self {
            key: Cell::new(initial_key),
            block: RefCell::new(Vec::new()),
        }
    }

    /// Replaces the cached block: records `key` first, then swaps in `block`.
    fn insert(&self, key: FileBlockHandle, block: Vec<u8>) {
        self.key.set(key);
        *self.block.borrow_mut() = block;
    }
}
// Per-thread `BlockCache`, so each thread keeps its own most recently decoded
// entries block.
thread_local! {
static BLOCK_CACHE: BlockCache = BlockCache::new();
}
/// Runs `reader` on the encoded bytes of the entry described by `idx`.
///
/// The entries block containing the entry is served from the thread-local
/// `BLOCK_CACHE`; on a miss it is read from `pipe_log`, decoded and cached
/// first. The cache is left untouched when reading or decoding fails.
///
/// # Panics
///
/// Panics if `idx.entries` is `None` or if the entry's byte range lies outside
/// the decoded block.
fn with_cached_entry_bytes<P, T, R>(pipe_log: &P, idx: &EntryIndex, reader: R) -> Result<T>
where
    P: PipeLog,
    R: FnOnce(&[u8]) -> Result<T>,
{
    BLOCK_CACHE.with(|cache| {
        let handle = idx.entries.unwrap();
        if cache.key.get() != handle {
            let raw = pipe_log.read_bytes(handle)?;
            let block = LogBatch::decode_entries_block(&raw, handle, idx.compression_type)?;
            cache.insert(handle, block);
        }
        let start = idx.entry_offset as usize;
        let end = (idx.entry_offset + idx.entry_len) as usize;
        let block = cache.block.borrow();
        reader(&block[start..end])
    })
}

/// Reads and decodes the entry described by `idx`.
///
/// # Errors
///
/// Propagates errors from reading or decoding the entries block and from
/// parsing the entry.
///
/// # Panics
///
/// Panics if `idx.entries` is `None`, if the entry's byte range lies outside
/// the decoded block, or if the decoded entry's index differs from `idx.index`.
pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
where
    M: MessageExt,
    P: PipeLog,
{
    with_cached_entry_bytes(pipe_log, idx, |bytes| {
        let e: M::Entry = parse_from_bytes(bytes)?;
        assert_eq!(M::index(&e), idx.index);
        Ok(e)
    })
}

/// Returns a copy of the encoded bytes of the entry described by `idx`,
/// without parsing them.
///
/// # Errors
///
/// Propagates errors from reading or decoding the entries block.
///
/// # Panics
///
/// Panics if `idx.entries` is `None` or if the entry's byte range lies outside
/// the decoded block.
pub(crate) fn read_entry_bytes_from_file<P>(pipe_log: &P, idx: &EntryIndex) -> Result<Vec<u8>>
where
    P: PipeLog,
{
    with_cached_entry_bytes(pipe_log, idx, |bytes| Ok(bytes.to_owned()))
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::env::{ObfuscatedFileSystem, Permission};
use crate::file_pipe_log::{parse_reserved_file_name, FileNameExt};
use crate::log_batch::AtomicGroupBuilder;
use crate::pipe_log::Version;
use crate::test_util::{generate_entries, PanicGuard};
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use std::collections::{BTreeSet, HashSet};
use std::fs::OpenOptions;
use std::path::PathBuf;
/// Engine alias used by the tests; the pipe log is the default `FilePipeLog<F>`.
pub(crate) type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;
/// Test-only helpers.
impl<F: FileSystem> RaftLogEngine<F> {
    /// Writes the entries generated for `start_index`/`end_index` plus a
    /// `last_index` state record for `rid` in one synced batch. Does nothing
    /// when no entries are generated.
    fn append(&self, rid: u64, start_index: u64, end_index: u64, data: Option<&[u8]>) {
        let entries = generate_entries(start_index, end_index, data);
        if entries.is_empty() {
            return;
        }
        let state = RaftLocalState {
            last_index: entries[entries.len() - 1].index,
            ..Default::default()
        };
        let mut batch = LogBatch::default();
        batch.add_entries::<Entry>(rid, &entries).unwrap();
        batch
            .put_message(rid, b"last_index".to_vec(), &state)
            .unwrap();
        self.write(&mut batch, true).unwrap();
    }

    /// Writes a synced `Command::Clean` for `rid`.
    fn clean(&self, rid: u64) {
        let mut batch = LogBatch::default();
        batch.add_command(rid, Command::Clean);
        self.write(&mut batch, true).unwrap();
    }

    /// Reads back the `last_index` state record written by `append`.
    fn decode_last_index(&self, rid: u64) -> Option<u64> {
        let state = self
            .get_message::<RaftLocalState>(rid, b"last_index")
            .unwrap();
        state.map(|s| s.last_index)
    }

    /// Drops this engine and opens a new one with the same config, file system
    /// and listeners.
    fn reopen(self) -> Self {
        let cfg: Config = self.cfg.as_ref().clone();
        let file_system = self.pipe_log.file_system();
        // `open_with` pushes a fresh `PurgeHook` as the last listener, so the
        // old one is removed first.
        let mut listeners = self.listeners.clone();
        listeners.pop();
        drop(self);
        RaftLogEngine::open_with(cfg, file_system, listeners).unwrap()
    }

    /// Fetches every entry of `rid`, asserts that they cover exactly
    /// `start..end` and agree with the `last_index` record and `get_entry`,
    /// then calls `reader` with each entry's index, queue and data.
    fn scan_entries<FR: Fn(u64, LogQueue, &[u8])>(
        &self,
        rid: u64,
        start: u64,
        end: u64,
        reader: FR,
    ) {
        let first = self.first_index(rid).unwrap();
        let last = self.last_index(rid).unwrap();
        let mut entries = Vec::new();
        self.fetch_entries_to::<Entry>(rid, first, last + 1, None, &mut entries)
            .unwrap();

        let head = entries.first().unwrap();
        let tail = entries.last().unwrap();
        assert_eq!(head.index, start, "{rid}");
        assert_eq!(tail.index + 1, end);
        assert_eq!(tail.index, self.decode_last_index(rid).unwrap());
        assert_eq!(entries.len(), (end - start) as usize);

        for entry in &entries {
            let entry_index = self
                .memtables
                .get(rid)
                .unwrap()
                .read()
                .get_entry(entry.index)
                .unwrap();
            let fetched = self.get_entry::<Entry>(rid, entry.index).unwrap().unwrap();
            assert_eq!(&fetched, entry);
            reader(
                entry.index,
                entry_index.entries.unwrap().id.queue,
                &entry.data,
            );
        }
    }

    /// Number of files spanned by `queue`, or by both queues when `queue` is
    /// `None`.
    fn file_count(&self, queue: Option<LogQueue>) -> usize {
        match queue {
            Some(queue) => {
                let (first, last) = self.file_span(queue);
                (last - first + 1) as usize
            }
            None => {
                self.file_count(Some(LogQueue::Append)) + self.file_count(Some(LogQueue::Rewrite))
            }
        }
    }
}
/// Opening an engine on a sub-directory of a fresh temporary directory must
/// succeed.
#[test]
fn test_empty_engine() {
    let dir = tempfile::Builder::new()
        .prefix("test_empty_engine")
        .tempdir()
        .unwrap();
    let sub_dir: PathBuf = dir.path().join("raft-engine");
    let cfg = Config {
        dir: sub_dir.to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let file_system = Arc::new(ObfuscatedFileSystem::default());
    let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();
    drop(engine);
}
/// Entries written with a small and with a large payload must be readable
/// from the append queue, both before and after reopening the engine.
#[test]
fn test_get_entry() {
    let normal_batch_size = 10;
    let compressed_batch_size = 5120;
    for entry_size in [normal_batch_size, compressed_batch_size] {
        let dir = tempfile::Builder::new()
            .prefix("test_get_entry")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize(1),
            ..Default::default()
        };
        let file_system = Arc::new(ObfuscatedFileSystem::default());
        let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();
        assert_eq!(engine.path(), dir.path().to_str().unwrap());

        let data = vec![b'x'; entry_size];
        // Region `i` holds the two entries `i` and `i + 1`.
        for i in 10..20 {
            engine.append(i, i, i + 2, Some(&data));
        }

        let verify = |engine: &RaftLogEngine<ObfuscatedFileSystem>| {
            for i in 10..20 {
                engine.scan_entries(i, i, i + 2, |_, q, d| {
                    assert_eq!(q, LogQueue::Append);
                    assert_eq!(d, &data);
                });
            }
        };
        verify(&engine);
        // The same content must survive recovery.
        let engine = engine.reopen();
        verify(&engine);
    }
}
/// Interleaves appends and cleans of one region with a rewrite of the append
/// queue at every possible step, and checks the recovered state.
#[test]
fn test_clean_raft_group() {
    /// `Some((start, end))` appends entries, `None` cleans the region.
    fn run_steps(steps: &[Option<(u64, u64)>]) {
        let rid = 1;
        let data = vec![b'x'; 1024];
        // The outcome is decided by the last step alone: either its entries
        // are present, or the region is gone.
        let verify = |engine: &RaftLogEngine<ObfuscatedFileSystem>| {
            match *steps.last().unwrap() {
                Some((start, end)) => {
                    engine.scan_entries(rid, start, end, |_, _, d| {
                        assert_eq!(d, &data);
                    });
                }
                None => {
                    assert!(engine.raft_groups().is_empty());
                }
            }
        };
        for rewrite_step in 1..=steps.len() {
            for exit_purge in [None, Some(1), Some(2)] {
                let _guard = PanicGuard::with_prompt(format!(
                    "case: [{steps:?}, {rewrite_step}, {exit_purge:?}]",
                ));
                let dir = tempfile::Builder::new()
                    .prefix("test_clean_raft_group")
                    .tempdir()
                    .unwrap();
                let cfg = Config {
                    dir: dir.path().to_str().unwrap().to_owned(),
                    target_file_size: ReadableSize(1),
                    ..Default::default()
                };
                let file_system = Arc::new(ObfuscatedFileSystem::default());
                let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();

                for (i, step) in steps.iter().enumerate() {
                    match *step {
                        Some((start, end)) => engine.append(rid, start, end, Some(&data)),
                        None => engine.clean(rid),
                    }
                    if i + 1 == rewrite_step {
                        engine
                            .purge_manager
                            .must_rewrite_append_queue(None, exit_purge);
                    }
                }

                let engine = engine.reopen();
                verify(&engine);

                // Rewrite everything once more and recover again.
                engine.purge_manager.must_rewrite_append_queue(None, None);
                let engine = engine.reopen();
                verify(&engine);
            }
        }
    }

    run_steps(&[Some((1, 5)), None, Some((2, 6)), None, Some((3, 7)), None]);
    run_steps(&[Some((1, 5)), None, Some((2, 6)), None, Some((3, 7))]);
    run_steps(&[Some((1, 5)), None, Some((2, 6)), None]);
    run_steps(&[Some((1, 5)), None, Some((2, 6))]);
    run_steps(&[Some((1, 5)), None]);
}
// Exercises `scan_raw_messages` and `scan_messages` in both directions, on an
// empty region, on raw key-values and on protobuf-encoded values.
#[test]
fn test_key_value_scan() {
fn key(i: u64) -> Vec<u8> {
format!("k{i}").as_bytes().to_vec()
}
fn value(i: u64) -> Vec<u8> {
format!("v{i}").as_bytes().to_vec()
}
fn rich_value(i: u64) -> RaftLocalState {
RaftLocalState {
last_index: i,
..Default::default()
}
}
let dir = tempfile::Builder::new()
.prefix("test_key_value_scan")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};
let rid = 1;
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
// Nothing has been written yet, so the callback must never run.
engine
.scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
panic!("unexpected message.");
})
.unwrap();
let mut batch = LogBatch::default();
let mut res = Vec::new();
let mut rich_res = Vec::new();
batch.put(rid, key(1), value(1)).unwrap();
batch.put(rid, key(2), value(2)).unwrap();
batch.put(rid, key(3), value(3)).unwrap();
engine.write(&mut batch, false).unwrap();
// Forward raw scan: the test expects the keys in ascending order.
engine
.scan_raw_messages(rid, None, None, false, |k, v| {
res.push((k.to_vec(), v.to_vec()));
true
})
.unwrap();
assert_eq!(
res,
vec![(key(1), value(1)), (key(2), value(2)), (key(3), value(3))]
);
res.clear();
// Reverse raw scan: the same pairs in descending order.
engine
.scan_raw_messages(rid, None, None, true, |k, v| {
res.push((k.to_vec(), v.to_vec()));
true
})
.unwrap();
assert_eq!(
res,
vec![(key(3), value(3)), (key(2), value(2)), (key(1), value(1))]
);
res.clear();
// The raw values are expected not to decode as `RaftLocalState`;
// `scan_messages` skips them, so the callback must not be invoked.
engine
.scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
panic!("unexpected message.")
})
.unwrap();
// `write` drains the batch, so the same `LogBatch` is reused here.
batch.put_message(rid, key(22), &rich_value(22)).unwrap();
batch.put_message(rid, key(33), &rich_value(33)).unwrap();
engine.write(&mut batch, false).unwrap();
// The callback returns `false`; the test expects only the first decodable
// message of each direction to be collected.
engine
.scan_messages(rid, None, None, false, |k, v| {
rich_res.push((k.to_vec(), v));
false
})
.unwrap();
assert_eq!(rich_res, vec![(key(22), rich_value(22))]);
rich_res.clear();
engine
.scan_messages(rid, None, None, true, |k, v| {
rich_res.push((k.to_vec(), v));
false
})
.unwrap();
assert_eq!(rich_res, vec![(key(33), rich_value(33))]);
rich_res.clear();
}
// Checks that a deleted key stays deleted, and a key put again after a delete
// stays visible, across engine reopen, for every position of a rewrite of the
// append queue relative to the put/delete/put sequence. Each scenario runs
// once with `must_rewrite_append_queue(None, None)` and once with
// `must_rewrite_append_queue(None, Some(2))`.
#[test]
fn test_delete_key_value() {
let dir = tempfile::Builder::new()
.prefix("test_delete_key_value")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};
let rid = 1;
let key = b"key".to_vec();
let (v1, v2) = (b"v1".to_vec(), b"v2".to_vec());
let mut batch_1 = LogBatch::default();
batch_1.put(rid, key.clone(), v1).unwrap();
let mut batch_2 = LogBatch::default();
batch_2.put(rid, key.clone(), v2.clone()).unwrap();
let mut delete_batch = LogBatch::default();
delete_batch.delete(rid, key.clone());
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
// The key does not exist in a fresh engine.
assert_eq!(
engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
None
);
assert_eq!(engine.get(rid, &key), None);
// Scenario: put, rewrite, delete.
// The batches are cloned before every write because `write` takes them by
// `&mut` and the originals are written again in later scenarios.
engine.write(&mut batch_1.clone(), true).unwrap();
// The raw value `v1` is expected not to decode as `RaftLocalState`.
assert!(engine.get_message::<RaftLocalState>(rid, &key).is_err());
engine.purge_manager.must_rewrite_append_queue(None, None);
engine.write(&mut delete_batch.clone(), true).unwrap();
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key), None);
assert_eq!(
engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
None
);
// Same scenario with the `Some(2)` rewrite variant.
engine.write(&mut batch_1.clone(), true).unwrap();
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
engine.write(&mut delete_batch.clone(), true).unwrap();
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key), None);
// Scenario: put, rewrite, delete, put.
let engine = engine.reopen();
engine.write(&mut batch_1.clone(), true).unwrap();
engine.purge_manager.must_rewrite_append_queue(None, None);
engine.write(&mut delete_batch.clone(), true).unwrap();
engine.write(&mut batch_2.clone(), true).unwrap();
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key).unwrap(), v2);
// Same scenario with the `Some(2)` rewrite variant.
engine.write(&mut batch_1.clone(), true).unwrap();
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
engine.write(&mut delete_batch.clone(), true).unwrap();
engine.write(&mut batch_2.clone(), true).unwrap();
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key).unwrap(), v2);
// Scenario: put, delete, rewrite, put.
let engine = engine.reopen();
engine.write(&mut batch_1.clone(), true).unwrap();
engine.write(&mut delete_batch.clone(), true).unwrap();
engine.purge_manager.must_rewrite_append_queue(None, None);
engine.write(&mut batch_2.clone(), true).unwrap();
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key).unwrap(), v2);
// Same scenario with the `Some(2)` rewrite variant.
engine.write(&mut batch_1.clone(), true).unwrap();
engine.write(&mut delete_batch.clone(), true).unwrap();
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
engine.write(&mut batch_2.clone(), true).unwrap();
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key).unwrap(), v2);
// Scenario: put, delete, put, rewrite.
let engine = engine.reopen();
engine.write(&mut batch_1.clone(), true).unwrap();
engine.write(&mut delete_batch.clone(), true).unwrap();
engine.write(&mut batch_2.clone(), true).unwrap();
engine.purge_manager.must_rewrite_append_queue(None, None);
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key).unwrap(), v2);
// Same scenario with the `Some(2)` rewrite variant.
let engine = engine.reopen();
engine.write(&mut batch_1.clone(), true).unwrap();
engine.write(&mut delete_batch.clone(), true).unwrap();
engine.write(&mut batch_2.clone(), true).unwrap();
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key).unwrap(), v2);
}
// Checks that `Command::Compact` is honoured across reopen for several
// orderings of appends, compactions and rewrites of the append queue. Every
// scenario uses a fresh region id.
#[test]
fn test_compact_raft_group() {
let dir = tempfile::Builder::new()
.prefix("test_compact_raft_group")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
let data = vec![b'x'; 1024];
// Scenario 1: entries 1..10 are rewritten, then compacted to 5.
let mut rid = 7;
engine.append(rid, 1, 10, Some(&data));
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
let mut compact_log = LogBatch::default();
compact_log.add_command(rid, Command::Compact { index: 5 });
engine.write(&mut compact_log, true).unwrap();
let engine = engine.reopen();
engine.scan_entries(rid, 5, 10, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
// Expected live items: entries 5..10 (5) plus the `last_index` record (1).
// The rest of this line starts scenario 2 with the next region id.
assert_eq!(engine.stats.live_entries(LogQueue::Append), 6); rid += 1;
// Scenario 2: a compaction to 20 is applied to the memtables only (never
// written to the log), so after reopen the region starts at 10 again.
engine.append(rid, 5, 15, Some(&data));
let mut compact_log = LogBatch::default();
compact_log.add_command(rid, Command::Compact { index: 10 });
engine.write(&mut compact_log, true).unwrap();
engine.append(rid, 15, 25, Some(&data));
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
let mut compact_log = LogBatch::default();
compact_log.add_command(rid, Command::Compact { index: 20 });
engine.memtables.apply_append_writes(compact_log.drain());
engine.purge_manager.must_rewrite_rewrite_queue();
let engine = engine.reopen();
engine.scan_entries(rid, 10, 25, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
// Expected live items: 6 from the first region plus entries 10..25 (15) and
// one `last_index` record of this region. Cleaning the first region then
// removes its 6 items.
assert_eq!(engine.stats.live_entries(LogQueue::Append), 22); engine.clean(rid - 1);
assert_eq!(engine.stats.live_entries(LogQueue::Append), 16);
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
let engine = engine.reopen();
engine.scan_entries(rid, 10, 25, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
// Scenario 3: append, compact to 10, rewrite, append more, rewrite again,
// compact to 20 via a regular write.
rid += 1;
engine.append(rid, 5, 15, Some(&data));
let mut compact_log = LogBatch::default();
compact_log.add_command(rid, Command::Compact { index: 10 });
engine.write(&mut compact_log, true).unwrap();
engine.purge_manager.must_rewrite_append_queue(None, None);
engine.append(rid, 15, 25, Some(&data));
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
let mut compact_log = LogBatch::default();
compact_log.add_command(rid, Command::Compact { index: 20 });
engine.write(&mut compact_log, true).unwrap();
let engine = engine.reopen();
engine.scan_entries(rid, 20, 25, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
// Scenario 4: entries 1..5 are rewritten, 5..15 appended afterwards, then
// the region is compacted to 10 and the append queue rewritten once more.
rid += 1;
engine.append(rid, 1, 5, Some(&data));
engine.purge_manager.must_rewrite_append_queue(None, None);
engine.append(rid, 5, 15, Some(&data));
let mut compact_log = LogBatch::default();
compact_log.add_command(rid, Command::Compact { index: 10 });
engine.write(&mut compact_log, true).unwrap();
engine
.purge_manager
.must_rewrite_append_queue(None, Some(2));
let engine = engine.reopen();
engine.scan_entries(rid, 10, 15, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
}
// Checks when `needs_rewrite_log_files` reports true after compactions and
// what `purge_expired_files` returns in each situation.
#[test]
fn test_purge_triggered_by_compact() {
let dir = tempfile::Builder::new()
.prefix("test_purge_triggered_by_compact")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize::kb(5),
purge_threshold: ReadableSize::kb(150),
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
let data = vec![b'x'; 1024];
for index in 0..100 {
engine.append(1, index, index + 1, Some(&data));
}
// Compact away all 100 entries; the test expects no rewrite to be needed yet.
let count = engine.compact_to(1, 100);
assert_eq!(count, 100);
assert!(!engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
// Append another 150 entries of 1 KiB payload each.
for index in 100..250 {
engine.append(1, index, index + 1, Some(&data));
}
// Advance the first index by one; now a rewrite is expected to be needed.
assert_eq!(engine.compact_to(1, 101), 1);
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
let old_min_file_seq = engine.file_span(LogQueue::Append).0;
let will_force_compact = engine.purge_expired_files().unwrap();
let new_min_file_seq = engine.file_span(LogQueue::Append).0;
// The purge must have advanced the start of the append queue's file span
// without asking for any region to be compacted.
assert!(new_min_file_seq > old_min_file_seq);
assert!(will_force_compact.is_empty());
// Entry 101 is still readable after the purge.
assert!(engine.get_entry::<Entry>(1, 101).unwrap().is_some());
assert_eq!(engine.compact_to(1, 102), 1);
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
// This time the purge is expected to report region 1.
let will_force_compact = engine.purge_expired_files().unwrap();
assert!(!will_force_compact.is_empty());
assert_eq!(will_force_compact[0], 1);
}
/// Repeated purges keep reporting regions 1..=3 until they are compacted;
/// once regions 2..=50 have been compacted and written to again, only region 1
/// is reported.
#[test]
fn test_purge_trigger_force_rewrite() {
    let dir = tempfile::Builder::new()
        .prefix("test_purge_trigger_force_write")
        .tempdir()
        .unwrap();
    let cfg = Config {
        dir: dir.path().to_str().unwrap().to_owned(),
        target_file_size: ReadableSize::kb(1),
        purge_threshold: ReadableSize::kb(10),
        ..Default::default()
    };
    let file_system = Arc::new(ObfuscatedFileSystem::default());
    let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();

    let data = vec![b'x'; 1024];
    let small_data = &data[..10];
    // 50 small entries in each of the regions 1..=3.
    for rid in 1..=3 {
        for index in 0..50 {
            engine.append(rid, index, index + 1, Some(small_data));
        }
    }
    // One full-sized entry in each of the regions 4..=50.
    for rid in 4..=50 {
        engine.append(rid, 1, 2, Some(&data));
    }

    let check_purge = |pending_regions: Vec<u64>| {
        let mut compact_regions = engine.purge_expired_files().unwrap();
        compact_regions.sort_unstable();
        assert_eq!(compact_regions, pending_regions);
    };

    for _ in 0..9 {
        check_purge(vec![1, 2, 3]);
    }
    // The tenth purge reports the same regions; afterwards each of them is
    // expected to show a rewrite count of 50.
    check_purge(vec![1, 2, 3]);
    for rid in 1..=3 {
        let rewrite_count = engine.memtables.get(rid).unwrap().read().rewrite_count();
        assert_eq!(rewrite_count, 50);
    }

    // Compact every region but the first and give each a new entry.
    for rid in 2..=50 {
        let last_idx = engine.last_index(rid).unwrap();
        engine.compact_to(rid, last_idx);
        engine.append(rid, last_idx, last_idx + 1, Some(&data));
    }
    check_purge(vec![1]);
}
// Checks that entries remain readable after a purge has rewritten them, that
// cleaned region ids survive a reopen, and that a second round of writes can
// be purged as well.
#[test]
fn test_rewrite_and_recover() {
let dir = tempfile::Builder::new()
.prefix("test_rewrite_and_recover")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize::kb(5),
purge_threshold: ReadableSize::kb(80),
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
let data = vec![b'x'; 1024];
// Entries 1..=10 for each of the regions 1..=10, one entry per write.
for index in 1..=10 {
for rid in 1..=10 {
engine.append(rid, index, index + 1, Some(&data));
}
}
// Region 11 receives entries 1..11 in a single write.
engine.append(11, 1, 11, Some(&data));
// A purge is expected to be needed; it must not report any region and must
// advance the start of the append queue's file span.
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(engine.purge_expired_files().unwrap().is_empty());
assert!(engine.file_span(LogQueue::Append).0 > 1);
let rewrite_file_size = engine.pipe_log.total_size(LogQueue::Rewrite);
// NOTE(review): 59 is presumably the size of a rewrite queue holding no
// data, i.e. this asserts that something was rewritten — confirm. The rest
// of this line starts the check that every entry is still readable.
assert!(rewrite_file_size > 59); for rid in 1..=10 {
engine.scan_entries(rid, 1, 11, |_, _, d| {
assert_eq!(d, &data);
});
}
// Clean region 11 and remember the cleaned region ids for the comparison
// after reopen.
engine.clean(11);
let cleaned_region_ids = engine.memtables.cleaned_region_ids();
assert_eq!(cleaned_region_ids.len(), 1);
let engine = engine.reopen();
assert_eq!(engine.memtables.cleaned_region_ids(), cleaned_region_ids);
for rid in 1..=10 {
engine.scan_entries(rid, 1, 11, |_, _, d| {
assert_eq!(d, &data);
});
}
// Second round: entries 11..=20 for the regions 1..=10, then purge again.
for index in 11..=20 {
for rid in 1..=10 {
engine.append(rid, index, index + 1, Some(&data));
}
}
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(engine.purge_expired_files().unwrap().is_empty());
}
/// Zero-sized protobuf entries and messages can be written and are read back
/// unchanged after a reopen.
#[test]
fn test_empty_protobuf_message() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_empty_protobuf_message")
        .tempdir()
        .unwrap();
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let file_system = Arc::new(ObfuscatedFileSystem::default());
    let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();

    let mut batch = LogBatch::default();

    // Region 0: a single empty entry.
    let blank_entry = Entry::new();
    assert_eq!(blank_entry.compute_size(), 0);
    batch
        .add_entries::<Entry>(0, &[blank_entry.clone()])
        .unwrap();
    engine.write(&mut batch, false).unwrap();

    // Region 1: a single empty message.
    let blank_state = RaftLocalState::new();
    assert_eq!(blank_state.compute_size(), 0);
    batch
        .put_message(1, b"key".to_vec(), &blank_state)
        .unwrap();
    engine.write(&mut batch, false).unwrap();

    // Region 2: an empty entry and an empty message in the same batch.
    batch
        .add_entries::<Entry>(2, &[blank_entry.clone()])
        .unwrap();
    batch
        .put_message(2, b"key".to_vec(), &blank_state)
        .unwrap();
    engine.write(&mut batch, true).unwrap();

    let engine = engine.reopen();
    for region in [0, 2] {
        let entry = engine.get_entry::<Entry>(region, 0).unwrap().unwrap();
        assert_eq!(entry, blank_entry);
    }
    for region in [1, 2] {
        let state = engine
            .get_message::<RaftLocalState>(region, b"key")
            .unwrap()
            .unwrap();
        assert_eq!(state, blank_state);
    }
}
/// Writing a batch, whether or not it holds data, succeeds and leaves the
/// batch empty.
#[test]
fn test_empty_batch() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_empty_batch")
        .tempdir()
        .unwrap();
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let file_system = Arc::new(ObfuscatedFileSystem::default());
    let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();

    let payload = vec![b'x'; 16];
    // Each case lists, per write, whether a key is staged before writing.
    let cases = [[false, false], [false, true], [true, true]];
    for (rid, writes) in (0u64..).zip(cases.iter()) {
        let mut batch = LogBatch::default();
        for &stage_key in writes {
            if stage_key {
                batch.put(rid, b"key".to_vec(), payload.clone()).unwrap();
            }
            engine.write(&mut batch, true).unwrap();
            assert!(batch.is_empty());
        }
    }
}
/// An unrelated directory and file inside the data directory do not prevent
/// the engine from reopening and serving its entries.
#[test]
fn test_dirty_recovery() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_dirty_recovery")
        .tempdir()
        .unwrap();
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let file_system = Arc::new(ObfuscatedFileSystem::default());
    let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();

    let payload = vec![b'x'; 1024];
    for region in 1..21 {
        engine.append(region, 1, 21, Some(&payload));
    }

    // Drop foreign objects next to the log files.
    std::fs::create_dir(tmp_dir.path().join("random_dir")).unwrap();
    let _stray_file = std::fs::File::create(tmp_dir.path().join("random_file")).unwrap();

    let engine = engine.reopen();
    for region in 1..21 {
        engine.scan_entries(region, 1, 21, |_, _, bytes| {
            assert_eq!(bytes, &payload);
        });
    }
}
/// Forced rewrites of both queues with 2 MiB entries advance the file spans,
/// expose exactly one non-internal key per region, and keep data readable
/// after a reopen.
#[test]
fn test_large_rewrite_batch() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_large_rewrite_batch")
        .tempdir()
        .unwrap();
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        target_file_size: ReadableSize(1),
        ..Default::default()
    };
    let file_system = Arc::new(ObfuscatedFileSystem::default());
    let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();

    let payload = vec![b'x'; 2 * 1024 * 1024];
    for region in 1..=3 {
        engine.append(region, 1, 11, Some(&payload));
    }

    // Rewrite the append queue: its first file moves right past the old
    // active file.
    let (_, append_active) = engine.file_span(LogQueue::Append);
    engine.purge_manager.must_rewrite_append_queue(None, None);
    assert_eq!(engine.file_span(LogQueue::Append).0, append_active + 1);

    // Rewrite the rewrite queue: its first file moves past the old active one.
    let (_, rewrite_active) = engine.file_span(LogQueue::Rewrite);
    engine.purge_manager.must_rewrite_rewrite_queue();
    assert!(engine.file_span(LogQueue::Rewrite).0 > rewrite_active);

    for region in engine.raft_groups() {
        let mut visible_keys = 0;
        engine
            .scan_raw_messages(region, None, None, false, |key, _| {
                assert!(!crate::is_internal_key(key, None));
                visible_keys += 1;
                true
            })
            .unwrap();
        assert_eq!(visible_keys, 1);
    }
    assert_eq!(engine.raft_groups().len(), 3);

    let engine = engine.reopen();
    for region in 1..=3 {
        engine.scan_entries(region, 1, 11, |_, _, bytes| {
            assert_eq!(bytes, &payload);
        });
    }
}
/// The same data directory can be opened alternately with a V1 and a V2
/// configuration, with and without log recycling, without losing state.
#[test]
fn test_combination_of_version_and_recycle() {
    /// Opens the engine with `cfg_v1`, then `cfg_v2`, then `cfg_v1` again,
    /// checking the index range of region 1 at every step.
    fn test_engine_ops(cfg_v1: &Config, cfg_v2: &Config) {
        let rid = 1;
        let payload = vec![b'7'; 1024];

        // Round 1: first configuration.
        let engine = RaftLogEngine::open(cfg_v1.clone()).unwrap();
        engine.append(rid, 0, 20, Some(&payload));
        let first_file_before = engine.file_span(LogQueue::Append).0;
        engine.compact_to(rid, 18);
        engine.purge_expired_files().unwrap();
        assert!(engine.file_span(LogQueue::Append).0 > first_file_before);
        assert_eq!(engine.first_index(rid).unwrap(), 18);
        assert_eq!(engine.last_index(rid).unwrap(), 19);
        drop(engine);

        // Round 2: second configuration sees round 1's state and extends it.
        let engine = RaftLogEngine::open(cfg_v2.clone()).unwrap();
        assert_eq!(engine.first_index(rid).unwrap(), 18);
        assert_eq!(engine.last_index(rid).unwrap(), 19);
        engine.append(rid, 20, 40, Some(&payload));
        let first_file_before = engine.file_span(LogQueue::Append).0;
        engine.compact_to(rid, 38);
        engine.purge_expired_files().unwrap();
        assert!(engine.file_span(LogQueue::Append).0 > first_file_before);
        assert_eq!(engine.first_index(rid).unwrap(), 38);
        assert_eq!(engine.last_index(rid).unwrap(), 39);
        drop(engine);

        // Round 3: back to the first configuration; state is still intact.
        let engine = RaftLogEngine::open(cfg_v1.clone()).unwrap();
        assert_eq!(engine.first_index(rid).unwrap(), 38);
        assert_eq!(engine.last_index(rid).unwrap(), 39);
    }

    // Scenario 1: only the format version differs.
    {
        let tmp_dir = tempfile::Builder::new()
            .prefix("test_mutable_format_version")
            .tempdir()
            .unwrap();
        let cfg_v1 = Config {
            dir: tmp_dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize(1),
            purge_threshold: ReadableSize(1),
            format_version: Version::V1,
            enable_log_recycle: false,
            ..Default::default()
        };
        let cfg_v2 = Config {
            format_version: Version::V2,
            ..cfg_v1.clone()
        };
        test_engine_ops(&cfg_v1, &cfg_v2);
    }

    // Scenario 2: the V2 configuration additionally enables log recycling
    // with prefill.
    {
        let tmp_dir = tempfile::Builder::new()
            .prefix("test_enable_log_recycle")
            .tempdir()
            .unwrap();
        let cfg_v1 = Config {
            dir: tmp_dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize(1),
            purge_threshold: ReadableSize(1),
            format_version: Version::V1,
            enable_log_recycle: false,
            ..Default::default()
        };
        let cfg_v2 = Config {
            format_version: Version::V2,
            enable_log_recycle: true,
            prefill_for_recycle: true,
            ..cfg_v1.clone()
        };
        test_engine_ops(&cfg_v1, &cfg_v2);
    }
}
/// Dumping a data directory yields every written log item, dumping a single
/// file without items yields none, and dumping a missing path is an error.
#[test]
fn test_dump_file_or_directory() {
    let dir = tempfile::Builder::new()
        .prefix("test_dump_file_or_directory")
        .tempdir()
        .unwrap();
    let entry_data = vec![b'x'; 1024];
    let fs = Arc::new(ObfuscatedFileSystem::default());

    // Batches are grouped; each group is followed by one `sync` below.
    // Group 0: a single empty batch.
    let mut batches = vec![vec![LogBatch::default()]];

    // Region 7: entries + clean command + put + delete (four operations).
    let mut batch = LogBatch::default();
    batch
        .add_entries::<Entry>(7, &generate_entries(1, 11, Some(&entry_data)))
        .unwrap();
    batch.add_command(7, Command::Clean);
    batch.put(7, b"key".to_vec(), b"value".to_vec()).unwrap();
    batch.delete(7, b"key2".to_vec());
    batches.push(vec![batch.clone()]);

    // Region 8: put + entries (two operations).
    let mut batch2 = LogBatch::default();
    batch2.put(8, b"key3".to_vec(), b"value".to_vec()).unwrap();
    batch2
        .add_entries::<Entry>(8, &generate_entries(5, 15, Some(&entry_data)))
        .unwrap();
    batches.push(vec![batch, batch2]);

    let cfg = Config {
        dir: dir.path().to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
    for bs in batches.iter_mut() {
        for batch in bs.iter_mut() {
            engine.write(batch, false).unwrap();
        }
        engine.sync().unwrap();
    }
    drop(engine);

    // Whole directory: the region-7 batch was written twice (2 x 4) and the
    // region-8 batch once (2), presumably one dumped item per operation.
    let dump_it = Engine::dump_with_file_system(dir.path(), fs.clone()).unwrap();
    let total = dump_it
        .inspect(|i| {
            i.as_ref().unwrap();
        })
        .count();
    // `assert_eq!` reports the actual count when the expectation is missed.
    assert_eq!(total, 10);

    // Single file: the first file of the rewrite queue holds no items.
    let file_id = FileId {
        queue: LogQueue::Rewrite,
        seq: 1,
    };
    let dump_it = Engine::dump_with_file_system(
        file_id.build_file_path(dir.path()).as_path(),
        fs.clone(),
    )
    .unwrap();
    let total = dump_it
        .inspect(|i| {
            i.as_ref().unwrap();
        })
        .count();
    assert_eq!(total, 0);

    // A directory that does not exist.
    assert!(Engine::dump_with_file_system(Path::new("/not_exists_dir"), fs.clone()).is_err());

    // A file that does not exist inside an existing directory.
    let mut not_exists_file = PathBuf::from(dir.as_ref());
    not_exists_file.push("not_exists_file");
    assert!(Engine::dump_with_file_system(not_exists_file.as_path(), fs).is_err());
}
/// Runs `unsafe_repair_with_file_system` twice -- once with an empty script
/// and once with filters that all return 0 -- then reopens the engine and
/// scans every region's entries against the written payload.
#[cfg(feature = "scripting")]
#[test]
fn test_repair_default() {
let dir = tempfile::Builder::new()
.prefix("test_repair_default")
.tempdir()
.unwrap();
let entry_data = vec![b'x'; 128];
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
// 1-byte target file size; presumably spreads the writes over many files.
target_file_size: ReadableSize(1), ..Default::default()
};
let fs = Arc::new(ObfuscatedFileSystem::default());
let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
// Regions 1..=50 get entries [1, 6); regions 25..=50 additionally get [6, 11).
for rid in 1..=50 {
engine.append(rid, 1, 6, Some(&entry_data));
}
for rid in 25..=50 {
engine.append(rid, 6, 11, Some(&entry_data));
}
// The engine is closed before the repair tool touches the directory.
drop(engine);
// First pass: an empty script.
let script1 = "".to_owned();
RaftLogEngine::unsafe_repair_with_file_system(
dir.path(),
// Second argument is `None` -- presumably "no queue restriction"; confirm
// against the repair API.
None, script1,
fs.clone(),
)
.unwrap();
// Second pass: every filter is defined and returns 0.
let script2 = "
fn filter_append(id, first, count, rewrite_count, queue, ifirst, ilast) {
0
}
fn filter_compact(id, first, count, rewrite_count, queue, compact_to) {
0
}
fn filter_clean(id, first, count, rewrite_count, queue) {
0
}
"
.to_owned();
RaftLogEngine::unsafe_repair_with_file_system(
dir.path(),
None, script2,
fs.clone(),
)
.unwrap();
// After both repairs the engine opens and the written ranges are scanned.
let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
for rid in 1..25 {
engine.scan_entries(rid, 1, 6, |_, _, d| {
assert_eq!(d, &entry_data);
});
}
for rid in 25..=50 {
engine.scan_entries(rid, 1, 11, |_, _, d| {
assert_eq!(d, &entry_data);
});
}
}
/// Repairs with a `filter_append` script that returns 1 for regions 1 and 25
/// and 2 for regions 2 and 26, then checks the resulting state of those
/// regions and that all other regions are untouched.
#[cfg(feature = "scripting")]
#[test]
fn test_repair_discard_entries() {
let dir = tempfile::Builder::new()
.prefix("test_repair_discard")
.tempdir()
.unwrap();
let entry_data = vec![b'x'; 128];
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
// 1-byte target file size; presumably spreads the writes over many files.
target_file_size: ReadableSize(1), ..Default::default()
};
let fs = Arc::new(ObfuscatedFileSystem::default());
let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
// Regions 1..=50 get entries [1, 6); regions 25..=50 additionally get [6, 11).
for rid in 1..=50 {
engine.append(rid, 1, 6, Some(&entry_data));
}
for rid in 25..=50 {
engine.append(rid, 6, 11, Some(&entry_data));
}
drop(engine);
// Regions for which the script returns 1 ...
let incoming_emptied = [1, 25];
// ... and regions for which it returns 2. NOTE(review): judging by the
// variable names, 1 presumably discards the incoming entries and 2 the
// already-existing ones -- confirm against the repair script contract.
let existing_emptied = [2, 26];
let script = "
fn filter_append(id, first, count, rewrite_count, queue, ifirst, ilast) {
if id == 1 {
return 1;
} else if id == 2 {
return 2;
} else if id == 25 {
return 1;
} else if id == 26 {
return 2;
}
0 // default
}
"
.to_owned();
RaftLogEngine::unsafe_repair_with_file_system(
dir.path(),
None, script,
fs.clone(),
)
.unwrap();
let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
// Regions not named by the script are scanned over their full written range.
for rid in 1..25 {
if existing_emptied.contains(&rid) || incoming_emptied.contains(&rid) {
continue;
}
engine.scan_entries(rid, 1, 6, |_, _, d| {
assert_eq!(d, &entry_data);
});
}
for rid in 25..=50 {
if existing_emptied.contains(&rid) || incoming_emptied.contains(&rid) {
continue;
}
engine.scan_entries(rid, 1, 11, |_, _, d| {
assert_eq!(d, &entry_data);
});
}
// "Existing emptied" regions: region 2 is scanned over [1, 6) and region 26
// over [6, 11), i.e. only the range of the last append.
for rid in existing_emptied {
let first_index = if rid < 25 { 1 } else { 6 };
let last_index = if rid < 25 { 5 } else { 10 };
engine.scan_entries(rid, first_index, last_index + 1, |_, _, d| {
assert_eq!(d, &entry_data);
});
}
// "Incoming emptied" regions: no first/last index is reported, but
// `decode_last_index` still yields the last written index.
for rid in incoming_emptied {
let last_index = if rid < 25 { 5 } else { 10 };
assert_eq!(engine.first_index(rid), None);
assert_eq!(engine.last_index(rid), None);
assert_eq!(engine.decode_last_index(rid), Some(last_index));
}
}
/// The engine still opens after the last append file is truncated by one
/// byte, and again after it is truncated down to a single byte.
#[test]
fn test_tail_corruption() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_tail_corruption")
        .tempdir()
        .unwrap();
    let payload = vec![b'x'; 16];
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        target_file_size: ReadableSize::gb(10),
        ..Default::default()
    };
    let fs = Arc::new(ObfuscatedFileSystem::default());

    let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
    for region in 1..=50 {
        engine.append(region, 1, 6, Some(&payload));
    }
    for region in 25..=50 {
        engine.append(region, 6, 11, Some(&payload));
    }
    let last_seq = engine.file_span(LogQueue::Append).1;
    drop(engine);

    // Open the newest append file directly, bypassing the engine.
    let tail_path = FileId {
        queue: LogQueue::Append,
        seq: last_seq,
    }
    .build_file_path(tmp_dir.path());
    let tail_file = OpenOptions::new().write(true).open(tail_path).unwrap();

    // Chop one byte off the end.
    let full_len = tail_file.metadata().unwrap().len();
    tail_file.set_len(full_len - 1).unwrap();
    RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();

    // Leave only the first byte.
    tail_file.set_len(1).unwrap();
    RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
}
/// Opening a directory written through `ObfuscatedFileSystem` with the
/// default file system fails, while reopening it with the original file
/// system succeeds and serves every written region.
#[test]
fn test_reopen_with_wrong_file_system() {
    let dir = tempfile::Builder::new()
        .prefix("test_reopen_with_wrong_file_system")
        .tempdir()
        .unwrap();
    let entry_data = vec![b'x'; 128];
    let cfg = Config {
        dir: dir.path().to_str().unwrap().to_owned(),
        target_file_size: ReadableSize(1),
        ..Default::default()
    };
    let fs = Arc::new(ObfuscatedFileSystem::default());
    let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
    for rid in 1..=10 {
        engine.append(rid, 1, 11, Some(&entry_data));
    }
    drop(engine);

    // `RaftLogEngine::open` uses the default file system, which must reject
    // the files written above.
    assert!(RaftLogEngine::open(cfg.clone()).is_err());

    let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
    // Verify the same inclusive range that was written; the previous
    // exclusive bound silently skipped region 10.
    for rid in 1..=10 {
        engine.scan_entries(rid, 1, 11, |_, _, d| {
            assert_eq!(d, &entry_data);
        });
    }
}
/// Benchmarks fetching entries [1, 101) of a randomly chosen region out of
/// 100 regions.
#[cfg(feature = "nightly")]
#[bench]
fn bench_engine_fetch_entries(b: &mut test::Bencher) {
    use rand::{thread_rng, Rng};

    let tmp_dir = tempfile::Builder::new()
        .prefix("bench_engine_fetch_entries")
        .tempdir()
        .unwrap();
    let payload = vec![b'x'; 1024];
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let engine = RaftLogEngine::open(cfg).unwrap();

    // Ten rounds of ten entries each, for every region in 1..=100.
    for round in 0..10 {
        let first = 1 + round * 10;
        for region in 1..=100 {
            engine.append(region, first, first + 10, Some(&payload));
        }
    }

    // The output buffer is reused across iterations.
    let mut fetched: Vec<Entry> = Vec::new();
    b.iter(move || {
        let region_id = thread_rng().gen_range(1..=100);
        engine
            .fetch_entries_to::<Entry>(region_id, 1, 101, None, &mut fetched)
            .unwrap();
        fetched.clear();
    });
}
/// `is_empty` is true for a fresh engine, false once a region has been
/// written (even after compacting it and deleting its key), and true again
/// after the region is cleaned.
#[test]
fn test_engine_is_empty() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_engine_is_empty")
        .tempdir()
        .unwrap();
    let payload = vec![b'x'; 128];
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let file_system = Arc::new(ObfuscatedFileSystem::default());
    let region = 1;
    let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();

    assert!(engine.is_empty());

    engine.append(region, 1, 11, Some(&payload));
    assert!(!engine.is_empty());

    // Compact past the last entry and delete the "last_index" key.
    let mut batch = LogBatch::default();
    batch.add_command(region, Command::Compact { index: 11 });
    batch.delete(region, b"last_index".to_vec());
    engine.write(&mut batch, true).unwrap();
    assert!(!engine.is_empty());

    engine.clean(region);
    assert!(engine.is_empty());
}
/// Test-only file system that forwards to an `ObfuscatedFileSystem` while
/// keeping its own record of which append-queue and reserved file sequence
/// numbers have been created/opened and not yet deleted.
pub struct DeleteMonitoredFileSystem {
// Underlying file system that performs the real operations.
inner: ObfuscatedFileSystem,
// Sequence numbers of tracked files whose name parses as a
// `LogQueue::Append` file id.
append_metadata: Mutex<BTreeSet<u64>>,
// Sequence numbers of tracked files whose name is accepted by
// `parse_reserved_file_name`.
reserved_metadata: Mutex<BTreeSet<u64>>,
}
impl DeleteMonitoredFileSystem {
    /// Creates a monitor around a default `ObfuscatedFileSystem` with nothing
    /// tracked yet.
    fn new() -> Self {
        let inner = ObfuscatedFileSystem::default();
        let append_metadata = Mutex::new(BTreeSet::new());
        let reserved_metadata = Mutex::new(BTreeSet::new());
        Self {
            inner,
            append_metadata,
            reserved_metadata,
        }
    }

    /// Records (`delete == false`) or forgets (`delete == true`) the sequence
    /// number encoded in the file name of `path`.
    ///
    /// Returns what the underlying set operation returns, or `false` when
    /// the name is neither an append-queue log file nor a reserved file.
    /// Panics if `path` has no UTF-8 file name.
    fn update_metadata(&self, path: &Path, delete: bool) -> bool {
        let file_name = path.file_name().unwrap().to_str().unwrap();
        let as_log_file = FileId::parse_file_name(file_name);
        let as_reserved = parse_reserved_file_name(file_name);

        // Pick the set that tracks this kind of file, plus the key to use.
        let (tracked, seq) = match (as_log_file, as_reserved) {
            (Some(id), None) if id.queue == LogQueue::Append => (&self.append_metadata, id.seq),
            (None, Some(seq)) => (&self.reserved_metadata, seq),
            _ => return false,
        };

        let mut tracked = tracked.lock().unwrap();
        if delete {
            tracked.remove(&seq)
        } else {
            tracked.insert(seq)
        }
    }
}
/// Every operation is delegated to the inner file system; on success the
/// tracked metadata is updated to mirror it.
impl FileSystem for DeleteMonitoredFileSystem {
    type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
    type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
    type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;

    /// Creates the file and starts tracking it.
    fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
        self.inner.create(&path).map(|handle| {
            self.update_metadata(path.as_ref(), false);
            handle
        })
    }

    /// Opens the file and makes sure it is tracked.
    fn open<P: AsRef<Path>>(&self, path: P, perm: Permission) -> std::io::Result<Self::Handle> {
        self.inner.open(&path, perm).map(|handle| {
            self.update_metadata(path.as_ref(), false);
            handle
        })
    }

    /// Deletes the file and stops tracking it.
    fn delete<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
        self.inner.delete(&path).map(|()| {
            self.update_metadata(path.as_ref(), true);
        })
    }

    /// Renames the file; tracking moves from the source to the destination.
    fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> std::io::Result<()> {
        let src: &Path = src_path.as_ref();
        let dst: &Path = dst_path.as_ref();
        self.inner.rename(src, dst)?;
        self.update_metadata(src, true);
        self.update_metadata(dst, false);
        Ok(())
    }

    /// Reuses the source file as the destination; tracking moves with it.
    fn reuse<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> std::io::Result<()> {
        let src: &Path = src_path.as_ref();
        let dst: &Path = dst_path.as_ref();
        self.inner.reuse(src, dst)?;
        self.update_metadata(src, true);
        self.update_metadata(dst, false);
        Ok(())
    }

    /// Deletes the inner metadata and stops tracking the file.
    fn delete_metadata<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
        self.inner.delete_metadata(&path).map(|()| {
            self.update_metadata(path.as_ref(), true);
        })
    }

    /// Returns `true` if either the inner file system or this monitor still
    /// knows the file.
    fn exists_metadata<P: AsRef<Path>>(&self, path: P) -> bool {
        if self.inner.exists_metadata(&path) {
            return true;
        }
        let file_name = path.as_ref().file_name().unwrap().to_str().unwrap();
        let as_log_file = FileId::parse_file_name(file_name);
        let as_reserved = parse_reserved_file_name(file_name);
        let (tracked, seq) = match (as_log_file, as_reserved) {
            (Some(id), None) if id.queue == LogQueue::Append => (&self.append_metadata, id.seq),
            (None, Some(seq)) => (&self.reserved_metadata, seq),
            _ => return false,
        };
        let tracked = tracked.lock().unwrap();
        tracked.contains(&seq)
    }

    fn new_reader(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Reader> {
        self.inner.new_reader(h)
    }

    fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
        self.inner.new_writer(h)
    }
}
/// After a purge (and after reopening) the oldest tracked append file is the
/// first file of the engine's append span, and the engine's file count
/// matches the number of physical files.
#[test]
fn test_managed_file_deletion() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_managed_file_deletion")
        .tempdir()
        .unwrap();
    let payload = vec![b'x'; 128];
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        target_file_size: ReadableSize(1),
        purge_threshold: ReadableSize(1),
        enable_log_recycle: false,
        ..Default::default()
    };
    let fs = Arc::new(DeleteMonitoredFileSystem::new());
    // Smallest append sequence number the monitor is still tracking.
    let oldest_tracked_append = || *fs.append_metadata.lock().unwrap().iter().next().unwrap();

    let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
    for region in 1..=10 {
        engine.append(region, 1, 11, Some(&payload));
    }
    for region in 1..=5 {
        engine.clean(region);
    }

    let first_before_purge = engine.file_span(LogQueue::Append).0;
    engine.purge_expired_files().unwrap();
    // The append span moved forward ...
    assert!(first_before_purge < engine.file_span(LogQueue::Append).0);
    // ... the physical file count agrees with the engine ...
    assert_eq!(engine.file_count(None), fs.inner.file_count());
    // ... and so does the tracked metadata.
    let first_after_purge = engine.file_span(LogQueue::Append).0;
    assert_eq!(oldest_tracked_append(), first_after_purge);

    let engine = engine.reopen();
    assert_eq!(engine.file_count(None), fs.inner.file_count());
    let first_after_reopen = engine.file_span(LogQueue::Append).0;
    assert_eq!(oldest_tracked_append(), first_after_reopen);

    // Inject tracked sequence numbers below the current span; another reopen
    // is expected to get rid of them.
    fs.append_metadata
        .lock()
        .unwrap()
        .extend(first_after_reopen / 2..first_after_reopen);
    let engine = engine.reopen();
    let first_after_cleanup = engine.file_span(LogQueue::Append).0;
    assert_eq!(oldest_tracked_append(), first_after_cleanup);
}
/// With recycling and prefill enabled, the smallest tracked reserved and
/// append sequence numbers advance as files are consumed, and a reopen keeps
/// the physical file count unchanged.
#[test]
fn test_managed_file_reuse() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_managed_file_reuse")
        .tempdir()
        .unwrap();
    let payload = vec![b'x'; 16];
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        target_file_size: ReadableSize(1),
        purge_threshold: ReadableSize(50),
        format_version: Version::V2,
        enable_log_recycle: true,
        prefill_for_recycle: true,
        ..Default::default()
    };
    let fs = Arc::new(DeleteMonitoredFileSystem::new());
    // Smallest sequence numbers currently tracked by the monitor.
    let oldest_reserved = || *fs.reserved_metadata.lock().unwrap().first().unwrap();
    let oldest_append = || *fs.append_metadata.lock().unwrap().first().unwrap();

    let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
    let reserved_at_start = oldest_reserved();

    for region in 1..=10 {
        engine.append(region, 1, 11, Some(&payload));
    }
    for region in 1..=10 {
        engine.clean(region);
    }
    engine.purge_manager.must_rewrite_append_queue(None, None);
    assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
    // The oldest reserved file has moved forward.
    let reserved_after_purge = oldest_reserved();
    assert!(reserved_at_start < reserved_after_purge);

    // More writes move it forward again.
    for region in 1..=5 {
        engine.append(region, 1, 11, Some(&payload));
    }
    let reserved_after_writes = oldest_reserved();
    assert!(reserved_after_purge < reserved_after_writes);

    // Reopen: same number of physical files, newer oldest append file, and
    // an unchanged oldest reserved file.
    let files_before_reopen = fs.inner.file_count();
    let append_before_reopen = oldest_append();
    let engine = engine.reopen();
    assert_eq!(files_before_reopen, fs.inner.file_count());
    let append_after_reopen = oldest_append();
    assert!(append_before_reopen < append_after_reopen);
    let reserved_after_reopen = oldest_reserved();
    assert_eq!(reserved_after_writes, reserved_after_reopen);

    // Enough writes to leave no reserved file tracked.
    for region in 1..=50 {
        engine.append(region, 1, 11, Some(&payload));
    }
    assert!(fs.reserved_metadata.lock().unwrap().is_empty());

    for region in 1..=50 {
        engine.clean(region);
    }
    engine.purge_manager.must_rewrite_append_queue(None, None);
    engine.append(1, 1, 11, Some(&payload));
    assert_eq!(engine.file_count(Some(LogQueue::Append)), 2);
    let append_at_end = oldest_append();
    assert!(append_after_reopen < append_at_end);
}
/// A single append changes the populating, write and apply durations
/// reported by `get_perf_context`.
#[test]
fn test_simple_write_perf_context() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_simple_write_perf_context")
        .tempdir()
        .unwrap();
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let region = 1;
    let entry_size = 5120;
    let engine = RaftLogEngine::open(cfg).unwrap();
    let payload = vec![b'x'; entry_size];

    // Snapshot the counters around one write.
    let before = get_perf_context();
    engine.append(region, 1, 5, Some(&payload));
    let after = get_perf_context();

    assert_ne!(
        before.log_populating_duration,
        after.log_populating_duration
    );
    assert_ne!(before.log_write_duration, after.log_write_duration);
    assert_ne!(before.apply_duration, after.apply_duration);
}
/// Alternates between a V1 configuration without recycling and a V2
/// configuration with recycling on the same directory, checking append file
/// spans and counts after each switch and purge.
#[test]
fn test_recycle_no_signing_files() {
let dir = tempfile::Builder::new()
.prefix("test_recycle_no_signing_files")
.tempdir()
.unwrap();
let entry_data = vec![b'x'; 128];
let fs = Arc::new(DeleteMonitoredFileSystem::new());
// V1 format, recycling disabled.
let cfg_v1 = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(1024),
format_version: Version::V1,
enable_log_recycle: false,
..Default::default()
};
// V2 format, recycling enabled but without prefilled files.
let cfg_v2 = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(15),
format_version: Version::V2,
enable_log_recycle: true,
prefill_for_recycle: false,
..Default::default()
};
// Precondition: the V2 configuration really does recycle files.
assert!(cfg_v2.recycle_capacity() > 0);
// Step 1: write regions 1..=10 with the V1 configuration.
{
let engine = RaftLogEngine::open_with_file_system(cfg_v1.clone(), fs.clone()).unwrap();
for rid in 1..=10 {
engine.append(rid, 1, 11, Some(&entry_data));
}
}
// Step 2: reopen with V2, write more, clean one region and purge.
{
let engine = RaftLogEngine::open_with_file_system(cfg_v2.clone(), fs.clone()).unwrap();
let (start, _) = engine.file_span(LogQueue::Append);
for rid in 6..=10 {
engine.append(rid, 11, 20, Some(&entry_data));
}
engine.clean(6);
engine.purge_expired_files().unwrap();
// Five append files remain and the span has moved forward.
assert_eq!(engine.file_count(Some(LogQueue::Append)), 5);
assert!(start < engine.file_span(LogQueue::Append).0);
}
// Step 3: write with V1 again, then switch to V2 once more and purge
// everything.
{
let engine = RaftLogEngine::open_with_file_system(cfg_v1, fs.clone()).unwrap();
let (start, _) = engine.file_span(LogQueue::Append);
for rid in 6..=10 {
engine.append(rid, 20, 30, Some(&entry_data));
}
for rid in 6..=10 {
engine.append(rid, 30, 40, Some(&entry_data));
}
for rid in 1..=5 {
engine.append(rid, 11, 20, Some(&entry_data));
}
// Writing alone does not move the start of the append span.
assert_eq!(engine.file_span(LogQueue::Append).0, start);
let file_count = engine.file_count(Some(LogQueue::Append));
drop(engine);
// Reopening with V2 keeps both the span start and the file count.
let engine = RaftLogEngine::open_with_file_system(cfg_v2, fs).unwrap();
assert_eq!(engine.file_span(LogQueue::Append).0, start);
assert_eq!(engine.file_count(Some(LogQueue::Append)), file_count);
for rid in 1..=10 {
engine.clean(rid);
}
let (start, _) = engine.file_span(LogQueue::Append);
engine.purge_expired_files().unwrap();
// With every region cleaned, a single append file is left.
assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
assert!(engine.file_span(LogQueue::Append).0 > start);
}
}
/// Restarts the engine on one directory with four successive configurations
/// (no recycling, recycling, recycling with a smaller purge threshold, no
/// recycling again) and checks how many surplus physical files exist.
///
/// "Surplus" below means `file_system.inner.file_count()` minus
/// `engine.file_count(None)`, i.e. files on disk that the engine does not
/// count as part of its queues.
#[test]
fn test_start_engine_with_resize_recycle_capacity() {
let dir = tempfile::Builder::new()
.prefix("test_start_engine_with_resize_recycle_capacity")
.tempdir()
.unwrap();
let path = dir.path().to_str().unwrap();
let file_system = Arc::new(DeleteMonitoredFileSystem::new());
let entry_data = vec![b'x'; 512];
// Case 1: recycling disabled.
let cfg = Config {
dir: path.to_owned(),
enable_log_recycle: false,
..Default::default()
};
let engine = RaftLogEngine::open_with_file_system(cfg, file_system.clone()).unwrap();
let (start, _) = engine.file_span(LogQueue::Append);
// A single append file and no surplus files.
assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
assert_eq!(file_system.inner.file_count(), engine.file_count(None));
for rid in 1..=5 {
engine.append(rid, 1, 10, Some(&entry_data));
}
assert_eq!(engine.file_span(LogQueue::Append).0, start);
assert_eq!(file_system.inner.file_count(), engine.file_count(None));
drop(engine);
// Case 2: recycling with prefill enabled.
let cfg = Config {
dir: path.to_owned(),
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(80), enable_log_recycle: true,
prefill_for_recycle: true,
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
let (start, end) = engine.file_span(LogQueue::Append);
// Still exactly one append file ...
assert_eq!(start, end);
// ... but surplus files now exist on disk.
let recycled_count = file_system.inner.file_count() - engine.file_count(None);
assert!(recycled_count > 0);
for rid in 1..=5 {
engine.append(rid, 10, 20, Some(&entry_data));
}
assert_eq!(engine.file_span(LogQueue::Append).0, start);
// Writing has reduced the number of surplus files.
assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
let (start, end) = engine.file_span(LogQueue::Append);
let recycled_count = file_system.inner.file_count() - engine.file_count(None);
drop(engine);
// Case 3: same as case 2 but with a smaller purge threshold.
let cfg_v2 = Config {
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(50),
..cfg
};
let engine =
RaftLogEngine::open_with_file_system(cfg_v2.clone(), file_system.clone()).unwrap();
assert_eq!(engine.file_span(LogQueue::Append), (start, end));
// Fewer surplus files than before the restart.
assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
// A purge leaves the append span untouched here.
engine.purge_expired_files().unwrap();
assert_eq!(engine.file_span(LogQueue::Append), (start, end));
for rid in 1..=10 {
engine.append(rid, 20, 31, Some(&entry_data));
}
assert!(engine.file_span(LogQueue::Append).1 > end);
let engine = engine.reopen();
assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
drop(engine);
// Case 4: recycling disabled again; no surplus files may remain.
let cfg_v3 = Config {
target_file_size: ReadableSize::kb(2),
purge_threshold: ReadableSize::kb(100),
enable_log_recycle: false,
prefill_for_recycle: false,
..cfg_v2
};
let engine = RaftLogEngine::open_with_file_system(cfg_v3, file_system.clone()).unwrap();
assert_eq!(file_system.inner.file_count(), engine.file_count(None));
}
/// Writes batches tagged through `AtomicGroupBuilder` straight into the
/// rewrite queue in several begin/add/end patterns, then reopens the engine
/// and checks that exactly the regions recorded in `data` were recovered.
///
/// Regions written without a matching `data.insert(rid)` are the ones the
/// test expects to be missing after recovery.
#[test]
fn test_rewrite_atomic_group() {
let dir = tempfile::Builder::new()
.prefix("test_rewrite_atomic_group")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
recovery_threads: 100,
target_file_size: ReadableSize(1),
..Default::default()
};
let fs = Arc::new(ObfuscatedFileSystem::default());
let key = vec![b'x'; 2];
let value = vec![b'y'; 8];
let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
// Regions expected to be present after the final reopen.
let mut data = HashSet::new();
// Each written batch uses a fresh region id.
let mut rid = 1;
let mut log_batch = LogBatch::default();
// Appends the batch to the rewrite queue, bypassing `Engine::write`, and
// then empties it so it can be reused.
let flush = |lb: &mut LogBatch| {
lb.finish_populate(0, None).unwrap();
engine.pipe_log.append(LogQueue::Rewrite, lb).unwrap();
lb.drain();
};
// Pattern: begin only (never ended). Region not expected.
{
let mut builder = AtomicGroupBuilder::with_id(3);
builder.begin(&mut log_batch);
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
// Pattern: begin, untagged batch, end. All three regions expected.
{
let mut builder = AtomicGroupBuilder::with_id(3);
builder.begin(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
// This batch is written without any builder call.
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
builder.end(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
// Pattern: begin, add, add, end. All four regions expected.
{
let mut builder = AtomicGroupBuilder::with_id(3);
builder.begin(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
builder.add(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
builder.add(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
builder.end(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
// Pattern: begin, begin, end. The first begin's region is not expected;
// the second begin and the end are.
{
let mut builder = AtomicGroupBuilder::with_id(3);
builder.begin(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
let mut builder = AtomicGroupBuilder::with_id(3);
builder.begin(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
builder.end(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
// Pattern: end, add, end under a new group id. The `begin` calls go to a
// throwaway batch, so no begin is ever written. No region expected.
{
let mut builder = AtomicGroupBuilder::with_id(4);
builder.begin(&mut LogBatch::default());
builder.end(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
let mut builder = AtomicGroupBuilder::with_id(4);
builder.begin(&mut LogBatch::default());
builder.add(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
builder.end(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
// Pattern: end (begin discarded), then begin, end. The leading end's
// region is not expected; the complete pair is.
{
let mut builder = AtomicGroupBuilder::with_id(5);
builder.begin(&mut LogBatch::default());
builder.end(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
let mut builder = AtomicGroupBuilder::with_id(5);
builder.begin(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
builder.end(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
// Pattern: two complete begin/end pairs with different ids, where one end
// batch and one begin batch are flushed without any put. Both regions
// that were written are expected.
{
let mut builder = AtomicGroupBuilder::with_id(6);
builder.begin(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
builder.end(&mut log_batch);
flush(&mut log_batch);
let mut builder = AtomicGroupBuilder::with_id(7);
builder.begin(&mut log_batch);
flush(&mut log_batch);
builder.end(&mut log_batch);
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
engine.pipe_log.sync(LogQueue::Rewrite).unwrap();
let engine = engine.reopen();
// Every recovered region must have been expected (and hold the value) ...
for rid in engine.raft_groups() {
assert!(data.remove(&rid), "{}", rid);
assert_eq!(engine.get(rid, &key).unwrap(), value);
}
// ... and every expected region must have been recovered.
assert!(data.is_empty(), "data loss {:?}", data);
}
/// Regions that only hold internal keys are never listed by `raft_groups`,
/// whether the keys arrive through a normal write or through the rewrite
/// queue, and neither before nor after a reopen.
#[test]
fn test_internal_key_filter() {
    let tmp_dir = tempfile::Builder::new()
        .prefix("test_internal_key_filter")
        .tempdir()
        .unwrap();
    let cfg = Config {
        dir: tmp_dir.path().to_str().unwrap().to_owned(),
        ..Default::default()
    };
    let file_system = Arc::new(ObfuscatedFileSystem::default());
    let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap();
    let value = vec![b'y'; 8];
    let mut batch = LogBatch::default();

    // Path 1: internal keys for regions 1 and 2 through `Engine::write`.
    for region in [1, 2] {
        batch.put_unchecked(region, crate::make_internal_key(&[1]), value.clone());
    }
    engine.write(&mut batch, false).unwrap();
    assert!(engine.raft_groups().is_empty());
    let engine = engine.reopen();
    assert!(engine.raft_groups().is_empty());

    // Path 2: internal keys for regions 3 and 4 appended directly to the
    // rewrite queue and applied to the memtables by hand.
    for region in [3, 4] {
        batch.put_unchecked(region, crate::make_internal_key(&[1]), value.clone());
    }
    batch.finish_populate(0, None).unwrap();
    let handle = engine
        .pipe_log
        .append(LogQueue::Rewrite, &mut batch)
        .unwrap();
    batch.finish_write(handle);
    engine
        .memtables
        .apply_rewrite_writes(batch.drain(), None, 0);
    assert!(engine.raft_groups().is_empty());
    let engine = engine.reopen();
    assert!(engine.raft_groups().is_empty());
}
/// Exercises an engine configured with both a main directory and a spill
/// directory: append-queue files are moved into the spill directory by hand,
/// the engine is reopened under several configurations, and file counts,
/// first indexes and the append-queue file span are checked along the way.
#[test]
fn test_start_engine_with_multi_dirs() {
    /// Tells whether `path` has a file name that parses as an append-queue
    /// log file.
    fn is_append_log(path: &Path) -> bool {
        let name = path.file_name().unwrap().to_str().unwrap();
        matches!(
            FileId::parse_file_name(name),
            Some(FileId {
                queue: LogQueue::Append,
                ..
            })
        )
    }

    /// Counts the entries directly under `p` whose file name starts with
    /// "000".
    fn number_of_files(p: &Path) -> usize {
        std::fs::read_dir(p)
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .filter(|path| {
                path.file_name()
                    .unwrap()
                    .to_str()
                    .unwrap()
                    .starts_with("000")
            })
            .count()
    }

    let main_dir = tempfile::Builder::new()
        .prefix("test_start_engine_with_multi_dirs_default")
        .tempdir()
        .unwrap();
    let spill_dir = tempfile::Builder::new()
        .prefix("test_start_engine_with_multi_dirs_spill")
        .tempdir()
        .unwrap();
    let file_system = Arc::new(DeleteMonitoredFileSystem::new());
    let entry_data = vec![b'x'; 512];
    // NOTE(review): `target_file_size` of 1 presumably makes the engine
    // rotate log files very frequently — confirm.
    let base_cfg = Config {
        dir: main_dir.path().to_str().unwrap().to_owned(),
        spill_dir: Some(spill_dir.path().to_str().unwrap().to_owned()),
        enable_log_recycle: false,
        target_file_size: ReadableSize(1),
        ..Default::default()
    };

    // Phase 1: write some data, close the engine, then relocate up to four
    // append-queue files from the main directory into the spill directory.
    {
        let engine =
            RaftLogEngine::open_with_file_system(base_cfg.clone(), file_system.clone()).unwrap();
        for rid in 1..=10 {
            engine.append(rid, 1, 10, Some(&entry_data));
        }
        drop(engine);

        let relocated = std::fs::read_dir(main_dir.path())
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .filter(|path| is_append_log(path))
            .take(4);
        for src in relocated {
            let dst = spill_dir.path().join(src.file_name().unwrap());
            file_system.rename(&src, &dst).unwrap();
        }
    }

    // Phase 2: reopen with log recycling and prefilling enabled.
    let recycle_cfg = Config {
        enable_log_recycle: true,
        prefill_for_recycle: true,
        purge_threshold: ReadableSize(40),
        ..base_cfg.clone()
    };
    let engine = RaftLogEngine::open_with_file_system(recycle_cfg, file_system.clone()).unwrap();
    assert!(number_of_files(spill_dir.path()) > 0);
    for rid in 1..=10 {
        assert_eq!(engine.first_index(rid).unwrap(), 1);
        engine.clean(rid);
    }
    engine.purge_manager.must_rewrite_append_queue(None, None);

    // The monitored file system's count must agree with what is on disk
    // across both directories, and exceed the engine's own file count.
    let total_files = file_system.inner.file_count();
    let on_disk = number_of_files(spill_dir.path()) + number_of_files(main_dir.path());
    assert_eq!(on_disk, total_files);
    assert!(total_files > engine.file_count(None));

    // Writing more data must leave the monitored file count unchanged.
    for rid in 1..=30 {
        engine.append(rid, 20, 30, Some(&entry_data));
    }
    assert_eq!(total_files, file_system.inner.file_count());
    assert!(number_of_files(spill_dir.path()) > 0);

    // Phase 3: reopen once more with recycling turned off.
    drop(engine);
    let no_recycle_cfg = Config {
        enable_log_recycle: false,
        purge_threshold: ReadableSize(40),
        ..base_cfg
    };
    let engine = RaftLogEngine::open_with_file_system(no_recycle_cfg, file_system).unwrap();
    assert!(number_of_files(spill_dir.path()) > 0);
    for rid in 1..=10 {
        assert_eq!(engine.first_index(rid).unwrap(), 20);
    }

    // Phase 4: copy every other append-queue file from the spill directory
    // back into the main directory, so those files exist in both places.
    let spilled_logs = std::fs::read_dir(spill_dir.path())
        .unwrap()
        .map(|entry| entry.unwrap().path())
        .filter(|path| is_append_log(path));
    for (idx, src) in spilled_logs.enumerate() {
        if idx % 2 == 0 {
            std::fs::copy(&src, main_dir.path().join(src.file_name().unwrap())).unwrap();
        }
    }

    // After a reopen the first append-queue file sequence must have advanced.
    let first_seq_before = engine.file_span(LogQueue::Append).0;
    let engine = engine.reopen();
    assert!(engine.file_span(LogQueue::Append).0 > first_seq_before);
}
}