1use std::cell::{Cell, RefCell};
4use std::marker::PhantomData;
5use std::path::Path;
6use std::sync::{mpsc, Arc, Mutex};
7use std::thread::{Builder as ThreadBuilder, JoinHandle};
8use std::time::{Duration, Instant};
9
10use log::{error, info};
11use protobuf::{parse_from_bytes, Message};
12
13use crate::config::{Config, RecoveryMode};
14use crate::consistency::ConsistencyChecker;
15use crate::env::{DefaultFileSystem, FileSystem};
16use crate::event_listener::EventListener;
17use crate::file_pipe_log::debug::LogItemReader;
18use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder};
19use crate::log_batch::{Command, LogBatch, MessageExt};
20use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables};
21use crate::metrics::*;
22use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
23use crate::purge::{PurgeHook, PurgeManager};
24use crate::write_barrier::{WriteBarrier, Writer};
25use crate::{perf_context, Error, GlobalStats, Result};
26
27const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);
28const MAX_WRITE_ATTEMPT: u64 = 2;
30
31pub struct Engine<F = DefaultFileSystem, P = FilePipeLog<F>>
32where
33 F: FileSystem,
34 P: PipeLog,
35{
36 cfg: Arc<Config>,
37 listeners: Vec<Arc<dyn EventListener>>,
38
39 #[allow(dead_code)]
40 stats: Arc<GlobalStats>,
41 memtables: MemTables,
42 pipe_log: Arc<P>,
43 purge_manager: PurgeManager<P>,
44
45 write_barrier: WriteBarrier<LogBatch, Result<FileBlockHandle>>,
46
47 tx: Mutex<mpsc::Sender<()>>,
48 metrics_flusher: Option<JoinHandle<()>>,
49
50 _phantom: PhantomData<F>,
51}
52
53impl Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>> {
54 pub fn open(cfg: Config) -> Result<Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>>> {
55 Self::open_with_listeners(cfg, vec![])
56 }
57
58 pub fn open_with_listeners(
59 cfg: Config,
60 listeners: Vec<Arc<dyn EventListener>>,
61 ) -> Result<Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>>> {
62 Self::open_with(cfg, Arc::new(DefaultFileSystem), listeners)
63 }
64}
65
66impl<F> Engine<F, FilePipeLog<F>>
67where
68 F: FileSystem,
69{
70 pub fn open_with_file_system(
71 cfg: Config,
72 file_system: Arc<F>,
73 ) -> Result<Engine<F, FilePipeLog<F>>> {
74 Self::open_with(cfg, file_system, vec![])
75 }
76
77 pub fn open_with(
78 mut cfg: Config,
79 file_system: Arc<F>,
80 mut listeners: Vec<Arc<dyn EventListener>>,
81 ) -> Result<Engine<F, FilePipeLog<F>>> {
82 cfg.sanitize()?;
83 listeners.push(Arc::new(PurgeHook::default()) as Arc<dyn EventListener>);
84
85 let start = Instant::now();
86 let mut builder = FilePipeLogBuilder::new(cfg.clone(), file_system, listeners.clone());
87 builder.scan()?;
88 let factory = MemTableRecoverContextFactory::new(&cfg);
89 let (append, rewrite) = builder.recover(&factory)?;
90 let pipe_log = Arc::new(builder.finish()?);
91 rewrite.merge_append_context(append);
92 let (memtables, stats) = rewrite.finish();
93 info!("Recovering raft logs takes {:?}", start.elapsed());
94
95 let cfg = Arc::new(cfg);
96 let purge_manager = PurgeManager::new(
97 cfg.clone(),
98 memtables.clone(),
99 pipe_log.clone(),
100 stats.clone(),
101 listeners.clone(),
102 );
103
104 let (tx, rx) = mpsc::channel();
105 let stats_clone = stats.clone();
106 let memtables_clone = memtables.clone();
107 let metrics_flusher = ThreadBuilder::new()
108 .name("re-metrics".into())
109 .spawn(move || loop {
110 stats_clone.flush_metrics();
111 memtables_clone.flush_metrics();
112 if rx.recv_timeout(METRICS_FLUSH_INTERVAL).is_ok() {
113 break;
114 }
115 })?;
116
117 Ok(Self {
118 cfg,
119 listeners,
120 stats,
121 memtables,
122 pipe_log,
123 purge_manager,
124 write_barrier: Default::default(),
125 tx: Mutex::new(tx),
126 metrics_flusher: Some(metrics_flusher),
127 _phantom: PhantomData,
128 })
129 }
130}
131
132impl<F, P> Engine<F, P>
133where
134 F: FileSystem,
135 P: PipeLog,
136{
137 pub fn write(&self, log_batch: &mut LogBatch, mut sync: bool) -> Result<usize> {
141 if log_batch.is_empty() {
142 return Ok(0);
143 }
144 let start = Instant::now();
145 let len = log_batch.finish_populate(
146 self.cfg.batch_compression_threshold.0 as usize,
147 self.cfg.compression_level,
148 )?;
149 debug_assert!(len > 0);
150
151 let mut attempt_count = 0_u64;
152 let block_handle = loop {
153 attempt_count += 1;
158 let mut writer = Writer::new(log_batch, sync);
159 let mut perf_context = take_perf_context();
162 let before_enter = Instant::now();
163 if let Some(mut group) = self.write_barrier.enter(&mut writer) {
164 let now = Instant::now();
165 let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
166 for writer in group.iter_mut() {
167 writer.entered_time = Some(now);
168 sync |= writer.sync;
169 let log_batch = writer.mut_payload();
170 let res = self.pipe_log.append(LogQueue::Append, log_batch);
171 writer.set_output(res);
172 }
173 perf_context!(log_write_duration).observe_since(now);
174 if sync {
175 self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
178 }
179 let diff = get_perf_context();
181 for writer in group.iter_mut() {
182 writer.perf_context_diff = diff.clone();
183 }
184 }
185 let entered_time = writer.entered_time.unwrap();
186 perf_context.write_wait_duration +=
187 entered_time.saturating_duration_since(before_enter);
188 debug_assert_eq!(writer.perf_context_diff.write_wait_duration, Duration::ZERO);
189 perf_context += &writer.perf_context_diff;
190 set_perf_context(perf_context);
191 match writer.finish() {
194 Ok(handle) => {
195 ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM
196 .observe(entered_time.saturating_duration_since(start).as_secs_f64());
197 break handle;
198 }
199 Err(Error::TryAgain(e)) => {
200 if attempt_count >= MAX_WRITE_ATTEMPT {
201 return Err(Error::TryAgain(format!(
205 "Failed to write logbatch, exceed MAX_WRITE_ATTEMPT: ({MAX_WRITE_ATTEMPT}), err: {e}",
206 )));
207 }
208 info!("got err: {e}, try to write this LogBatch again");
209 }
210 Err(e) => {
211 return Err(e);
212 }
213 }
214 };
215 let mut now = Instant::now();
216 log_batch.finish_write(block_handle);
217 self.memtables.apply_append_writes(log_batch.drain());
218 for listener in &self.listeners {
219 listener.post_apply_memtables(block_handle.id);
220 }
221 let end = Instant::now();
222 let apply_duration = end.saturating_duration_since(now);
223 ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
224 perf_context!(apply_duration).observe(apply_duration);
225 now = end;
226 ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
227 ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
228 Ok(len)
229 }
230
231 pub fn sync(&self) -> Result<()> {
233 self.write(&mut LogBatch::default(), true)?;
234 Ok(())
235 }
236
237 pub fn get_message<S: Message>(&self, region_id: u64, key: &[u8]) -> Result<Option<S>> {
238 let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
239 if let Some(memtable) = self.memtables.get(region_id) {
240 if let Some(value) = memtable.read().get(key) {
241 return Ok(Some(parse_from_bytes(&value)?));
242 }
243 }
244 Ok(None)
245 }
246
247 pub fn get(&self, region_id: u64, key: &[u8]) -> Option<Vec<u8>> {
248 let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
249 if let Some(memtable) = self.memtables.get(region_id) {
250 return memtable.read().get(key);
251 }
252 None
253 }
254
255 pub fn scan_messages<S, C>(
258 &self,
259 region_id: u64,
260 start_key: Option<&[u8]>,
261 end_key: Option<&[u8]>,
262 reverse: bool,
263 mut callback: C,
264 ) -> Result<()>
265 where
266 S: Message,
267 C: FnMut(&[u8], S) -> bool,
268 {
269 self.scan_raw_messages(region_id, start_key, end_key, reverse, move |k, raw_v| {
270 if let Ok(v) = parse_from_bytes(raw_v) {
271 callback(k, v)
272 } else {
273 true
274 }
275 })
276 }
277
278 pub fn scan_raw_messages<C>(
281 &self,
282 region_id: u64,
283 start_key: Option<&[u8]>,
284 end_key: Option<&[u8]>,
285 reverse: bool,
286 callback: C,
287 ) -> Result<()>
288 where
289 C: FnMut(&[u8], &[u8]) -> bool,
290 {
291 let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
292 if let Some(memtable) = self.memtables.get(region_id) {
293 memtable
294 .read()
295 .scan(start_key, end_key, reverse, callback)?;
296 }
297 Ok(())
298 }
299
300 pub fn get_entry<M: MessageExt>(
301 &self,
302 region_id: u64,
303 log_idx: u64,
304 ) -> Result<Option<M::Entry>> {
305 let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
306 if let Some(memtable) = self.memtables.get(region_id) {
307 if let Some(idx) = memtable.read().get_entry(log_idx) {
308 ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(1.0);
309 return Ok(Some(read_entry_from_file::<M, _>(
310 self.pipe_log.as_ref(),
311 &idx,
312 )?));
313 }
314 }
315 Ok(None)
316 }
317
318 pub fn purge_expired_files(&self) -> Result<Vec<u64>> {
321 self.purge_manager.purge_expired_files()
322 }
323
324 pub fn fetch_entries_to<M: MessageExt>(
326 &self,
327 region_id: u64,
328 begin: u64,
329 end: u64,
330 max_size: Option<usize>,
331 vec: &mut Vec<M::Entry>,
332 ) -> Result<usize> {
333 let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
334 if let Some(memtable) = self.memtables.get(region_id) {
335 let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity((end - begin) as usize);
336 memtable
337 .read()
338 .fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
339 for i in ents_idx.iter() {
340 vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
341 }
342 ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
343 return Ok(ents_idx.len());
344 }
345 Ok(0)
346 }
347
348 pub fn first_index(&self, region_id: u64) -> Option<u64> {
349 if let Some(memtable) = self.memtables.get(region_id) {
350 return memtable.read().first_index();
351 }
352 None
353 }
354
355 pub fn last_index(&self, region_id: u64) -> Option<u64> {
356 if let Some(memtable) = self.memtables.get(region_id) {
357 return memtable.read().last_index();
358 }
359 None
360 }
361
362 pub fn compact_to(&self, region_id: u64, index: u64) -> u64 {
365 let first_index = match self.first_index(region_id) {
366 Some(index) => index,
367 None => return 0,
368 };
369
370 let mut log_batch = LogBatch::default();
371 log_batch.add_command(region_id, Command::Compact { index });
372 if let Err(e) = self.write(&mut log_batch, false) {
373 error!("Failed to write Compact command: {e}");
374 }
375
376 self.first_index(region_id).unwrap_or(index) - first_index
377 }
378
379 pub fn raft_groups(&self) -> Vec<u64> {
380 self.memtables.fold(vec![], |mut v, m| {
381 v.push(m.region_id());
382 v
383 })
384 }
385
386 pub fn is_empty(&self) -> bool {
389 self.memtables.is_empty()
390 }
391
392 pub fn file_span(&self, queue: LogQueue) -> (u64, u64) {
396 self.pipe_log.file_span(queue)
397 }
398
399 pub fn get_used_size(&self) -> usize {
400 self.pipe_log.total_size(LogQueue::Append) + self.pipe_log.total_size(LogQueue::Rewrite)
401 }
402
403 pub fn path(&self) -> &str {
404 self.cfg.dir.as_str()
405 }
406
407 #[cfg(feature = "internals")]
408 pub fn purge_manager(&self) -> &PurgeManager<P> {
409 &self.purge_manager
410 }
411}
412
413impl<F, P> Drop for Engine<F, P>
414where
415 F: FileSystem,
416 P: PipeLog,
417{
418 fn drop(&mut self) {
419 self.tx.lock().unwrap().send(()).unwrap();
420 if let Some(t) = self.metrics_flusher.take() {
421 t.join().unwrap();
422 }
423 }
424}
425
426impl Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>> {
427 pub fn consistency_check(path: &Path) -> Result<Vec<(u64, u64)>> {
428 Self::consistency_check_with_file_system(path, Arc::new(DefaultFileSystem))
429 }
430
431 #[cfg(feature = "scripting")]
432 pub fn unsafe_repair(path: &Path, queue: Option<LogQueue>, script: String) -> Result<()> {
433 Self::unsafe_repair_with_file_system(path, queue, script, Arc::new(DefaultFileSystem))
434 }
435
436 pub fn dump(path: &Path) -> Result<LogItemReader<DefaultFileSystem>> {
437 Self::dump_with_file_system(path, Arc::new(DefaultFileSystem))
438 }
439}
440
441impl<F> Engine<F, FilePipeLog<F>>
442where
443 F: FileSystem,
444{
445 pub fn consistency_check_with_file_system(
448 path: &Path,
449 file_system: Arc<F>,
450 ) -> Result<Vec<(u64, u64)>> {
451 if !path.exists() {
452 return Err(Error::InvalidArgument(format!(
453 "raft-engine directory '{}' does not exist.",
454 path.to_str().unwrap()
455 )));
456 }
457
458 let cfg = Config {
459 dir: path.to_str().unwrap().to_owned(),
460 recovery_mode: RecoveryMode::TolerateAnyCorruption,
461 ..Default::default()
462 };
463 let mut builder = FilePipeLogBuilder::new(cfg, file_system, Vec::new());
464 builder.scan()?;
465 let (append, rewrite) =
466 builder.recover(&DefaultMachineFactory::<ConsistencyChecker>::default())?;
467 let mut map = rewrite.finish();
468 for (id, index) in append.finish() {
469 map.entry(id).or_insert(index);
470 }
471 let mut list: Vec<(u64, u64)> = map.into_iter().collect();
472 list.sort_unstable();
473 Ok(list)
474 }
475
476 #[cfg(feature = "scripting")]
477 pub fn unsafe_repair_with_file_system(
478 path: &Path,
479 queue: Option<LogQueue>,
480 script: String,
481 file_system: Arc<F>,
482 ) -> Result<()> {
483 use crate::file_pipe_log::{RecoveryConfig, ReplayMachine};
484
485 if !path.exists() {
486 return Err(Error::InvalidArgument(format!(
487 "raft-engine directory '{}' does not exist.",
488 path.to_str().unwrap()
489 )));
490 }
491
492 let cfg = Config {
493 dir: path.to_str().unwrap().to_owned(),
494 recovery_mode: RecoveryMode::TolerateAnyCorruption,
495 ..Default::default()
496 };
497 let recovery_mode = cfg.recovery_mode;
498 let read_block_size = cfg.recovery_read_block_size.0;
499 let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new());
500 builder.scan()?;
501 let factory = crate::filter::RhaiFilterMachineFactory::from_script(script);
502 let mut machine = None;
503 if queue.is_none() || queue.unwrap() == LogQueue::Append {
504 machine = Some(builder.recover_queue(
505 file_system.clone(),
506 RecoveryConfig {
507 queue: LogQueue::Append,
508 mode: recovery_mode,
509 concurrency: 1,
510 read_block_size,
511 },
512 &factory,
513 )?);
514 }
515 if queue.is_none() || queue.unwrap() == LogQueue::Rewrite {
516 let machine2 = builder.recover_queue(
517 file_system.clone(),
518 RecoveryConfig {
519 queue: LogQueue::Rewrite,
520 mode: recovery_mode,
521 concurrency: 1,
522 read_block_size,
523 },
524 &factory,
525 )?;
526 if let Some(machine) = &mut machine {
527 machine.merge(machine2, LogQueue::Rewrite)?;
528 }
529 }
530 if let Some(machine) = machine {
531 machine.finish(file_system.as_ref(), path)?;
532 }
533 Ok(())
534 }
535
536 pub fn dump_with_file_system(path: &Path, file_system: Arc<F>) -> Result<LogItemReader<F>> {
538 if !path.exists() {
539 return Err(Error::InvalidArgument(format!(
540 "raft-engine directory or file '{}' does not exist.",
541 path.to_str().unwrap()
542 )));
543 }
544
545 if path.is_dir() {
546 LogItemReader::new_directory_reader(file_system, path)
547 } else {
548 LogItemReader::new_file_reader(file_system, path)
549 }
550 }
551}
552
553struct BlockCache {
554 key: Cell<FileBlockHandle>,
555 block: RefCell<Vec<u8>>,
556}
557
558impl BlockCache {
559 fn new() -> Self {
560 BlockCache {
561 key: Cell::new(FileBlockHandle {
562 id: FileId::new(LogQueue::Append, 0),
563 offset: 0,
564 len: 0,
565 }),
566 block: RefCell::new(Vec::new()),
567 }
568 }
569
570 fn insert(&self, key: FileBlockHandle, block: Vec<u8>) {
571 self.key.set(key);
572 self.block.replace(block);
573 }
574}
575
576thread_local! {
577 static BLOCK_CACHE: BlockCache = BlockCache::new();
578}
579
580pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
581where
582 M: MessageExt,
583 P: PipeLog,
584{
585 BLOCK_CACHE.with(|cache| {
586 if cache.key.get() != idx.entries.unwrap() {
587 cache.insert(
588 idx.entries.unwrap(),
589 LogBatch::decode_entries_block(
590 &pipe_log.read_bytes(idx.entries.unwrap())?,
591 idx.entries.unwrap(),
592 idx.compression_type,
593 )?,
594 );
595 }
596 let e = parse_from_bytes(
597 &cache.block.borrow()
598 [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
599 )?;
600 assert_eq!(M::index(&e), idx.index);
601 Ok(e)
602 })
603}
604
605pub(crate) fn read_entry_bytes_from_file<P>(pipe_log: &P, idx: &EntryIndex) -> Result<Vec<u8>>
606where
607 P: PipeLog,
608{
609 BLOCK_CACHE.with(|cache| {
610 if cache.key.get() != idx.entries.unwrap() {
611 cache.insert(
612 idx.entries.unwrap(),
613 LogBatch::decode_entries_block(
614 &pipe_log.read_bytes(idx.entries.unwrap())?,
615 idx.entries.unwrap(),
616 idx.compression_type,
617 )?,
618 );
619 }
620 Ok(cache.block.borrow()
621 [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize]
622 .to_owned())
623 })
624}
625
626#[cfg(test)]
627pub(crate) mod tests {
628 use super::*;
629 use crate::env::{ObfuscatedFileSystem, Permission};
630 use crate::file_pipe_log::{parse_reserved_file_name, FileNameExt};
631 use crate::log_batch::AtomicGroupBuilder;
632 use crate::pipe_log::Version;
633 use crate::test_util::{generate_entries, PanicGuard};
634 use crate::util::ReadableSize;
635 use kvproto::raft_serverpb::RaftLocalState;
636 use raft::eraftpb::Entry;
637 use std::collections::{BTreeSet, HashSet};
638 use std::fs::OpenOptions;
639 use std::path::PathBuf;
640
641 pub(crate) type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;
642 impl<F: FileSystem> RaftLogEngine<F> {
643 fn append(&self, rid: u64, start_index: u64, end_index: u64, data: Option<&[u8]>) {
644 let entries = generate_entries(start_index, end_index, data);
645 if !entries.is_empty() {
646 let mut batch = LogBatch::default();
647 batch.add_entries::<Entry>(rid, &entries).unwrap();
648 batch
649 .put_message(
650 rid,
651 b"last_index".to_vec(),
652 &RaftLocalState {
653 last_index: entries[entries.len() - 1].index,
654 ..Default::default()
655 },
656 )
657 .unwrap();
658 self.write(&mut batch, true).unwrap();
659 }
660 }
661
662 fn clean(&self, rid: u64) {
663 let mut log_batch = LogBatch::default();
664 log_batch.add_command(rid, Command::Clean);
665 self.write(&mut log_batch, true).unwrap();
666 }
667
668 fn decode_last_index(&self, rid: u64) -> Option<u64> {
669 self.get_message::<RaftLocalState>(rid, b"last_index")
670 .unwrap()
671 .map(|s| s.last_index)
672 }
673
674 fn reopen(self) -> Self {
675 let cfg: Config = self.cfg.as_ref().clone();
676 let file_system = self.pipe_log.file_system();
677 let mut listeners = self.listeners.clone();
678 listeners.pop();
679 drop(self);
680 RaftLogEngine::open_with(cfg, file_system, listeners).unwrap()
681 }
682
683 fn scan_entries<FR: Fn(u64, LogQueue, &[u8])>(
684 &self,
685 rid: u64,
686 start: u64,
687 end: u64,
688 reader: FR,
689 ) {
690 let mut entries = Vec::new();
691 self.fetch_entries_to::<Entry>(
692 rid,
693 self.first_index(rid).unwrap(),
694 self.last_index(rid).unwrap() + 1,
695 None,
696 &mut entries,
697 )
698 .unwrap();
699 assert_eq!(entries.first().unwrap().index, start, "{rid}");
700 assert_eq!(entries.last().unwrap().index + 1, end);
701 assert_eq!(
702 entries.last().unwrap().index,
703 self.decode_last_index(rid).unwrap()
704 );
705 assert_eq!(entries.len(), (end - start) as usize);
706 for e in entries.iter() {
707 let entry_index = self
708 .memtables
709 .get(rid)
710 .unwrap()
711 .read()
712 .get_entry(e.index)
713 .unwrap();
714 assert_eq!(&self.get_entry::<Entry>(rid, e.index).unwrap().unwrap(), e);
715 reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
716 }
717 }
718
719 fn file_count(&self, queue: Option<LogQueue>) -> usize {
720 if let Some(queue) = queue {
721 let (a, b) = self.file_span(queue);
722 (b - a + 1) as usize
723 } else {
724 self.file_count(Some(LogQueue::Append)) + self.file_count(Some(LogQueue::Rewrite))
725 }
726 }
727 }
728
729 #[test]
730 fn test_empty_engine() {
731 let dir = tempfile::Builder::new()
732 .prefix("test_empty_engine")
733 .tempdir()
734 .unwrap();
735 let mut sub_dir = PathBuf::from(dir.as_ref());
736 sub_dir.push("raft-engine");
737 let cfg = Config {
738 dir: sub_dir.to_str().unwrap().to_owned(),
739 ..Default::default()
740 };
741 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
742 .unwrap();
743 }
744
745 #[test]
746 fn test_get_entry() {
747 let normal_batch_size = 10;
748 let compressed_batch_size = 5120;
749 for &entry_size in &[normal_batch_size, compressed_batch_size] {
750 let dir = tempfile::Builder::new()
751 .prefix("test_get_entry")
752 .tempdir()
753 .unwrap();
754 let cfg = Config {
755 dir: dir.path().to_str().unwrap().to_owned(),
756 target_file_size: ReadableSize(1),
757 ..Default::default()
758 };
759
760 let engine = RaftLogEngine::open_with_file_system(
761 cfg.clone(),
762 Arc::new(ObfuscatedFileSystem::default()),
763 )
764 .unwrap();
765 assert_eq!(engine.path(), dir.path().to_str().unwrap());
766 let data = vec![b'x'; entry_size];
767 for i in 10..20 {
768 let rid = i;
769 let index = i;
770 engine.append(rid, index, index + 2, Some(&data));
771 }
772 for i in 10..20 {
773 let rid = i;
774 let index = i;
775 engine.scan_entries(rid, index, index + 2, |_, q, d| {
776 assert_eq!(q, LogQueue::Append);
777 assert_eq!(d, &data);
778 });
779 }
780
781 let engine = engine.reopen();
783 for i in 10..20 {
784 let rid = i;
785 let index = i;
786 engine.scan_entries(rid, index, index + 2, |_, q, d| {
787 assert_eq!(q, LogQueue::Append);
788 assert_eq!(d, &data);
789 });
790 }
791 }
792 }
793
794 #[test]
795 fn test_clean_raft_group() {
796 fn run_steps(steps: &[Option<(u64, u64)>]) {
797 let rid = 1;
798 let data = vec![b'x'; 1024];
799
800 for rewrite_step in 1..=steps.len() {
801 for exit_purge in [None, Some(1), Some(2)] {
802 let _guard = PanicGuard::with_prompt(format!(
803 "case: [{steps:?}, {rewrite_step}, {exit_purge:?}]",
804 ));
805 let dir = tempfile::Builder::new()
806 .prefix("test_clean_raft_group")
807 .tempdir()
808 .unwrap();
809 let cfg = Config {
810 dir: dir.path().to_str().unwrap().to_owned(),
811 target_file_size: ReadableSize(1),
812 ..Default::default()
813 };
814 let engine = RaftLogEngine::open_with_file_system(
815 cfg.clone(),
816 Arc::new(ObfuscatedFileSystem::default()),
817 )
818 .unwrap();
819
820 for (i, step) in steps.iter().enumerate() {
821 if let Some((start, end)) = *step {
822 engine.append(rid, start, end, Some(&data));
823 } else {
824 engine.clean(rid);
825 }
826 if i + 1 == rewrite_step {
827 engine
828 .purge_manager
829 .must_rewrite_append_queue(None, exit_purge);
830 }
831 }
832
833 let engine = engine.reopen();
834 if let Some((start, end)) = *steps.last().unwrap() {
835 engine.scan_entries(rid, start, end, |_, _, d| {
836 assert_eq!(d, &data);
837 });
838 } else {
839 assert!(engine.raft_groups().is_empty());
840 }
841
842 engine.purge_manager.must_rewrite_append_queue(None, None);
843 let engine = engine.reopen();
844 if let Some((start, end)) = *steps.last().unwrap() {
845 engine.scan_entries(rid, start, end, |_, _, d| {
846 assert_eq!(d, &data);
847 });
848 } else {
849 assert!(engine.raft_groups().is_empty());
850 }
851 }
852 }
853 }
854
855 run_steps(&[Some((1, 5)), None, Some((2, 6)), None, Some((3, 7)), None]);
856 run_steps(&[Some((1, 5)), None, Some((2, 6)), None, Some((3, 7))]);
857 run_steps(&[Some((1, 5)), None, Some((2, 6)), None]);
858 run_steps(&[Some((1, 5)), None, Some((2, 6))]);
859 run_steps(&[Some((1, 5)), None]);
860 }
861
862 #[test]
863 fn test_key_value_scan() {
864 fn key(i: u64) -> Vec<u8> {
865 format!("k{i}").as_bytes().to_vec()
866 }
867 fn value(i: u64) -> Vec<u8> {
868 format!("v{i}").as_bytes().to_vec()
869 }
870 fn rich_value(i: u64) -> RaftLocalState {
871 RaftLocalState {
872 last_index: i,
873 ..Default::default()
874 }
875 }
876
877 let dir = tempfile::Builder::new()
878 .prefix("test_key_value_scan")
879 .tempdir()
880 .unwrap();
881 let cfg = Config {
882 dir: dir.path().to_str().unwrap().to_owned(),
883 target_file_size: ReadableSize(1),
884 ..Default::default()
885 };
886 let rid = 1;
887 let engine =
888 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
889 .unwrap();
890
891 engine
892 .scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
893 panic!("unexpected message.");
894 })
895 .unwrap();
896
897 let mut batch = LogBatch::default();
898 let mut res = Vec::new();
899 let mut rich_res = Vec::new();
900 batch.put(rid, key(1), value(1)).unwrap();
901 batch.put(rid, key(2), value(2)).unwrap();
902 batch.put(rid, key(3), value(3)).unwrap();
903 engine.write(&mut batch, false).unwrap();
904
905 engine
906 .scan_raw_messages(rid, None, None, false, |k, v| {
907 res.push((k.to_vec(), v.to_vec()));
908 true
909 })
910 .unwrap();
911 assert_eq!(
912 res,
913 vec![(key(1), value(1)), (key(2), value(2)), (key(3), value(3))]
914 );
915 res.clear();
916 engine
917 .scan_raw_messages(rid, None, None, true, |k, v| {
918 res.push((k.to_vec(), v.to_vec()));
919 true
920 })
921 .unwrap();
922 assert_eq!(
923 res,
924 vec![(key(3), value(3)), (key(2), value(2)), (key(1), value(1))]
925 );
926 res.clear();
927 engine
928 .scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
929 panic!("unexpected message.")
930 })
931 .unwrap();
932
933 batch.put_message(rid, key(22), &rich_value(22)).unwrap();
934 batch.put_message(rid, key(33), &rich_value(33)).unwrap();
935 engine.write(&mut batch, false).unwrap();
936
937 engine
938 .scan_messages(rid, None, None, false, |k, v| {
939 rich_res.push((k.to_vec(), v));
940 false
941 })
942 .unwrap();
943 assert_eq!(rich_res, vec![(key(22), rich_value(22))]);
944 rich_res.clear();
945 engine
946 .scan_messages(rid, None, None, true, |k, v| {
947 rich_res.push((k.to_vec(), v));
948 false
949 })
950 .unwrap();
951 assert_eq!(rich_res, vec![(key(33), rich_value(33))]);
952 rich_res.clear();
953 }
954
955 #[test]
956 fn test_delete_key_value() {
957 let dir = tempfile::Builder::new()
958 .prefix("test_delete_key_value")
959 .tempdir()
960 .unwrap();
961 let cfg = Config {
962 dir: dir.path().to_str().unwrap().to_owned(),
963 target_file_size: ReadableSize(1),
964 ..Default::default()
965 };
966 let rid = 1;
967 let key = b"key".to_vec();
968 let (v1, v2) = (b"v1".to_vec(), b"v2".to_vec());
969 let mut batch_1 = LogBatch::default();
970 batch_1.put(rid, key.clone(), v1).unwrap();
971 let mut batch_2 = LogBatch::default();
972 batch_2.put(rid, key.clone(), v2.clone()).unwrap();
973 let mut delete_batch = LogBatch::default();
974 delete_batch.delete(rid, key.clone());
975
976 let engine =
977 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
978 .unwrap();
979 assert_eq!(
980 engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
981 None
982 );
983 assert_eq!(engine.get(rid, &key), None);
984
985 engine.write(&mut batch_1.clone(), true).unwrap();
988 assert!(engine.get_message::<RaftLocalState>(rid, &key).is_err());
989 engine.purge_manager.must_rewrite_append_queue(None, None);
990 engine.write(&mut delete_batch.clone(), true).unwrap();
991 let engine = engine.reopen();
992 assert_eq!(engine.get(rid, &key), None);
993 assert_eq!(
994 engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
995 None
996 );
997
998 engine.write(&mut batch_1.clone(), true).unwrap();
1000 engine
1001 .purge_manager
1002 .must_rewrite_append_queue(None, Some(2));
1003 engine.write(&mut delete_batch.clone(), true).unwrap();
1004 let engine = engine.reopen();
1005 assert_eq!(engine.get(rid, &key), None);
1006
1007 let engine = engine.reopen();
1020 engine.write(&mut batch_1.clone(), true).unwrap();
1021 engine.purge_manager.must_rewrite_append_queue(None, None);
1022 engine.write(&mut delete_batch.clone(), true).unwrap();
1023 engine.write(&mut batch_2.clone(), true).unwrap();
1024 let engine = engine.reopen();
1025 assert_eq!(engine.get(rid, &key).unwrap(), v2);
1026 engine.write(&mut batch_1.clone(), true).unwrap();
1028 engine
1029 .purge_manager
1030 .must_rewrite_append_queue(None, Some(2));
1031 engine.write(&mut delete_batch.clone(), true).unwrap();
1032 engine.write(&mut batch_2.clone(), true).unwrap();
1033 let engine = engine.reopen();
1034 assert_eq!(engine.get(rid, &key).unwrap(), v2);
1035
1036 let engine = engine.reopen();
1039 engine.write(&mut batch_1.clone(), true).unwrap();
1040 engine.write(&mut delete_batch.clone(), true).unwrap();
1041 engine.purge_manager.must_rewrite_append_queue(None, None);
1042 engine.write(&mut batch_2.clone(), true).unwrap();
1043 let engine = engine.reopen();
1044 assert_eq!(engine.get(rid, &key).unwrap(), v2);
1045 engine.write(&mut batch_1.clone(), true).unwrap();
1047 engine.write(&mut delete_batch.clone(), true).unwrap();
1048 engine
1049 .purge_manager
1050 .must_rewrite_append_queue(None, Some(2));
1051 engine.write(&mut batch_2.clone(), true).unwrap();
1052 let engine = engine.reopen();
1053 assert_eq!(engine.get(rid, &key).unwrap(), v2);
1054
1055 let engine = engine.reopen();
1058 engine.write(&mut batch_1.clone(), true).unwrap();
1059 engine.write(&mut delete_batch.clone(), true).unwrap();
1060 engine.write(&mut batch_2.clone(), true).unwrap();
1061 engine.purge_manager.must_rewrite_append_queue(None, None);
1062 let engine = engine.reopen();
1063 assert_eq!(engine.get(rid, &key).unwrap(), v2);
1064 let engine = engine.reopen();
1066 engine.write(&mut batch_1.clone(), true).unwrap();
1067 engine.write(&mut delete_batch.clone(), true).unwrap();
1068 engine.write(&mut batch_2.clone(), true).unwrap();
1069 engine
1070 .purge_manager
1071 .must_rewrite_append_queue(None, Some(2));
1072 let engine = engine.reopen();
1073 assert_eq!(engine.get(rid, &key).unwrap(), v2);
1074 }
1075
1076 #[test]
1077 fn test_compact_raft_group() {
1078 let dir = tempfile::Builder::new()
1079 .prefix("test_compact_raft_group")
1080 .tempdir()
1081 .unwrap();
1082 let cfg = Config {
1083 dir: dir.path().to_str().unwrap().to_owned(),
1084 target_file_size: ReadableSize(1),
1085 ..Default::default()
1086 };
1087 let engine =
1088 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1089 .unwrap();
1090 let data = vec![b'x'; 1024];
1091
1092 let mut rid = 7;
1095 engine.append(rid, 1, 10, Some(&data));
1096 engine
1098 .purge_manager
1099 .must_rewrite_append_queue(None, Some(2));
1100 let mut compact_log = LogBatch::default();
1101 compact_log.add_command(rid, Command::Compact { index: 5 });
1102 engine.write(&mut compact_log, true).unwrap();
1103 let engine = engine.reopen();
1104 engine.scan_entries(rid, 5, 10, |_, q, d| {
1105 assert_eq!(q, LogQueue::Append);
1106 assert_eq!(d, &data);
1107 });
1108 assert_eq!(engine.stats.live_entries(LogQueue::Append), 6); rid += 1;
1113 engine.append(rid, 5, 15, Some(&data));
1114 let mut compact_log = LogBatch::default();
1115 compact_log.add_command(rid, Command::Compact { index: 10 });
1116 engine.write(&mut compact_log, true).unwrap();
1117 engine.append(rid, 15, 25, Some(&data));
1118 engine
1120 .purge_manager
1121 .must_rewrite_append_queue(None, Some(2));
1122 let mut compact_log = LogBatch::default();
1124 compact_log.add_command(rid, Command::Compact { index: 20 });
1125 engine.memtables.apply_append_writes(compact_log.drain());
1126 engine.purge_manager.must_rewrite_rewrite_queue();
1127 let engine = engine.reopen();
1128 engine.scan_entries(rid, 10, 25, |_, q, d| {
1129 assert_eq!(q, LogQueue::Append);
1130 assert_eq!(d, &data);
1131 });
1132 assert_eq!(engine.stats.live_entries(LogQueue::Append), 22); engine.clean(rid - 1);
1134 assert_eq!(engine.stats.live_entries(LogQueue::Append), 16);
1135 engine
1138 .purge_manager
1139 .must_rewrite_append_queue(None, Some(2));
1140 let engine = engine.reopen();
1141 engine.scan_entries(rid, 10, 25, |_, q, d| {
1142 assert_eq!(q, LogQueue::Append);
1143 assert_eq!(d, &data);
1144 });
1145
1146 rid += 1;
1149 engine.append(rid, 5, 15, Some(&data));
1150 let mut compact_log = LogBatch::default();
1151 compact_log.add_command(rid, Command::Compact { index: 10 });
1152 engine.write(&mut compact_log, true).unwrap();
1153 engine.purge_manager.must_rewrite_append_queue(None, None);
1154 engine.append(rid, 15, 25, Some(&data));
1155 engine
1156 .purge_manager
1157 .must_rewrite_append_queue(None, Some(2));
1158 let mut compact_log = LogBatch::default();
1159 compact_log.add_command(rid, Command::Compact { index: 20 });
1160 engine.write(&mut compact_log, true).unwrap();
1161 let engine = engine.reopen();
1162 engine.scan_entries(rid, 20, 25, |_, q, d| {
1163 assert_eq!(q, LogQueue::Append);
1164 assert_eq!(d, &data);
1165 });
1166
1167 rid += 1;
1170 engine.append(rid, 1, 5, Some(&data));
1171 engine.purge_manager.must_rewrite_append_queue(None, None);
1172 engine.append(rid, 5, 15, Some(&data));
1173 let mut compact_log = LogBatch::default();
1174 compact_log.add_command(rid, Command::Compact { index: 10 });
1175 engine.write(&mut compact_log, true).unwrap();
1176 engine
1178 .purge_manager
1179 .must_rewrite_append_queue(None, Some(2));
1180 let engine = engine.reopen();
1181 engine.scan_entries(rid, 10, 15, |_, q, d| {
1182 assert_eq!(q, LogQueue::Append);
1183 assert_eq!(d, &data);
1184 });
1185 }
1186
1187 #[test]
1188 fn test_purge_triggered_by_compact() {
1189 let dir = tempfile::Builder::new()
1190 .prefix("test_purge_triggered_by_compact")
1191 .tempdir()
1192 .unwrap();
1193 let cfg = Config {
1194 dir: dir.path().to_str().unwrap().to_owned(),
1195 target_file_size: ReadableSize::kb(5),
1196 purge_threshold: ReadableSize::kb(150),
1197 ..Default::default()
1198 };
1199
1200 let engine =
1201 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1202 .unwrap();
1203 let data = vec![b'x'; 1024];
1204 for index in 0..100 {
1205 engine.append(1, index, index + 1, Some(&data));
1206 }
1207
1208 let count = engine.compact_to(1, 100);
1210 assert_eq!(count, 100);
1211 assert!(!engine
1212 .purge_manager
1213 .needs_rewrite_log_files(LogQueue::Append));
1214
1215 for index in 100..250 {
1217 engine.append(1, index, index + 1, Some(&data));
1218 }
1219
1220 assert_eq!(engine.compact_to(1, 101), 1);
1222 assert!(engine
1224 .purge_manager
1225 .needs_rewrite_log_files(LogQueue::Append));
1226
1227 let old_min_file_seq = engine.file_span(LogQueue::Append).0;
1228 let will_force_compact = engine.purge_expired_files().unwrap();
1229 let new_min_file_seq = engine.file_span(LogQueue::Append).0;
1230 assert!(new_min_file_seq > old_min_file_seq);
1232 assert!(will_force_compact.is_empty());
1234 assert!(engine.get_entry::<Entry>(1, 101).unwrap().is_some());
1236
1237 assert_eq!(engine.compact_to(1, 102), 1);
1238 assert!(engine
1240 .purge_manager
1241 .needs_rewrite_log_files(LogQueue::Append));
1242 let will_force_compact = engine.purge_expired_files().unwrap();
1243 assert!(!will_force_compact.is_empty());
1245 assert_eq!(will_force_compact[0], 1);
1246 }
1247
1248 #[test]
1249 fn test_purge_trigger_force_rewrite() {
1250 let dir = tempfile::Builder::new()
1251 .prefix("test_purge_trigger_force_write")
1252 .tempdir()
1253 .unwrap();
1254 let cfg = Config {
1255 dir: dir.path().to_str().unwrap().to_owned(),
1256 target_file_size: ReadableSize::kb(1),
1257 purge_threshold: ReadableSize::kb(10),
1258 ..Default::default()
1259 };
1260
1261 let engine =
1262 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1263 .unwrap();
1264 let data = vec![b'x'; 1024];
1265 for rid in 1..=3 {
1267 for index in 0..50 {
1268 engine.append(rid, index, index + 1, Some(&data[..10]));
1269 }
1270 }
1271 for rid in 4..=50 {
1273 engine.append(rid, 1, 2, Some(&data));
1274 }
1275
1276 let check_purge = |pending_regions: Vec<u64>| {
1277 let mut compact_regions = engine.purge_expired_files().unwrap();
1278 compact_regions.sort_unstable();
1280 assert_eq!(compact_regions, pending_regions);
1281 };
1282
1283 for _ in 0..9 {
1284 check_purge(vec![1, 2, 3]);
1285 }
1286
1287 check_purge(vec![1, 2, 3]);
1289 for rid in 1..=3 {
1290 let memtable = engine.memtables.get(rid).unwrap();
1291 assert_eq!(memtable.read().rewrite_count(), 50);
1292 }
1293
1294 for rid in 2..=50 {
1296 let last_idx = engine.last_index(rid).unwrap();
1297 engine.compact_to(rid, last_idx);
1298 engine.append(rid, last_idx, last_idx + 1, Some(&data));
1299 }
1300 check_purge(vec![1]);
1302 }
1303
1304 #[test]
1305 fn test_rewrite_and_recover() {
1306 let dir = tempfile::Builder::new()
1307 .prefix("test_rewrite_and_recover")
1308 .tempdir()
1309 .unwrap();
1310 let cfg = Config {
1311 dir: dir.path().to_str().unwrap().to_owned(),
1312 target_file_size: ReadableSize::kb(5),
1313 purge_threshold: ReadableSize::kb(80),
1314 ..Default::default()
1315 };
1316 let engine =
1317 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1318 .unwrap();
1319 let data = vec![b'x'; 1024];
1320
1321 for index in 1..=10 {
1323 for rid in 1..=10 {
1324 engine.append(rid, index, index + 1, Some(&data));
1325 }
1326 }
1327 engine.append(11, 1, 11, Some(&data));
1328
1329 assert!(engine
1331 .purge_manager
1332 .needs_rewrite_log_files(LogQueue::Append));
1333 assert!(engine.purge_expired_files().unwrap().is_empty());
1334 assert!(engine.file_span(LogQueue::Append).0 > 1);
1335
1336 let rewrite_file_size = engine.pipe_log.total_size(LogQueue::Rewrite);
1337 assert!(rewrite_file_size > 59); for rid in 1..=10 {
1341 engine.scan_entries(rid, 1, 11, |_, _, d| {
1342 assert_eq!(d, &data);
1343 });
1344 }
1345
1346 engine.clean(11);
1347 let cleaned_region_ids = engine.memtables.cleaned_region_ids();
1348 assert_eq!(cleaned_region_ids.len(), 1);
1349
1350 let engine = engine.reopen();
1351 assert_eq!(engine.memtables.cleaned_region_ids(), cleaned_region_ids);
1352
1353 for rid in 1..=10 {
1354 engine.scan_entries(rid, 1, 11, |_, _, d| {
1355 assert_eq!(d, &data);
1356 });
1357 }
1358
1359 for index in 11..=20 {
1361 for rid in 1..=10 {
1362 engine.append(rid, index, index + 1, Some(&data));
1363 }
1364 }
1365
1366 assert!(engine
1367 .purge_manager
1368 .needs_rewrite_log_files(LogQueue::Append));
1369 assert!(engine.purge_expired_files().unwrap().is_empty());
1370 }
1371
1372 #[test]
1373 fn test_empty_protobuf_message() {
1374 let dir = tempfile::Builder::new()
1375 .prefix("test_empty_protobuf_message")
1376 .tempdir()
1377 .unwrap();
1378 let cfg = Config {
1379 dir: dir.path().to_str().unwrap().to_owned(),
1380 ..Default::default()
1381 };
1382 let engine =
1383 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1384 .unwrap();
1385
1386 let mut log_batch = LogBatch::default();
1387 let empty_entry = Entry::new();
1388 assert_eq!(empty_entry.compute_size(), 0);
1389 log_batch
1390 .add_entries::<Entry>(0, &[empty_entry.clone()])
1391 .unwrap();
1392 engine.write(&mut log_batch, false).unwrap();
1393 let empty_state = RaftLocalState::new();
1394 assert_eq!(empty_state.compute_size(), 0);
1395 log_batch
1396 .put_message(1, b"key".to_vec(), &empty_state)
1397 .unwrap();
1398 engine.write(&mut log_batch, false).unwrap();
1399 log_batch
1400 .add_entries::<Entry>(2, &[empty_entry.clone()])
1401 .unwrap();
1402 log_batch
1403 .put_message(2, b"key".to_vec(), &empty_state)
1404 .unwrap();
1405 engine.write(&mut log_batch, true).unwrap();
1406
1407 let engine = engine.reopen();
1408 assert_eq!(
1409 engine.get_entry::<Entry>(0, 0).unwrap().unwrap(),
1410 empty_entry
1411 );
1412 assert_eq!(
1413 engine.get_entry::<Entry>(2, 0).unwrap().unwrap(),
1414 empty_entry
1415 );
1416 assert_eq!(
1417 engine
1418 .get_message::<RaftLocalState>(1, b"key")
1419 .unwrap()
1420 .unwrap(),
1421 empty_state
1422 );
1423 assert_eq!(
1424 engine
1425 .get_message::<RaftLocalState>(2, b"key")
1426 .unwrap()
1427 .unwrap(),
1428 empty_state
1429 );
1430 }
1431
1432 #[test]
1433 fn test_empty_batch() {
1434 let dir = tempfile::Builder::new()
1435 .prefix("test_empty_batch")
1436 .tempdir()
1437 .unwrap();
1438 let cfg = Config {
1439 dir: dir.path().to_str().unwrap().to_owned(),
1440 ..Default::default()
1441 };
1442 let engine =
1443 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1444 .unwrap();
1445 let data = vec![b'x'; 16];
1446 let cases = [[false, false], [false, true], [true, true]];
1447 for (i, writes) in cases.iter().enumerate() {
1448 let rid = i as u64;
1449 let mut batch = LogBatch::default();
1450 for &has_data in writes {
1451 if has_data {
1452 batch.put(rid, b"key".to_vec(), data.clone()).unwrap();
1453 }
1454 engine.write(&mut batch, true).unwrap();
1455 assert!(batch.is_empty());
1456 }
1457 }
1458 }
1459
1460 #[test]
1461 fn test_dirty_recovery() {
1462 let dir = tempfile::Builder::new()
1463 .prefix("test_dirty_recovery")
1464 .tempdir()
1465 .unwrap();
1466 let cfg = Config {
1467 dir: dir.path().to_str().unwrap().to_owned(),
1468 ..Default::default()
1469 };
1470 let engine =
1471 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1472 .unwrap();
1473 let data = vec![b'x'; 1024];
1474
1475 for rid in 1..21 {
1476 engine.append(rid, 1, 21, Some(&data));
1477 }
1478
1479 std::fs::create_dir(dir.path().join(Path::new("random_dir"))).unwrap();
1481 let _f = std::fs::File::create(dir.path().join(Path::new("random_file"))).unwrap();
1483
1484 let engine = engine.reopen();
1485 for rid in 1..21 {
1486 engine.scan_entries(rid, 1, 21, |_, _, d| {
1487 assert_eq!(d, &data);
1488 });
1489 }
1490 }
1491
1492 #[test]
1493 fn test_large_rewrite_batch() {
1494 let dir = tempfile::Builder::new()
1495 .prefix("test_large_rewrite_batch")
1496 .tempdir()
1497 .unwrap();
1498 let cfg = Config {
1499 dir: dir.path().to_str().unwrap().to_owned(),
1500 target_file_size: ReadableSize(1),
1501 ..Default::default()
1502 };
1503 let engine =
1504 RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1505 .unwrap();
1506 let data = vec![b'x'; 2 * 1024 * 1024];
1507
1508 for rid in 1..=3 {
1509 engine.append(rid, 1, 11, Some(&data));
1510 }
1511
1512 let old_active_file = engine.file_span(LogQueue::Append).1;
1513 engine.purge_manager.must_rewrite_append_queue(None, None);
1514 assert_eq!(engine.file_span(LogQueue::Append).0, old_active_file + 1);
1515 let old_active_file = engine.file_span(LogQueue::Rewrite).1;
1516 engine.purge_manager.must_rewrite_rewrite_queue();
1517 assert!(engine.file_span(LogQueue::Rewrite).0 > old_active_file);
1518
1519 for rid in engine.raft_groups() {
1520 let mut total = 0;
1521 engine
1522 .scan_raw_messages(rid, None, None, false, |k, _| {
1523 assert!(!crate::is_internal_key(k, None));
1524 total += 1;
1525 true
1526 })
1527 .unwrap();
1528 assert_eq!(total, 1);
1529 }
1530 assert_eq!(engine.raft_groups().len(), 3);
1531
1532 let engine = engine.reopen();
1533 for rid in 1..=3 {
1534 engine.scan_entries(rid, 1, 11, |_, _, d| {
1535 assert_eq!(d, &data);
1536 });
1537 }
1538 }
1539
1540 #[test]
1541 fn test_combination_of_version_and_recycle() {
1542 fn test_engine_ops(cfg_v1: &Config, cfg_v2: &Config) {
1543 let rid = 1;
1544 let data = vec![b'7'; 1024];
1545 {
1546 let engine = RaftLogEngine::open(cfg_v1.clone()).unwrap();
1548 engine.append(rid, 0, 20, Some(&data));
1549 let append_first = engine.file_span(LogQueue::Append).0;
1550 engine.compact_to(rid, 18);
1551 engine.purge_expired_files().unwrap();
1552 assert!(engine.file_span(LogQueue::Append).0 > append_first);
1553 assert_eq!(engine.first_index(rid).unwrap(), 18);
1554 assert_eq!(engine.last_index(rid).unwrap(), 19);
1555 }
1556 {
1557 let engine = RaftLogEngine::open(cfg_v2.clone()).unwrap();
1559 assert_eq!(engine.first_index(rid).unwrap(), 18);
1560 assert_eq!(engine.last_index(rid).unwrap(), 19);
1561 engine.append(rid, 20, 40, Some(&data));
1562 let append_first = engine.file_span(LogQueue::Append).0;
1563 engine.compact_to(rid, 38);
1564 engine.purge_expired_files().unwrap();
1565 assert!(engine.file_span(LogQueue::Append).0 > append_first);
1566 assert_eq!(engine.first_index(rid).unwrap(), 38);
1567 assert_eq!(engine.last_index(rid).unwrap(), 39);
1568 }
1569 {
1570 let engine = RaftLogEngine::open(cfg_v1.clone()).unwrap();
1572 assert_eq!(engine.first_index(rid).unwrap(), 38);
1573 assert_eq!(engine.last_index(rid).unwrap(), 39);
1574 }
1575 }
1576 {
1578 let dir = tempfile::Builder::new()
1579 .prefix("test_mutable_format_version")
1580 .tempdir()
1581 .unwrap();
1582 let cfg_v1 = Config {
1584 dir: dir.path().to_str().unwrap().to_owned(),
1585 target_file_size: ReadableSize(1),
1586 purge_threshold: ReadableSize(1),
1587 format_version: Version::V1,
1588 enable_log_recycle: false,
1589 ..Default::default()
1590 };
1591 let cfg_v2 = Config {
1593 dir: dir.path().to_str().unwrap().to_owned(),
1594 target_file_size: ReadableSize(1),
1595 purge_threshold: ReadableSize(1),
1596 format_version: Version::V2,
1597 enable_log_recycle: false,
1598 ..Default::default()
1599 };
1600 test_engine_ops(&cfg_v1, &cfg_v2);
1601 }
1602 {
1604 let dir = tempfile::Builder::new()
1605 .prefix("test_enable_log_recycle")
1606 .tempdir()
1607 .unwrap();
1608 let cfg_v1 = Config {
1610 dir: dir.path().to_str().unwrap().to_owned(),
1611 target_file_size: ReadableSize(1),
1612 purge_threshold: ReadableSize(1),
1613 format_version: Version::V1,
1614 enable_log_recycle: false,
1615 ..Default::default()
1616 };
1617 let cfg_v2 = Config {
1619 dir: dir.path().to_str().unwrap().to_owned(),
1620 target_file_size: ReadableSize(1),
1621 purge_threshold: ReadableSize(1),
1622 format_version: Version::V2,
1623 enable_log_recycle: true,
1624 prefill_for_recycle: true,
1625 ..Default::default()
1626 };
1627 test_engine_ops(&cfg_v1, &cfg_v2);
1628 }
1629 }
1630
1631 #[test]
1634 fn test_dump_file_or_directory() {
1635 let dir = tempfile::Builder::new()
1636 .prefix("test_dump_file_or_directory")
1637 .tempdir()
1638 .unwrap();
1639 let entry_data = vec![b'x'; 1024];
1640 let fs = Arc::new(ObfuscatedFileSystem::default());
1641
1642 let mut batches = vec![vec![LogBatch::default()]];
1643 let mut batch = LogBatch::default();
1644 batch
1645 .add_entries::<Entry>(7, &generate_entries(1, 11, Some(&entry_data)))
1646 .unwrap();
1647 batch.add_command(7, Command::Clean);
1648 batch.put(7, b"key".to_vec(), b"value".to_vec()).unwrap();
1649 batch.delete(7, b"key2".to_vec());
1650 batches.push(vec![batch.clone()]);
1651 let mut batch2 = LogBatch::default();
1652 batch2.put(8, b"key3".to_vec(), b"value".to_vec()).unwrap();
1653 batch2
1654 .add_entries::<Entry>(8, &generate_entries(5, 15, Some(&entry_data)))
1655 .unwrap();
1656 batches.push(vec![batch, batch2]);
1657
1658 let cfg = Config {
1659 dir: dir.path().to_str().unwrap().to_owned(),
1660 ..Default::default()
1661 };
1662
1663 let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
1664 for bs in batches.iter_mut() {
1665 for batch in bs.iter_mut() {
1666 engine.write(batch, false).unwrap();
1667 }
1668
1669 engine.sync().unwrap();
1670 }
1671
1672 drop(engine);
1673 let dump_it = Engine::dump_with_file_system(dir.path(), fs.clone()).unwrap();
1676 let total = dump_it
1677 .inspect(|i| {
1678 i.as_ref().unwrap();
1679 })
1680 .count();
1681 assert!(total == 10);
1682
1683 let file_id = FileId {
1685 queue: LogQueue::Rewrite,
1686 seq: 1,
1687 };
1688 let dump_it = Engine::dump_with_file_system(
1689 file_id.build_file_path(dir.path()).as_path(),
1690 fs.clone(),
1691 )
1692 .unwrap();
1693 let total = dump_it
1694 .inspect(|i| {
1695 i.as_ref().unwrap();
1696 })
1697 .count();
1698 assert!(0 == total);
1699
1700 assert!(Engine::dump_with_file_system(Path::new("/not_exists_dir"), fs.clone()).is_err());
1702
1703 let mut not_exists_file = PathBuf::from(dir.as_ref());
1705 not_exists_file.push("not_exists_file");
1706 assert!(Engine::dump_with_file_system(not_exists_file.as_path(), fs).is_err());
1707 }
1708
1709 #[cfg(feature = "scripting")]
1710 #[test]
1711 fn test_repair_default() {
1712 let dir = tempfile::Builder::new()
1713 .prefix("test_repair_default")
1714 .tempdir()
1715 .unwrap();
1716 let entry_data = vec![b'x'; 128];
1717 let cfg = Config {
1718 dir: dir.path().to_str().unwrap().to_owned(),
1719 target_file_size: ReadableSize(1), ..Default::default()
1721 };
1722 let fs = Arc::new(ObfuscatedFileSystem::default());
1723
1724 let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1725 for rid in 1..=50 {
1726 engine.append(rid, 1, 6, Some(&entry_data));
1727 }
1728 for rid in 25..=50 {
1729 engine.append(rid, 6, 11, Some(&entry_data));
1730 }
1731 drop(engine);
1732
1733 let script1 = "".to_owned();
1734 RaftLogEngine::unsafe_repair_with_file_system(
1735 dir.path(),
1736 None, script1,
1738 fs.clone(),
1739 )
1740 .unwrap();
1741 let script2 = "
1742 fn filter_append(id, first, count, rewrite_count, queue, ifirst, ilast) {
1743 0
1744 }
1745 fn filter_compact(id, first, count, rewrite_count, queue, compact_to) {
1746 0
1747 }
1748 fn filter_clean(id, first, count, rewrite_count, queue) {
1749 0
1750 }
1751 "
1752 .to_owned();
1753 RaftLogEngine::unsafe_repair_with_file_system(
1754 dir.path(),
1755 None, script2,
1757 fs.clone(),
1758 )
1759 .unwrap();
1760
1761 let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1762 for rid in 1..25 {
1763 engine.scan_entries(rid, 1, 6, |_, _, d| {
1764 assert_eq!(d, &entry_data);
1765 });
1766 }
1767 for rid in 25..=50 {
1768 engine.scan_entries(rid, 1, 11, |_, _, d| {
1769 assert_eq!(d, &entry_data);
1770 });
1771 }
1772 }
1773
1774 #[cfg(feature = "scripting")]
1775 #[test]
1776 fn test_repair_discard_entries() {
1777 let dir = tempfile::Builder::new()
1778 .prefix("test_repair_discard")
1779 .tempdir()
1780 .unwrap();
1781 let entry_data = vec![b'x'; 128];
1782 let cfg = Config {
1783 dir: dir.path().to_str().unwrap().to_owned(),
1784 target_file_size: ReadableSize(1), ..Default::default()
1786 };
1787 let fs = Arc::new(ObfuscatedFileSystem::default());
1788
1789 let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1790 for rid in 1..=50 {
1791 engine.append(rid, 1, 6, Some(&entry_data));
1792 }
1793 for rid in 25..=50 {
1794 engine.append(rid, 6, 11, Some(&entry_data));
1795 }
1796 drop(engine);
1797
1798 let incoming_emptied = [1, 25];
1799 let existing_emptied = [2, 26];
1800 let script = "
1801 fn filter_append(id, first, count, rewrite_count, queue, ifirst, ilast) {
1802 if id == 1 {
1803 return 1;
1804 } else if id == 2 {
1805 return 2;
1806 } else if id == 25 {
1807 return 1;
1808 } else if id == 26 {
1809 return 2;
1810 }
1811 0 // default
1812 }
1813 "
1814 .to_owned();
1815 RaftLogEngine::unsafe_repair_with_file_system(
1816 dir.path(),
1817 None, script,
1819 fs.clone(),
1820 )
1821 .unwrap();
1822
1823 let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1824 for rid in 1..25 {
1825 if existing_emptied.contains(&rid) || incoming_emptied.contains(&rid) {
1826 continue;
1827 }
1828 engine.scan_entries(rid, 1, 6, |_, _, d| {
1829 assert_eq!(d, &entry_data);
1830 });
1831 }
1832 for rid in 25..=50 {
1833 if existing_emptied.contains(&rid) || incoming_emptied.contains(&rid) {
1834 continue;
1835 }
1836 engine.scan_entries(rid, 1, 11, |_, _, d| {
1837 assert_eq!(d, &entry_data);
1838 });
1839 }
1840 for rid in existing_emptied {
1841 let first_index = if rid < 25 { 1 } else { 6 };
1842 let last_index = if rid < 25 { 5 } else { 10 };
1843 engine.scan_entries(rid, first_index, last_index + 1, |_, _, d| {
1844 assert_eq!(d, &entry_data);
1845 });
1846 }
1847 for rid in incoming_emptied {
1848 let last_index = if rid < 25 { 5 } else { 10 };
1849 assert_eq!(engine.first_index(rid), None);
1850 assert_eq!(engine.last_index(rid), None);
1851 assert_eq!(engine.decode_last_index(rid), Some(last_index));
1852 }
1853 }
1854
1855 #[test]
1856 fn test_tail_corruption() {
1857 let dir = tempfile::Builder::new()
1858 .prefix("test_tail_corruption")
1859 .tempdir()
1860 .unwrap();
1861 let entry_data = vec![b'x'; 16];
1862 let cfg = Config {
1863 dir: dir.path().to_str().unwrap().to_owned(),
1864 target_file_size: ReadableSize::gb(10),
1866 ..Default::default()
1867 };
1868 let fs = Arc::new(ObfuscatedFileSystem::default());
1869
1870 let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1871 for rid in 1..=50 {
1872 engine.append(rid, 1, 6, Some(&entry_data));
1873 }
1874 for rid in 25..=50 {
1875 engine.append(rid, 6, 11, Some(&entry_data));
1876 }
1877 let (_, last_file_seq) = engine.file_span(LogQueue::Append);
1878 drop(engine);
1879
1880 let last_file = FileId {
1881 queue: LogQueue::Append,
1882 seq: last_file_seq,
1883 };
1884 let f = OpenOptions::new()
1885 .write(true)
1886 .open(last_file.build_file_path(dir.path()))
1887 .unwrap();
1888
1889 f.set_len(f.metadata().unwrap().len() - 1).unwrap();
1891 RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1892
1893 f.set_len(1).unwrap();
1895 RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1896 }
1897
1898 #[test]
1899 fn test_reopen_with_wrong_file_system() {
1900 let dir = tempfile::Builder::new()
1901 .prefix("test_reopen_with_wrong_file_system")
1902 .tempdir()
1903 .unwrap();
1904 let entry_data = vec![b'x'; 128];
1905 let cfg = Config {
1906 dir: dir.path().to_str().unwrap().to_owned(),
1907 target_file_size: ReadableSize(1),
1908 ..Default::default()
1909 };
1910 let fs = Arc::new(ObfuscatedFileSystem::default());
1911
1912 let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1913 for rid in 1..=10 {
1914 engine.append(rid, 1, 11, Some(&entry_data));
1915 }
1916 drop(engine);
1917
1918 assert!(RaftLogEngine::open(cfg.clone()).is_err());
1919
1920 let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1921 for rid in 1..10 {
1922 engine.scan_entries(rid, 1, 11, |_, _, d| {
1923 assert_eq!(d, &entry_data);
1924 });
1925 }
1926 }
1927
1928 #[cfg(feature = "nightly")]
1929 #[bench]
1930 fn bench_engine_fetch_entries(b: &mut test::Bencher) {
1931 use rand::{thread_rng, Rng};
1932
1933 let dir = tempfile::Builder::new()
1934 .prefix("bench_engine_fetch_entries")
1935 .tempdir()
1936 .unwrap();
1937 let entry_data = vec![b'x'; 1024];
1938 let cfg = Config {
1939 dir: dir.path().to_str().unwrap().to_owned(),
1940 ..Default::default()
1941 };
1942 let engine = RaftLogEngine::open(cfg).unwrap();
1943 for i in 0..10 {
1944 for rid in 1..=100 {
1945 engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data));
1946 }
1947 }
1948 let mut vec: Vec<Entry> = Vec::new();
1949 b.iter(move || {
1950 let region_id = thread_rng().gen_range(1..=100);
1951 engine
1952 .fetch_entries_to::<Entry>(region_id, 1, 101, None, &mut vec)
1953 .unwrap();
1954 vec.clear();
1955 });
1956 }
1957
1958 #[test]
1959 fn test_engine_is_empty() {
1960 let dir = tempfile::Builder::new()
1961 .prefix("test_engine_is_empty")
1962 .tempdir()
1963 .unwrap();
1964 let entry_data = vec![b'x'; 128];
1965 let cfg = Config {
1966 dir: dir.path().to_str().unwrap().to_owned(),
1967 ..Default::default()
1968 };
1969 let fs = Arc::new(ObfuscatedFileSystem::default());
1970 let rid = 1;
1971
1972 let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1973 assert!(engine.is_empty());
1974 engine.append(rid, 1, 11, Some(&entry_data));
1975 assert!(!engine.is_empty());
1976
1977 let mut log_batch = LogBatch::default();
1978 log_batch.add_command(rid, Command::Compact { index: 11 });
1979 log_batch.delete(rid, b"last_index".to_vec());
1980 engine.write(&mut log_batch, true).unwrap();
1981 assert!(!engine.is_empty());
1982
1983 engine.clean(rid);
1984 assert!(engine.is_empty());
1985 }
1986
1987 pub struct DeleteMonitoredFileSystem {
1988 inner: ObfuscatedFileSystem,
1989 append_metadata: Mutex<BTreeSet<u64>>,
1990 reserved_metadata: Mutex<BTreeSet<u64>>,
1991 }
1992
1993 impl DeleteMonitoredFileSystem {
1994 fn new() -> Self {
1995 Self {
1996 inner: ObfuscatedFileSystem::default(),
1997 append_metadata: Mutex::new(BTreeSet::new()),
1998 reserved_metadata: Mutex::new(BTreeSet::new()),
1999 }
2000 }
2001
2002 fn update_metadata(&self, path: &Path, delete: bool) -> bool {
2003 let path = path.file_name().unwrap().to_str().unwrap();
2004 let parse_append = FileId::parse_file_name(path);
2005 let parse_reserved = parse_reserved_file_name(path);
2006 match (parse_append, parse_reserved) {
2007 (Some(id), None) if id.queue == LogQueue::Append => {
2008 if delete {
2009 self.append_metadata.lock().unwrap().remove(&id.seq)
2010 } else {
2011 self.append_metadata.lock().unwrap().insert(id.seq)
2012 }
2013 }
2014 (None, Some(seq)) => {
2015 if delete {
2016 self.reserved_metadata.lock().unwrap().remove(&seq)
2017 } else {
2018 self.reserved_metadata.lock().unwrap().insert(seq)
2019 }
2020 }
2021 _ => false,
2022 }
2023 }
2024 }
2025
2026 impl FileSystem for DeleteMonitoredFileSystem {
2027 type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
2028 type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
2029 type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
2030
2031 fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
2032 let handle = self.inner.create(&path)?;
2033 self.update_metadata(path.as_ref(), false);
2034 Ok(handle)
2035 }
2036
2037 fn open<P: AsRef<Path>>(&self, path: P, perm: Permission) -> std::io::Result<Self::Handle> {
2038 let handle = self.inner.open(&path, perm)?;
2039 self.update_metadata(path.as_ref(), false);
2040 Ok(handle)
2041 }
2042
2043 fn delete<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
2044 self.inner.delete(&path)?;
2045 self.update_metadata(path.as_ref(), true);
2046 Ok(())
2047 }
2048
2049 fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> std::io::Result<()> {
2050 self.inner.rename(src_path.as_ref(), dst_path.as_ref())?;
2051 self.update_metadata(src_path.as_ref(), true);
2052 self.update_metadata(dst_path.as_ref(), false);
2053 Ok(())
2054 }
2055
2056 fn reuse<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> std::io::Result<()> {
2057 self.inner.reuse(src_path.as_ref(), dst_path.as_ref())?;
2058 self.update_metadata(src_path.as_ref(), true);
2059 self.update_metadata(dst_path.as_ref(), false);
2060 Ok(())
2061 }
2062
2063 fn delete_metadata<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
2064 self.inner.delete_metadata(&path)?;
2065 self.update_metadata(path.as_ref(), true);
2066 Ok(())
2067 }
2068
2069 fn exists_metadata<P: AsRef<Path>>(&self, path: P) -> bool {
2070 if self.inner.exists_metadata(&path) {
2071 return true;
2072 }
2073 let path = path.as_ref().file_name().unwrap().to_str().unwrap();
2074 let parse_append = FileId::parse_file_name(path);
2075 let parse_reserved = parse_reserved_file_name(path);
2076 match (parse_append, parse_reserved) {
2077 (Some(id), None) if id.queue == LogQueue::Append => {
2078 self.append_metadata.lock().unwrap().contains(&id.seq)
2079 }
2080 (None, Some(seq)) => self.reserved_metadata.lock().unwrap().contains(&seq),
2081 _ => false,
2082 }
2083 }
2084
2085 fn new_reader(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Reader> {
2086 self.inner.new_reader(h)
2087 }
2088
2089 fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
2090 self.inner.new_writer(h)
2091 }
2092 }
2093
2094 #[test]
2095 fn test_managed_file_deletion() {
2096 let dir = tempfile::Builder::new()
2097 .prefix("test_managed_file_deletion")
2098 .tempdir()
2099 .unwrap();
2100 let entry_data = vec![b'x'; 128];
2101 let cfg = Config {
2102 dir: dir.path().to_str().unwrap().to_owned(),
2103 target_file_size: ReadableSize(1),
2104 purge_threshold: ReadableSize(1),
2105 enable_log_recycle: false,
2106 ..Default::default()
2107 };
2108 let fs = Arc::new(DeleteMonitoredFileSystem::new());
2109 let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
2110 for rid in 1..=10 {
2111 engine.append(rid, 1, 11, Some(&entry_data));
2112 }
2113 for rid in 1..=5 {
2114 engine.clean(rid);
2115 }
2116 let (start, _) = engine.file_span(LogQueue::Append);
2117 engine.purge_expired_files().unwrap();
2118 assert!(start < engine.file_span(LogQueue::Append).0);
2120 assert_eq!(engine.file_count(None), fs.inner.file_count());
2122 let start = engine.file_span(LogQueue::Append).0;
2123 assert_eq!(
2125 fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2126 &start
2127 );
2128
2129 let engine = engine.reopen();
2130 assert_eq!(engine.file_count(None), fs.inner.file_count());
2131 let (start, _) = engine.file_span(LogQueue::Append);
2132 assert_eq!(
2133 fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2134 &start
2135 );
2136
2137 for i in start / 2..start {
2139 fs.append_metadata.lock().unwrap().insert(i);
2140 }
2141 let engine = engine.reopen();
2142 let (start, _) = engine.file_span(LogQueue::Append);
2143 assert_eq!(
2144 fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2145 &start
2146 );
2147 }
2148
2149 #[test]
2150 fn test_managed_file_reuse() {
2151 let dir = tempfile::Builder::new()
2152 .prefix("test_managed_file_reuse")
2153 .tempdir()
2154 .unwrap();
2155 let entry_data = vec![b'x'; 16];
2156 let cfg = Config {
2157 dir: dir.path().to_str().unwrap().to_owned(),
2158 target_file_size: ReadableSize(1),
2159 purge_threshold: ReadableSize(50),
2160 format_version: Version::V2,
2161 enable_log_recycle: true,
2162 prefill_for_recycle: true,
2163 ..Default::default()
2164 };
2165 let recycle_capacity = cfg.recycle_capacity() as u64;
2166 let fs = Arc::new(DeleteMonitoredFileSystem::new());
2167 let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
2168
2169 let reserved_start = *fs.reserved_metadata.lock().unwrap().first().unwrap();
2170 for rid in 1..=10 {
2171 engine.append(rid, 1, 11, Some(&entry_data));
2172 }
2173 for rid in 1..=10 {
2174 engine.clean(rid);
2175 }
2176 engine.purge_manager.must_rewrite_append_queue(None, None);
2178 assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
2179 let reserved_start_1 = *fs.reserved_metadata.lock().unwrap().first().unwrap();
2181 assert!(reserved_start < reserved_start_1);
2182 for rid in 1..=5 {
2184 engine.append(rid, 1, 11, Some(&entry_data));
2185 }
2186 let reserved_start_2 = *fs.reserved_metadata.lock().unwrap().first().unwrap();
2187 assert!(reserved_start_1 < reserved_start_2);
2188
2189 let file_count = fs.inner.file_count();
2190 let start_1 = *fs.append_metadata.lock().unwrap().first().unwrap();
2191 let engine = engine.reopen();
2192 assert_eq!(file_count, fs.inner.file_count());
2195 let start_2 = *fs.append_metadata.lock().unwrap().first().unwrap();
2196 assert!(start_1 < start_2);
2197 let reserved_start_3 = *fs.reserved_metadata.lock().unwrap().first().unwrap();
2198 assert_eq!(reserved_start_2, reserved_start_3);
2199
2200 for rid in 1..=recycle_capacity {
2202 engine.append(rid, 1, 11, Some(&entry_data));
2203 }
2204 assert!(fs.reserved_metadata.lock().unwrap().is_empty());
2205 for rid in 1..=recycle_capacity {
2206 engine.clean(rid);
2207 }
2208 engine.purge_manager.must_rewrite_append_queue(None, None);
2209 engine.append(1, 1, 11, Some(&entry_data));
2211 assert_eq!(engine.file_count(Some(LogQueue::Append)), 2);
2212 let start_3 = *fs.append_metadata.lock().unwrap().first().unwrap();
2213 assert!(start_2 < start_3);
2214 }
2215
2216 #[test]
2217 fn test_simple_write_perf_context() {
2218 let dir = tempfile::Builder::new()
2219 .prefix("test_simple_write_perf_context")
2220 .tempdir()
2221 .unwrap();
2222 let cfg = Config {
2223 dir: dir.path().to_str().unwrap().to_owned(),
2224 ..Default::default()
2225 };
2226 let rid = 1;
2227 let entry_size = 5120;
2228 let engine = RaftLogEngine::open(cfg).unwrap();
2229 let data = vec![b'x'; entry_size];
2230 let old_perf_context = get_perf_context();
2231 engine.append(rid, 1, 5, Some(&data));
2232 let new_perf_context = get_perf_context();
2233 assert_ne!(
2234 old_perf_context.log_populating_duration,
2235 new_perf_context.log_populating_duration
2236 );
2237 assert_ne!(
2238 old_perf_context.log_write_duration,
2239 new_perf_context.log_write_duration
2240 );
2241 assert_ne!(
2242 old_perf_context.apply_duration,
2243 new_perf_context.apply_duration
2244 );
2245 }
2246
2247 #[test]
2248 fn test_recycle_no_signing_files() {
2249 let dir = tempfile::Builder::new()
2250 .prefix("test_recycle_no_signing_files")
2251 .tempdir()
2252 .unwrap();
2253 let entry_data = vec![b'x'; 128];
2254 let fs = Arc::new(DeleteMonitoredFileSystem::new());
2255 let cfg_v1 = Config {
2256 dir: dir.path().to_str().unwrap().to_owned(),
2257 target_file_size: ReadableSize(1),
2258 purge_threshold: ReadableSize(1024),
2259 format_version: Version::V1,
2260 enable_log_recycle: false,
2261 ..Default::default()
2262 };
2263 let cfg_v2 = Config {
2264 dir: dir.path().to_str().unwrap().to_owned(),
2265 target_file_size: ReadableSize(1),
2266 purge_threshold: ReadableSize(15),
2267 format_version: Version::V2,
2268 enable_log_recycle: true,
2269 prefill_for_recycle: false,
2270 ..Default::default()
2271 };
2272 assert!(cfg_v2.recycle_capacity() > 0);
2273 {
2275 let engine = RaftLogEngine::open_with_file_system(cfg_v1.clone(), fs.clone()).unwrap();
2276 for rid in 1..=10 {
2277 engine.append(rid, 1, 11, Some(&entry_data));
2278 }
2279 }
2280 {
2282 let engine = RaftLogEngine::open_with_file_system(cfg_v2.clone(), fs.clone()).unwrap();
2283 let (start, _) = engine.file_span(LogQueue::Append);
2284 for rid in 6..=10 {
2285 engine.append(rid, 11, 20, Some(&entry_data));
2286 }
2287 engine.clean(6);
2289 engine.purge_expired_files().unwrap();
2291 assert_eq!(engine.file_count(Some(LogQueue::Append)), 5);
2292 assert!(start < engine.file_span(LogQueue::Append).0);
2293 }
2294 {
2296 let engine = RaftLogEngine::open_with_file_system(cfg_v1, fs.clone()).unwrap();
2297 let (start, _) = engine.file_span(LogQueue::Append);
2298 for rid in 6..=10 {
2299 engine.append(rid, 20, 30, Some(&entry_data));
2300 }
2301 for rid in 6..=10 {
2302 engine.append(rid, 30, 40, Some(&entry_data));
2303 }
2304 for rid in 1..=5 {
2305 engine.append(rid, 11, 20, Some(&entry_data));
2306 }
2307 assert_eq!(engine.file_span(LogQueue::Append).0, start);
2308 let file_count = engine.file_count(Some(LogQueue::Append));
2309 drop(engine);
2310 let engine = RaftLogEngine::open_with_file_system(cfg_v2, fs).unwrap();
2311 assert_eq!(engine.file_span(LogQueue::Append).0, start);
2312 assert_eq!(engine.file_count(Some(LogQueue::Append)), file_count);
2313 for rid in 1..=10 {
2315 engine.clean(rid);
2316 }
2317 let (start, _) = engine.file_span(LogQueue::Append);
2318 engine.purge_expired_files().unwrap();
2320 assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
2321 assert!(engine.file_span(LogQueue::Append).0 > start);
2322 }
2323 }
2324
2325 #[test]
2326 fn test_start_engine_with_resize_recycle_capacity() {
2327 let dir = tempfile::Builder::new()
2328 .prefix("test_start_engine_with_resize_recycle_capacity")
2329 .tempdir()
2330 .unwrap();
2331 let path = dir.path().to_str().unwrap();
2332 let file_system = Arc::new(DeleteMonitoredFileSystem::new());
2333 let entry_data = vec![b'x'; 512];
2334
2335 let cfg = Config {
2337 dir: path.to_owned(),
2338 enable_log_recycle: false,
2339 ..Default::default()
2340 };
2341 let engine = RaftLogEngine::open_with_file_system(cfg, file_system.clone()).unwrap();
2342 let (start, _) = engine.file_span(LogQueue::Append);
2343 assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
2345 assert_eq!(file_system.inner.file_count(), engine.file_count(None));
2346 for rid in 1..=5 {
2348 engine.append(rid, 1, 10, Some(&entry_data));
2349 }
2350 assert_eq!(engine.file_span(LogQueue::Append).0, start);
2351 assert_eq!(file_system.inner.file_count(), engine.file_count(None));
2352 drop(engine);
2353
2354 let cfg = Config {
2356 dir: path.to_owned(),
2357 target_file_size: ReadableSize(1),
2358 purge_threshold: ReadableSize(80), enable_log_recycle: true,
2360 prefill_for_recycle: true,
2361 ..Default::default()
2362 };
2363 let engine =
2364 RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
2365 let (start, end) = engine.file_span(LogQueue::Append);
2366 assert_eq!(start, end);
2368 let recycled_count = file_system.inner.file_count() - engine.file_count(None);
2369 assert!(recycled_count > 0);
2370 for rid in 1..=5 {
2372 engine.append(rid, 10, 20, Some(&entry_data));
2373 }
2374 assert_eq!(engine.file_span(LogQueue::Append).0, start);
2375 assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
2376 let (start, end) = engine.file_span(LogQueue::Append);
2377 let recycled_count = file_system.inner.file_count() - engine.file_count(None);
2378 drop(engine);
2379
2380 let cfg_v2 = Config {
2383 target_file_size: ReadableSize(1),
2384 purge_threshold: ReadableSize(50),
2385 ..cfg
2386 };
2387 let engine =
2388 RaftLogEngine::open_with_file_system(cfg_v2.clone(), file_system.clone()).unwrap();
2389 assert_eq!(engine.file_span(LogQueue::Append), (start, end));
2390 assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
2391 engine.purge_expired_files().unwrap();
2394 assert_eq!(engine.file_span(LogQueue::Append), (start, end));
2395 for rid in 1..=10 {
2396 engine.append(rid, 20, 31, Some(&entry_data));
2397 }
2398 assert!(engine.file_span(LogQueue::Append).1 > end);
2399 let engine = engine.reopen();
2400 assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
2401 drop(engine);
2402
2403 let cfg_v3 = Config {
2406 target_file_size: ReadableSize::kb(2),
2407 purge_threshold: ReadableSize::kb(100),
2408 enable_log_recycle: false,
2409 prefill_for_recycle: false,
2410 ..cfg_v2
2411 };
2412 let engine = RaftLogEngine::open_with_file_system(cfg_v3, file_system.clone()).unwrap();
2413 assert_eq!(file_system.inner.file_count(), engine.file_count(None));
2414 }
2415
2416 #[test]
2417 fn test_rewrite_atomic_group() {
2418 let dir = tempfile::Builder::new()
2419 .prefix("test_rewrite_atomic_group")
2420 .tempdir()
2421 .unwrap();
2422 let cfg = Config {
2423 dir: dir.path().to_str().unwrap().to_owned(),
2424 recovery_threads: 100,
2426 target_file_size: ReadableSize(1),
2427 ..Default::default()
2428 };
2429 let fs = Arc::new(ObfuscatedFileSystem::default());
2430 let key = vec![b'x'; 2];
2431 let value = vec![b'y'; 8];
2432
2433 let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
2434 let mut data = HashSet::new();
2435 let mut rid = 1;
2436 let mut log_batch = LogBatch::default();
2438 let flush = |lb: &mut LogBatch| {
2439 lb.finish_populate(0, None).unwrap();
2440 engine.pipe_log.append(LogQueue::Rewrite, lb).unwrap();
2441 lb.drain();
2442 };
2443 {
2444 let mut builder = AtomicGroupBuilder::with_id(3);
2446 builder.begin(&mut log_batch);
2447 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2448 flush(&mut log_batch);
2449 engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2450 }
2451 {
2452 let mut builder = AtomicGroupBuilder::with_id(3);
2454 builder.begin(&mut log_batch);
2455 rid += 1;
2456 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2457 data.insert(rid);
2458 flush(&mut log_batch);
2459 rid += 1;
2461 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2462 data.insert(rid);
2463 flush(&mut log_batch);
2464 builder.end(&mut log_batch);
2465 rid += 1;
2466 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2467 data.insert(rid);
2468 flush(&mut log_batch);
2469 engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2470 }
2471 {
2472 let mut builder = AtomicGroupBuilder::with_id(3);
2474 builder.begin(&mut log_batch);
2475 rid += 1;
2476 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2477 data.insert(rid);
2478 flush(&mut log_batch);
2479 builder.add(&mut log_batch);
2480 rid += 1;
2481 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2482 data.insert(rid);
2483 flush(&mut log_batch);
2484 builder.add(&mut log_batch);
2485 rid += 1;
2486 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2487 data.insert(rid);
2488 flush(&mut log_batch);
2489 builder.end(&mut log_batch);
2490 rid += 1;
2491 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2492 data.insert(rid);
2493 flush(&mut log_batch);
2494 engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2495 }
2496 {
2497 let mut builder = AtomicGroupBuilder::with_id(3);
2499 builder.begin(&mut log_batch);
2500 rid += 1;
2501 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2502 flush(&mut log_batch);
2503 let mut builder = AtomicGroupBuilder::with_id(3);
2504 builder.begin(&mut log_batch);
2505 rid += 1;
2506 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2507 data.insert(rid);
2508 flush(&mut log_batch);
2509 builder.end(&mut log_batch);
2510 rid += 1;
2511 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2512 data.insert(rid);
2513 flush(&mut log_batch);
2514 engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2515 }
2516 {
2517 let mut builder = AtomicGroupBuilder::with_id(4);
2521 builder.begin(&mut LogBatch::default());
2522 builder.end(&mut log_batch);
2523 rid += 1;
2524 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2525 flush(&mut log_batch);
2526 let mut builder = AtomicGroupBuilder::with_id(4);
2527 builder.begin(&mut LogBatch::default());
2528 builder.add(&mut log_batch);
2529 rid += 1;
2530 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2531 flush(&mut log_batch);
2532 builder.end(&mut log_batch);
2533 rid += 1;
2534 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2535 flush(&mut log_batch);
2536 engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2537 }
2538 {
2539 let mut builder = AtomicGroupBuilder::with_id(5);
2541 builder.begin(&mut LogBatch::default());
2542 builder.end(&mut log_batch);
2543 rid += 1;
2544 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2545 flush(&mut log_batch);
2546 let mut builder = AtomicGroupBuilder::with_id(5);
2547 builder.begin(&mut log_batch);
2548 rid += 1;
2549 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2550 data.insert(rid);
2551 flush(&mut log_batch);
2552 builder.end(&mut log_batch);
2553 rid += 1;
2554 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2555 data.insert(rid);
2556 flush(&mut log_batch);
2557 engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2558 }
2559 {
2560 let mut builder = AtomicGroupBuilder::with_id(6);
2562 builder.begin(&mut log_batch);
2563 rid += 1;
2564 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2565 data.insert(rid);
2566 flush(&mut log_batch);
2567 builder.end(&mut log_batch);
2568 flush(&mut log_batch);
2569 let mut builder = AtomicGroupBuilder::with_id(7);
2570 builder.begin(&mut log_batch);
2571 flush(&mut log_batch);
2572 builder.end(&mut log_batch);
2573 rid += 1;
2574 log_batch.put(rid, key.clone(), value.clone()).unwrap();
2575 data.insert(rid);
2576 flush(&mut log_batch);
2577 engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2578 }
2579 engine.pipe_log.sync(LogQueue::Rewrite).unwrap();
2580
2581 let engine = engine.reopen();
2582 for rid in engine.raft_groups() {
2583 assert!(data.remove(&rid), "{}", rid);
2584 assert_eq!(engine.get(rid, &key).unwrap(), value);
2585 }
2586 assert!(data.is_empty(), "data loss {:?}", data);
2587 }
2588
2589 #[test]
2590 fn test_internal_key_filter() {
2591 let dir = tempfile::Builder::new()
2592 .prefix("test_internal_key_filter")
2593 .tempdir()
2594 .unwrap();
2595 let cfg = Config {
2596 dir: dir.path().to_str().unwrap().to_owned(),
2597 ..Default::default()
2598 };
2599 let fs = Arc::new(ObfuscatedFileSystem::default());
2600 let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
2601 let value = vec![b'y'; 8];
2602 let mut log_batch = LogBatch::default();
2603 log_batch.put_unchecked(1, crate::make_internal_key(&[1]), value.clone());
2604 log_batch.put_unchecked(2, crate::make_internal_key(&[1]), value.clone());
2605 engine.write(&mut log_batch, false).unwrap();
2606 assert!(engine.raft_groups().is_empty());
2608
2609 let engine = engine.reopen();
2610 assert!(engine.raft_groups().is_empty());
2612
2613 log_batch.put_unchecked(3, crate::make_internal_key(&[1]), value.clone());
2614 log_batch.put_unchecked(4, crate::make_internal_key(&[1]), value);
2615 log_batch.finish_populate(0, None).unwrap();
2616 let block_handle = engine
2617 .pipe_log
2618 .append(LogQueue::Rewrite, &mut log_batch)
2619 .unwrap();
2620 log_batch.finish_write(block_handle);
2621 engine
2622 .memtables
2623 .apply_rewrite_writes(log_batch.drain(), None, 0);
2624 assert!(engine.raft_groups().is_empty());
2626
2627 let engine = engine.reopen();
2628 assert!(engine.raft_groups().is_empty());
2630 }
2631
2632 #[test]
2633 fn test_start_engine_with_multi_dirs() {
2634 let dir = tempfile::Builder::new()
2635 .prefix("test_start_engine_with_multi_dirs_default")
2636 .tempdir()
2637 .unwrap();
2638 let spill_dir = tempfile::Builder::new()
2639 .prefix("test_start_engine_with_multi_dirs_spill")
2640 .tempdir()
2641 .unwrap();
2642 fn number_of_files(p: &Path) -> usize {
2643 let mut r = 0;
2644 std::fs::read_dir(p).unwrap().for_each(|e| {
2645 if e.unwrap()
2646 .path()
2647 .file_name()
2648 .unwrap()
2649 .to_str()
2650 .unwrap()
2651 .starts_with("000")
2652 {
2653 r += 1;
2654 }
2655 });
2656 r
2657 }
2658 let file_system = Arc::new(DeleteMonitoredFileSystem::new());
2659 let entry_data = vec![b'x'; 512];
2660
2661 let cfg = Config {
2663 dir: dir.path().to_str().unwrap().to_owned(),
2664 spill_dir: Some(spill_dir.path().to_str().unwrap().to_owned()),
2665 enable_log_recycle: false,
2666 target_file_size: ReadableSize(1),
2667 ..Default::default()
2668 };
2669 {
2670 let engine =
2672 RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
2673 for rid in 1..=10 {
2674 engine.append(rid, 1, 10, Some(&entry_data));
2675 }
2676 drop(engine);
2677
2678 let mut moved = 0;
2681 for e in std::fs::read_dir(dir.path()).unwrap() {
2682 let p = e.unwrap().path();
2683 let file_name = p.file_name().unwrap().to_str().unwrap();
2684 if let Some(FileId {
2685 queue: LogQueue::Append,
2686 seq: _,
2687 }) = FileId::parse_file_name(file_name)
2688 {
2689 file_system
2690 .rename(&p, &spill_dir.path().join(file_name))
2691 .unwrap();
2692 moved += 1;
2693 if moved == 4 {
2694 break;
2695 }
2696 }
2697 }
2698 }
2699
2700 let cfg_2 = Config {
2703 enable_log_recycle: true,
2704 prefill_for_recycle: true,
2705 purge_threshold: ReadableSize(40),
2706 ..cfg.clone()
2707 };
2708 let recycle_capacity = cfg_2.recycle_capacity() as u64;
2709 let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system.clone()).unwrap();
2710 assert!(number_of_files(spill_dir.path()) > 0);
2711 for rid in 1..=10 {
2712 assert_eq!(engine.first_index(rid).unwrap(), 1);
2713 engine.clean(rid);
2714 }
2715 engine.purge_manager.must_rewrite_append_queue(None, None);
2716 let file_count = file_system.inner.file_count();
2717 assert_eq!(
2718 number_of_files(spill_dir.path()) + number_of_files(dir.path()),
2719 file_count
2720 );
2721 assert!(file_count > engine.file_count(None));
2722 for rid in 1..=recycle_capacity - 10 {
2724 engine.append(rid, 20, 30, Some(&entry_data));
2725 }
2726 assert_eq!(file_count, file_system.inner.file_count());
2728 assert!(number_of_files(spill_dir.path()) > 0);
2729
2730 let cfg_3 = Config {
2731 enable_log_recycle: false,
2732 purge_threshold: ReadableSize(40),
2733 ..cfg
2734 };
2735 drop(engine);
2736 let engine = RaftLogEngine::open_with_file_system(cfg_3, file_system).unwrap();
2737 assert!(number_of_files(spill_dir.path()) > 0);
2738 for rid in 1..=10 {
2739 assert_eq!(engine.first_index(rid).unwrap(), 20);
2740 }
2741
2742 {
2744 let mut file_count = 0;
2746 for e in std::fs::read_dir(spill_dir.path()).unwrap() {
2747 let p = e.unwrap().path();
2748 let file_name = p.file_name().unwrap().to_str().unwrap();
2749 if let Some(FileId {
2750 queue: LogQueue::Append,
2751 seq: _,
2752 }) = FileId::parse_file_name(file_name)
2753 {
2754 if file_count % 2 == 0 {
2755 std::fs::copy(&p, dir.path().join(file_name)).unwrap();
2756 }
2757 file_count += 1;
2758 }
2759 }
2760 }
2761 let start = engine.file_span(LogQueue::Append).0;
2762 let engine = engine.reopen();
2763 assert!(engine.file_span(LogQueue::Append).0 > start);
2765 }
2766}