raft_engine/engine.rs

// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use std::cell::{Cell, RefCell};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{Builder as ThreadBuilder, JoinHandle};
use std::time::{Duration, Instant};

use log::{error, info};
use protobuf::{parse_from_bytes, Message};

use crate::config::{Config, RecoveryMode};
use crate::consistency::ConsistencyChecker;
use crate::env::{DefaultFileSystem, FileSystem};
use crate::event_listener::EventListener;
use crate::file_pipe_log::debug::LogItemReader;
use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder};
use crate::log_batch::{Command, LogBatch, MessageExt};
use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables};
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::purge::{PurgeHook, PurgeManager};
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{perf_context, Error, GlobalStats, Result};

const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);
/// Maximum number of attempts for a single `write`.
const MAX_WRITE_ATTEMPT: u64 = 2;

pub struct Engine<F = DefaultFileSystem, P = FilePipeLog<F>>
where
    F: FileSystem,
    P: PipeLog,
{
    cfg: Arc<Config>,
    listeners: Vec<Arc<dyn EventListener>>,

    #[allow(dead_code)]
    stats: Arc<GlobalStats>,
    memtables: MemTables,
    pipe_log: Arc<P>,
    purge_manager: PurgeManager<P>,

    write_barrier: WriteBarrier<LogBatch, Result<FileBlockHandle>>,

    tx: Mutex<mpsc::Sender<()>>,
    metrics_flusher: Option<JoinHandle<()>>,

    _phantom: PhantomData<F>,
}

impl Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>> {
    pub fn open(cfg: Config) -> Result<Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>>> {
        Self::open_with_listeners(cfg, vec![])
    }

    pub fn open_with_listeners(
        cfg: Config,
        listeners: Vec<Arc<dyn EventListener>>,
    ) -> Result<Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>>> {
        Self::open_with(cfg, Arc::new(DefaultFileSystem), listeners)
    }
}
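
// A minimal usage sketch (illustrative only, not part of this file): open an
// engine with the default file system and close it by dropping it. The
// directory path below is an assumption made for the example.
//
//     let cfg = Config {
//         dir: "./raft-engine-data".to_owned(),
//         ..Default::default()
//     };
//     let engine = Engine::open(cfg)?;
//     drop(engine); // joins the metrics-flusher thread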

impl<F> Engine<F, FilePipeLog<F>>
where
    F: FileSystem,
{
    pub fn open_with_file_system(
        cfg: Config,
        file_system: Arc<F>,
    ) -> Result<Engine<F, FilePipeLog<F>>> {
        Self::open_with(cfg, file_system, vec![])
    }

    pub fn open_with(
        mut cfg: Config,
        file_system: Arc<F>,
        mut listeners: Vec<Arc<dyn EventListener>>,
    ) -> Result<Engine<F, FilePipeLog<F>>> {
        cfg.sanitize()?;
        listeners.push(Arc::new(PurgeHook::default()) as Arc<dyn EventListener>);

        let start = Instant::now();
        let mut builder = FilePipeLogBuilder::new(cfg.clone(), file_system, listeners.clone());
        builder.scan()?;
        let factory = MemTableRecoverContextFactory::new(&cfg);
        let (append, rewrite) = builder.recover(&factory)?;
        let pipe_log = Arc::new(builder.finish()?);
        rewrite.merge_append_context(append);
        let (memtables, stats) = rewrite.finish();
        info!("Recovering raft logs takes {:?}", start.elapsed());

        let cfg = Arc::new(cfg);
        let purge_manager = PurgeManager::new(
            cfg.clone(),
            memtables.clone(),
            pipe_log.clone(),
            stats.clone(),
            listeners.clone(),
        );

        let (tx, rx) = mpsc::channel();
        let stats_clone = stats.clone();
        let memtables_clone = memtables.clone();
        let metrics_flusher = ThreadBuilder::new()
            .name("re-metrics".into())
            .spawn(move || loop {
                stats_clone.flush_metrics();
                memtables_clone.flush_metrics();
                if rx.recv_timeout(METRICS_FLUSH_INTERVAL).is_ok() {
                    break;
                }
            })?;

        Ok(Self {
            cfg,
            listeners,
            stats,
            memtables,
            pipe_log,
            purge_manager,
            write_barrier: Default::default(),
            tx: Mutex::new(tx),
            metrics_flusher: Some(metrics_flusher),
            _phantom: PhantomData,
        })
    }
}

impl<F, P> Engine<F, P>
where
    F: FileSystem,
    P: PipeLog,
{
    /// Writes the content of `log_batch` into the engine and returns the
    /// number of written bytes. If `sync` is true, the write will be followed
    /// by a call to `fdatasync` on the log file.
    pub fn write(&self, log_batch: &mut LogBatch, mut sync: bool) -> Result<usize> {
        if log_batch.is_empty() {
            return Ok(0);
        }
        let start = Instant::now();
        let len = log_batch.finish_populate(
            self.cfg.batch_compression_threshold.0 as usize,
            self.cfg.compression_level,
        )?;
        debug_assert!(len > 0);

        let mut attempt_count = 0_u64;
        let block_handle = loop {
            // The number of attempts is capped at `MAX_WRITE_ATTEMPT`, i.e. 2.
            // If the first `append` is retried because of a NOSPC error, the
            // next `append` should succeed unless the IO device is in an
            // abnormal state. In that case, `Engine::write` must return `Err`.
            attempt_count += 1;
            let mut writer = Writer::new(log_batch, sync);
            // Snapshot and clear the current perf context temporarily, so the write group
            // leader will collect the perf context diff later.
            let mut perf_context = take_perf_context();
            let before_enter = Instant::now();
            if let Some(mut group) = self.write_barrier.enter(&mut writer) {
                let now = Instant::now();
                let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
                for writer in group.iter_mut() {
                    writer.entered_time = Some(now);
                    sync |= writer.sync;
                    let log_batch = writer.mut_payload();
                    let res = self.pipe_log.append(LogQueue::Append, log_batch);
                    writer.set_output(res);
                }
                perf_context!(log_write_duration).observe_since(now);
                if sync {
                    // As per trait protocol, sync error should be retriable. But we panic anyway to
                    // save the trouble of propagating it to other group members.
                    self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
                }
                // Pass the perf context diff to all the writers.
                let diff = get_perf_context();
                for writer in group.iter_mut() {
                    writer.perf_context_diff = diff.clone();
                }
            }
            let entered_time = writer.entered_time.unwrap();
            perf_context.write_wait_duration +=
                entered_time.saturating_duration_since(before_enter);
            debug_assert_eq!(writer.perf_context_diff.write_wait_duration, Duration::ZERO);
            perf_context += &writer.perf_context_diff;
            set_perf_context(perf_context);
            // Retry if `writer.finish()` returns the special `Error::TryAgain`,
            // indicating that there is still free space for this `LogBatch`.
            match writer.finish() {
                Ok(handle) => {
                    ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM
                        .observe(entered_time.saturating_duration_since(start).as_secs_f64());
                    break handle;
                }
                Err(Error::TryAgain(e)) => {
                    if attempt_count >= MAX_WRITE_ATTEMPT {
                        // A special error. The caller may retry this `LogBatch` by
                        // appending its writer to the next write group, so the current
                        // write leader will not hang on this write and can return in time.
                        return Err(Error::TryAgain(format!(
                            "Failed to write logbatch, exceed MAX_WRITE_ATTEMPT: ({MAX_WRITE_ATTEMPT}), err: {e}",
                        )));
                    }
                    info!("got err: {e}, try to write this LogBatch again");
                }
                Err(e) => {
                    return Err(e);
                }
            }
        };
        let mut now = Instant::now();
        log_batch.finish_write(block_handle);
        self.memtables.apply_append_writes(log_batch.drain());
        for listener in &self.listeners {
            listener.post_apply_memtables(block_handle.id);
        }
        let end = Instant::now();
        let apply_duration = end.saturating_duration_since(now);
        ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
        perf_context!(apply_duration).observe(apply_duration);
        now = end;
        ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
        ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
        Ok(len)
    }
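
    // Usage sketch for `write` (illustrative only): stage entries and key-values
    // in a `LogBatch`, then persist them in a single call. The region id and key
    // below are assumptions made for the example.
    //
    //     let mut batch = LogBatch::default();
    //     batch.put(1, b"state".to_vec(), b"value".to_vec())?;
    //     let written_bytes = engine.write(&mut batch, /*sync=*/ true)?;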

    /// Synchronizes the Raft engine.
    pub fn sync(&self) -> Result<()> {
        self.write(&mut LogBatch::default(), true)?;
        Ok(())
    }

    pub fn get_message<S: Message>(&self, region_id: u64, key: &[u8]) -> Result<Option<S>> {
        let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
        if let Some(memtable) = self.memtables.get(region_id) {
            if let Some(value) = memtable.read().get(key) {
                return Ok(Some(parse_from_bytes(&value)?));
            }
        }
        Ok(None)
    }

    pub fn get(&self, region_id: u64, key: &[u8]) -> Option<Vec<u8>> {
        let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
        if let Some(memtable) = self.memtables.get(region_id) {
            return memtable.read().get(key);
        }
        None
    }

    /// Iterates over [start_key, end_key) range of Raft Group key-values and
    /// yields messages of the required type. Unparsable items are skipped.
    pub fn scan_messages<S, C>(
        &self,
        region_id: u64,
        start_key: Option<&[u8]>,
        end_key: Option<&[u8]>,
        reverse: bool,
        mut callback: C,
    ) -> Result<()>
    where
        S: Message,
        C: FnMut(&[u8], S) -> bool,
    {
        self.scan_raw_messages(region_id, start_key, end_key, reverse, move |k, raw_v| {
            if let Ok(v) = parse_from_bytes(raw_v) {
                callback(k, v)
            } else {
                true
            }
        })
    }

    /// Iterates over [start_key, end_key) range of Raft Group key-values and
    /// yields all key value pairs as bytes.
    pub fn scan_raw_messages<C>(
        &self,
        region_id: u64,
        start_key: Option<&[u8]>,
        end_key: Option<&[u8]>,
        reverse: bool,
        callback: C,
    ) -> Result<()>
    where
        C: FnMut(&[u8], &[u8]) -> bool,
    {
        let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
        if let Some(memtable) = self.memtables.get(region_id) {
            memtable
                .read()
                .scan(start_key, end_key, reverse, callback)?;
        }
        Ok(())
    }
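
    // Scan sketch (illustrative only): visit every key-value pair of one Raft
    // group in key order; returning `false` from the callback stops the scan
    // early. `region_id` is assumed to be defined by the caller.
    //
    //     engine.scan_raw_messages(region_id, None, None, false, |key, value| {
    //         println!("{:?} => {} bytes", key, value.len());
    //         true // continue scanning
    //     })?;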

    pub fn get_entry<M: MessageExt>(
        &self,
        region_id: u64,
        log_idx: u64,
    ) -> Result<Option<M::Entry>> {
        let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
        if let Some(memtable) = self.memtables.get(region_id) {
            if let Some(idx) = memtable.read().get_entry(log_idx) {
                ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(1.0);
                return Ok(Some(read_entry_from_file::<M, _>(
                    self.pipe_log.as_ref(),
                    &idx,
                )?));
            }
        }
        Ok(None)
    }

    /// Purges expired log files and returns a set of Raft group ids that need
    /// to be compacted.
    pub fn purge_expired_files(&self) -> Result<Vec<u64>> {
        self.purge_manager.purge_expired_files()
    }

    /// Returns count of fetched entries.
    pub fn fetch_entries_to<M: MessageExt>(
        &self,
        region_id: u64,
        begin: u64,
        end: u64,
        max_size: Option<usize>,
        vec: &mut Vec<M::Entry>,
    ) -> Result<usize> {
        let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
        if let Some(memtable) = self.memtables.get(region_id) {
            let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity((end - begin) as usize);
            memtable
                .read()
                .fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
            for i in ents_idx.iter() {
                vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
            }
            ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
            return Ok(ents_idx.len());
        }
        Ok(0)
    }
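
    // Read-path sketch (illustrative only): fetch the entries in [begin, end)
    // of one Raft group into a caller-provided vector. `M` stands for any
    // `MessageExt` implementation; the other identifiers are assumptions.
    //
    //     let mut entries = Vec::new();
    //     let fetched = engine.fetch_entries_to::<M>(region_id, begin, end, None, &mut entries)?;
    //     assert_eq!(fetched, entries.len());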

    pub fn first_index(&self, region_id: u64) -> Option<u64> {
        if let Some(memtable) = self.memtables.get(region_id) {
            return memtable.read().first_index();
        }
        None
    }

    pub fn last_index(&self, region_id: u64) -> Option<u64> {
        if let Some(memtable) = self.memtables.get(region_id) {
            return memtable.read().last_index();
        }
        None
    }

    /// Deletes log entries before `index` in the specified Raft group. Returns
    /// the number of deleted entries.
    pub fn compact_to(&self, region_id: u64, index: u64) -> u64 {
        let first_index = match self.first_index(region_id) {
            Some(index) => index,
            None => return 0,
        };

        let mut log_batch = LogBatch::default();
        log_batch.add_command(region_id, Command::Compact { index });
        if let Err(e) = self.write(&mut log_batch, false) {
            error!("Failed to write Compact command: {e}");
        }

        self.first_index(region_id).unwrap_or(index) - first_index
    }
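
    // Compaction sketch (illustrative only): once entries up to `applied_index`
    // have been applied by the caller's state machine, they can be dropped from
    // the log. `region_id` and `applied_index` are assumptions for the example.
    //
    //     let deleted = engine.compact_to(region_id, applied_index);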

    pub fn raft_groups(&self) -> Vec<u64> {
        self.memtables.fold(vec![], |mut v, m| {
            v.push(m.region_id());
            v
        })
    }

    /// Returns `true` if the engine contains no Raft Group. An empty Raft
    /// Group that hasn't been cleaned still counts as present.
    pub fn is_empty(&self) -> bool {
        self.memtables.is_empty()
    }

    /// Returns the sequence number range of active log files in the specified
    /// log queue.
    /// For testing only.
    pub fn file_span(&self, queue: LogQueue) -> (u64, u64) {
        self.pipe_log.file_span(queue)
    }

    pub fn get_used_size(&self) -> usize {
        self.pipe_log.total_size(LogQueue::Append) + self.pipe_log.total_size(LogQueue::Rewrite)
    }

    pub fn path(&self) -> &str {
        self.cfg.dir.as_str()
    }

    #[cfg(feature = "internals")]
    pub fn purge_manager(&self) -> &PurgeManager<P> {
        &self.purge_manager
    }
}

impl<F, P> Drop for Engine<F, P>
where
    F: FileSystem,
    P: PipeLog,
{
    fn drop(&mut self) {
        self.tx.lock().unwrap().send(()).unwrap();
        if let Some(t) = self.metrics_flusher.take() {
            t.join().unwrap();
        }
    }
}

impl Engine<DefaultFileSystem, FilePipeLog<DefaultFileSystem>> {
    pub fn consistency_check(path: &Path) -> Result<Vec<(u64, u64)>> {
        Self::consistency_check_with_file_system(path, Arc::new(DefaultFileSystem))
    }

    #[cfg(feature = "scripting")]
    pub fn unsafe_repair(path: &Path, queue: Option<LogQueue>, script: String) -> Result<()> {
        Self::unsafe_repair_with_file_system(path, queue, script, Arc::new(DefaultFileSystem))
    }

    pub fn dump(path: &Path) -> Result<LogItemReader<DefaultFileSystem>> {
        Self::dump_with_file_system(path, Arc::new(DefaultFileSystem))
    }
}
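
// Offline-tooling sketch (illustrative only): check a closed engine directory
// for corruption with the default file system. The path is an assumption made
// for the example.
//
//     let corrupted = Engine::consistency_check(Path::new("./raft-engine-data"))?;
//     for (raft_group_id, last_valid_index) in corrupted {
//         println!("group {raft_group_id} is valid up to index {last_valid_index}");
//     }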

impl<F> Engine<F, FilePipeLog<F>>
where
    F: FileSystem,
{
    /// Returns a list of corrupted Raft groups, including their ids and last
    /// valid log index. Head or tail corruption cannot be detected.
    pub fn consistency_check_with_file_system(
        path: &Path,
        file_system: Arc<F>,
    ) -> Result<Vec<(u64, u64)>> {
        if !path.exists() {
            return Err(Error::InvalidArgument(format!(
                "raft-engine directory '{}' does not exist.",
                path.to_str().unwrap()
            )));
        }

        let cfg = Config {
            dir: path.to_str().unwrap().to_owned(),
            recovery_mode: RecoveryMode::TolerateAnyCorruption,
            ..Default::default()
        };
        let mut builder = FilePipeLogBuilder::new(cfg, file_system, Vec::new());
        builder.scan()?;
        let (append, rewrite) =
            builder.recover(&DefaultMachineFactory::<ConsistencyChecker>::default())?;
        let mut map = rewrite.finish();
        for (id, index) in append.finish() {
            map.entry(id).or_insert(index);
        }
        let mut list: Vec<(u64, u64)> = map.into_iter().collect();
        list.sort_unstable();
        Ok(list)
    }

    #[cfg(feature = "scripting")]
    pub fn unsafe_repair_with_file_system(
        path: &Path,
        queue: Option<LogQueue>,
        script: String,
        file_system: Arc<F>,
    ) -> Result<()> {
        use crate::file_pipe_log::{RecoveryConfig, ReplayMachine};

        if !path.exists() {
            return Err(Error::InvalidArgument(format!(
                "raft-engine directory '{}' does not exist.",
                path.to_str().unwrap()
            )));
        }

        let cfg = Config {
            dir: path.to_str().unwrap().to_owned(),
            recovery_mode: RecoveryMode::TolerateAnyCorruption,
            ..Default::default()
        };
        let recovery_mode = cfg.recovery_mode;
        let read_block_size = cfg.recovery_read_block_size.0;
        let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new());
        builder.scan()?;
        let factory = crate::filter::RhaiFilterMachineFactory::from_script(script);
        let mut machine = None;
        if queue.is_none() || queue.unwrap() == LogQueue::Append {
            machine = Some(builder.recover_queue(
                file_system.clone(),
                RecoveryConfig {
                    queue: LogQueue::Append,
                    mode: recovery_mode,
                    concurrency: 1,
                    read_block_size,
                },
                &factory,
            )?);
        }
        if queue.is_none() || queue.unwrap() == LogQueue::Rewrite {
            let machine2 = builder.recover_queue(
                file_system.clone(),
                RecoveryConfig {
                    queue: LogQueue::Rewrite,
                    mode: recovery_mode,
                    concurrency: 1,
                    read_block_size,
                },
                &factory,
            )?;
            if let Some(machine) = &mut machine {
                machine.merge(machine2, LogQueue::Rewrite)?;
            }
        }
        if let Some(machine) = machine {
            machine.finish(file_system.as_ref(), path)?;
        }
        Ok(())
    }

    /// Dumps all operations.
    pub fn dump_with_file_system(path: &Path, file_system: Arc<F>) -> Result<LogItemReader<F>> {
        if !path.exists() {
            return Err(Error::InvalidArgument(format!(
                "raft-engine directory or file '{}' does not exist.",
                path.to_str().unwrap()
            )));
        }

        if path.is_dir() {
            LogItemReader::new_directory_reader(file_system, path)
        } else {
            LogItemReader::new_file_reader(file_system, path)
        }
    }
}

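/// A single-slot cache of the most recently decoded entries block. It is kept
/// thread-local (see `BLOCK_CACHE` below) so that consecutive entry reads
/// hitting the same file block avoid redundant disk reads and decompression.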
struct BlockCache {
    key: Cell<FileBlockHandle>,
    block: RefCell<Vec<u8>>,
}

impl BlockCache {
    fn new() -> Self {
        BlockCache {
            key: Cell::new(FileBlockHandle {
                id: FileId::new(LogQueue::Append, 0),
                offset: 0,
                len: 0,
            }),
            block: RefCell::new(Vec::new()),
        }
    }

    fn insert(&self, key: FileBlockHandle, block: Vec<u8>) {
        self.key.set(key);
        self.block.replace(block);
    }
}

thread_local! {
    static BLOCK_CACHE: BlockCache = BlockCache::new();
}

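/// Reads and decodes the entry at `idx` from the pipe log, going through the
/// thread-local `BLOCK_CACHE` so that sequential reads from the same file
/// block only read and decompress that block once.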
pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
where
    M: MessageExt,
    P: PipeLog,
{
    BLOCK_CACHE.with(|cache| {
        if cache.key.get() != idx.entries.unwrap() {
            cache.insert(
                idx.entries.unwrap(),
                LogBatch::decode_entries_block(
                    &pipe_log.read_bytes(idx.entries.unwrap())?,
                    idx.entries.unwrap(),
                    idx.compression_type,
                )?,
            );
        }
        let e = parse_from_bytes(
            &cache.block.borrow()
                [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
        )?;
        assert_eq!(M::index(&e), idx.index);
        Ok(e)
    })
}

pub(crate) fn read_entry_bytes_from_file<P>(pipe_log: &P, idx: &EntryIndex) -> Result<Vec<u8>>
where
    P: PipeLog,
{
    BLOCK_CACHE.with(|cache| {
        if cache.key.get() != idx.entries.unwrap() {
            cache.insert(
                idx.entries.unwrap(),
                LogBatch::decode_entries_block(
                    &pipe_log.read_bytes(idx.entries.unwrap())?,
                    idx.entries.unwrap(),
                    idx.compression_type,
                )?,
            );
        }
        Ok(cache.block.borrow()
            [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize]
            .to_owned())
    })
}

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::env::{ObfuscatedFileSystem, Permission};
    use crate::file_pipe_log::{parse_reserved_file_name, FileNameExt};
    use crate::log_batch::AtomicGroupBuilder;
    use crate::pipe_log::Version;
    use crate::test_util::{generate_entries, PanicGuard};
    use crate::util::ReadableSize;
    use kvproto::raft_serverpb::RaftLocalState;
    use raft::eraftpb::Entry;
    use std::collections::{BTreeSet, HashSet};
    use std::fs::OpenOptions;
    use std::path::PathBuf;

    pub(crate) type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;
    impl<F: FileSystem> RaftLogEngine<F> {
        fn append(&self, rid: u64, start_index: u64, end_index: u64, data: Option<&[u8]>) {
            let entries = generate_entries(start_index, end_index, data);
            if !entries.is_empty() {
                let mut batch = LogBatch::default();
                batch.add_entries::<Entry>(rid, &entries).unwrap();
                batch
                    .put_message(
                        rid,
                        b"last_index".to_vec(),
                        &RaftLocalState {
                            last_index: entries[entries.len() - 1].index,
                            ..Default::default()
                        },
                    )
                    .unwrap();
                self.write(&mut batch, true).unwrap();
            }
        }

        fn clean(&self, rid: u64) {
            let mut log_batch = LogBatch::default();
            log_batch.add_command(rid, Command::Clean);
            self.write(&mut log_batch, true).unwrap();
        }

        fn decode_last_index(&self, rid: u64) -> Option<u64> {
            self.get_message::<RaftLocalState>(rid, b"last_index")
                .unwrap()
                .map(|s| s.last_index)
        }

        fn reopen(self) -> Self {
            let cfg: Config = self.cfg.as_ref().clone();
            let file_system = self.pipe_log.file_system();
            let mut listeners = self.listeners.clone();
            listeners.pop();
            drop(self);
            RaftLogEngine::open_with(cfg, file_system, listeners).unwrap()
        }

        fn scan_entries<FR: Fn(u64, LogQueue, &[u8])>(
            &self,
            rid: u64,
            start: u64,
            end: u64,
            reader: FR,
        ) {
            let mut entries = Vec::new();
            self.fetch_entries_to::<Entry>(
                rid,
                self.first_index(rid).unwrap(),
                self.last_index(rid).unwrap() + 1,
                None,
                &mut entries,
            )
            .unwrap();
            assert_eq!(entries.first().unwrap().index, start, "{rid}");
            assert_eq!(entries.last().unwrap().index + 1, end);
            assert_eq!(
                entries.last().unwrap().index,
                self.decode_last_index(rid).unwrap()
            );
            assert_eq!(entries.len(), (end - start) as usize);
            for e in entries.iter() {
                let entry_index = self
                    .memtables
                    .get(rid)
                    .unwrap()
                    .read()
                    .get_entry(e.index)
                    .unwrap();
                assert_eq!(&self.get_entry::<Entry>(rid, e.index).unwrap().unwrap(), e);
                reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
            }
        }

        fn file_count(&self, queue: Option<LogQueue>) -> usize {
            if let Some(queue) = queue {
                let (a, b) = self.file_span(queue);
                (b - a + 1) as usize
            } else {
                self.file_count(Some(LogQueue::Append)) + self.file_count(Some(LogQueue::Rewrite))
            }
        }
    }

    #[test]
    fn test_empty_engine() {
        let dir = tempfile::Builder::new()
            .prefix("test_empty_engine")
            .tempdir()
            .unwrap();
        let mut sub_dir = PathBuf::from(dir.as_ref());
        sub_dir.push("raft-engine");
        let cfg = Config {
            dir: sub_dir.to_str().unwrap().to_owned(),
            ..Default::default()
        };
        RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
            .unwrap();
    }

    #[test]
    fn test_get_entry() {
        let normal_batch_size = 10;
        let compressed_batch_size = 5120;
        for &entry_size in &[normal_batch_size, compressed_batch_size] {
            let dir = tempfile::Builder::new()
                .prefix("test_get_entry")
                .tempdir()
                .unwrap();
            let cfg = Config {
                dir: dir.path().to_str().unwrap().to_owned(),
                target_file_size: ReadableSize(1),
                ..Default::default()
            };

            let engine = RaftLogEngine::open_with_file_system(
                cfg.clone(),
                Arc::new(ObfuscatedFileSystem::default()),
            )
            .unwrap();
            assert_eq!(engine.path(), dir.path().to_str().unwrap());
            let data = vec![b'x'; entry_size];
            for i in 10..20 {
                let rid = i;
                let index = i;
                engine.append(rid, index, index + 2, Some(&data));
            }
            for i in 10..20 {
                let rid = i;
                let index = i;
                engine.scan_entries(rid, index, index + 2, |_, q, d| {
                    assert_eq!(q, LogQueue::Append);
                    assert_eq!(d, &data);
                });
            }

            // Recover the engine.
            let engine = engine.reopen();
            for i in 10..20 {
                let rid = i;
                let index = i;
                engine.scan_entries(rid, index, index + 2, |_, q, d| {
                    assert_eq!(q, LogQueue::Append);
                    assert_eq!(d, &data);
                });
            }
        }
    }

    #[test]
    fn test_clean_raft_group() {
        fn run_steps(steps: &[Option<(u64, u64)>]) {
            let rid = 1;
            let data = vec![b'x'; 1024];

            for rewrite_step in 1..=steps.len() {
                for exit_purge in [None, Some(1), Some(2)] {
                    let _guard = PanicGuard::with_prompt(format!(
                        "case: [{steps:?}, {rewrite_step}, {exit_purge:?}]",
                    ));
                    let dir = tempfile::Builder::new()
                        .prefix("test_clean_raft_group")
                        .tempdir()
                        .unwrap();
                    let cfg = Config {
                        dir: dir.path().to_str().unwrap().to_owned(),
                        target_file_size: ReadableSize(1),
                        ..Default::default()
                    };
                    let engine = RaftLogEngine::open_with_file_system(
                        cfg.clone(),
                        Arc::new(ObfuscatedFileSystem::default()),
                    )
                    .unwrap();

                    for (i, step) in steps.iter().enumerate() {
                        if let Some((start, end)) = *step {
                            engine.append(rid, start, end, Some(&data));
                        } else {
                            engine.clean(rid);
                        }
                        if i + 1 == rewrite_step {
                            engine
                                .purge_manager
                                .must_rewrite_append_queue(None, exit_purge);
                        }
                    }

                    let engine = engine.reopen();
                    if let Some((start, end)) = *steps.last().unwrap() {
                        engine.scan_entries(rid, start, end, |_, _, d| {
                            assert_eq!(d, &data);
                        });
                    } else {
                        assert!(engine.raft_groups().is_empty());
                    }

                    engine.purge_manager.must_rewrite_append_queue(None, None);
                    let engine = engine.reopen();
                    if let Some((start, end)) = *steps.last().unwrap() {
                        engine.scan_entries(rid, start, end, |_, _, d| {
                            assert_eq!(d, &data);
                        });
                    } else {
                        assert!(engine.raft_groups().is_empty());
                    }
                }
            }
        }

        run_steps(&[Some((1, 5)), None, Some((2, 6)), None, Some((3, 7)), None]);
        run_steps(&[Some((1, 5)), None, Some((2, 6)), None, Some((3, 7))]);
        run_steps(&[Some((1, 5)), None, Some((2, 6)), None]);
        run_steps(&[Some((1, 5)), None, Some((2, 6))]);
        run_steps(&[Some((1, 5)), None]);
    }

    #[test]
    fn test_key_value_scan() {
        fn key(i: u64) -> Vec<u8> {
            format!("k{i}").as_bytes().to_vec()
        }
        fn value(i: u64) -> Vec<u8> {
            format!("v{i}").as_bytes().to_vec()
        }
        fn rich_value(i: u64) -> RaftLocalState {
            RaftLocalState {
                last_index: i,
                ..Default::default()
            }
        }

        let dir = tempfile::Builder::new()
            .prefix("test_key_value_scan")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize(1),
            ..Default::default()
        };
        let rid = 1;
        let engine =
            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                .unwrap();

        engine
            .scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
                panic!("unexpected message.");
            })
            .unwrap();

        let mut batch = LogBatch::default();
        let mut res = Vec::new();
        let mut rich_res = Vec::new();
        batch.put(rid, key(1), value(1)).unwrap();
        batch.put(rid, key(2), value(2)).unwrap();
        batch.put(rid, key(3), value(3)).unwrap();
        engine.write(&mut batch, false).unwrap();

        engine
            .scan_raw_messages(rid, None, None, false, |k, v| {
                res.push((k.to_vec(), v.to_vec()));
                true
            })
            .unwrap();
        assert_eq!(
            res,
            vec![(key(1), value(1)), (key(2), value(2)), (key(3), value(3))]
        );
        res.clear();
        engine
            .scan_raw_messages(rid, None, None, true, |k, v| {
                res.push((k.to_vec(), v.to_vec()));
                true
            })
            .unwrap();
        assert_eq!(
            res,
            vec![(key(3), value(3)), (key(2), value(2)), (key(1), value(1))]
        );
        res.clear();
        engine
            .scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
                panic!("unexpected message.")
            })
            .unwrap();

        batch.put_message(rid, key(22), &rich_value(22)).unwrap();
        batch.put_message(rid, key(33), &rich_value(33)).unwrap();
        engine.write(&mut batch, false).unwrap();

        engine
            .scan_messages(rid, None, None, false, |k, v| {
                rich_res.push((k.to_vec(), v));
                false
            })
            .unwrap();
        assert_eq!(rich_res, vec![(key(22), rich_value(22))]);
        rich_res.clear();
        engine
            .scan_messages(rid, None, None, true, |k, v| {
                rich_res.push((k.to_vec(), v));
                false
            })
            .unwrap();
        assert_eq!(rich_res, vec![(key(33), rich_value(33))]);
        rich_res.clear();
    }

    #[test]
    fn test_delete_key_value() {
        let dir = tempfile::Builder::new()
            .prefix("test_delete_key_value")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize(1),
            ..Default::default()
        };
        let rid = 1;
        let key = b"key".to_vec();
        let (v1, v2) = (b"v1".to_vec(), b"v2".to_vec());
        let mut batch_1 = LogBatch::default();
        batch_1.put(rid, key.clone(), v1).unwrap();
        let mut batch_2 = LogBatch::default();
        batch_2.put(rid, key.clone(), v2.clone()).unwrap();
        let mut delete_batch = LogBatch::default();
        delete_batch.delete(rid, key.clone());

        let engine =
            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                .unwrap();
        assert_eq!(
            engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
            None
        );
        assert_eq!(engine.get(rid, &key), None);

        // put | delete
        //     ^ rewrite
        engine.write(&mut batch_1.clone(), true).unwrap();
        assert!(engine.get_message::<RaftLocalState>(rid, &key).is_err());
        engine.purge_manager.must_rewrite_append_queue(None, None);
        engine.write(&mut delete_batch.clone(), true).unwrap();
        let engine = engine.reopen();
        assert_eq!(engine.get(rid, &key), None);
        assert_eq!(
            engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
            None
        );

        // Incomplete purge.
        engine.write(&mut batch_1.clone(), true).unwrap();
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        engine.write(&mut delete_batch.clone(), true).unwrap();
        let engine = engine.reopen();
        assert_eq!(engine.get(rid, &key), None);

        // TODO: Preserve kv tombstone during rewrite and activate this test case.
        // put | delete |
        //              ^ rewrite
        // let engine = engine.reopen();
        // engine.write(&mut batch_1.clone(), true).unwrap();
        // engine.write(&mut delete_batch.clone(), true).unwrap();
        // engine.purge_manager.must_rewrite_append_queue(None, None);
        // let engine = engine.reopen();
        // assert_eq!(engine.get(rid, &key), None);

        // put | delete | put
        //     ^ rewrite
        let engine = engine.reopen();
        engine.write(&mut batch_1.clone(), true).unwrap();
        engine.purge_manager.must_rewrite_append_queue(None, None);
        engine.write(&mut delete_batch.clone(), true).unwrap();
        engine.write(&mut batch_2.clone(), true).unwrap();
        let engine = engine.reopen();
        assert_eq!(engine.get(rid, &key).unwrap(), v2);
        // Incomplete purge.
        engine.write(&mut batch_1.clone(), true).unwrap();
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        engine.write(&mut delete_batch.clone(), true).unwrap();
        engine.write(&mut batch_2.clone(), true).unwrap();
        let engine = engine.reopen();
        assert_eq!(engine.get(rid, &key).unwrap(), v2);

        // put | delete | put
        //              ^ rewrite
        let engine = engine.reopen();
        engine.write(&mut batch_1.clone(), true).unwrap();
        engine.write(&mut delete_batch.clone(), true).unwrap();
        engine.purge_manager.must_rewrite_append_queue(None, None);
        engine.write(&mut batch_2.clone(), true).unwrap();
        let engine = engine.reopen();
        assert_eq!(engine.get(rid, &key).unwrap(), v2);
        // Incomplete purge.
        engine.write(&mut batch_1.clone(), true).unwrap();
        engine.write(&mut delete_batch.clone(), true).unwrap();
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        engine.write(&mut batch_2.clone(), true).unwrap();
        let engine = engine.reopen();
        assert_eq!(engine.get(rid, &key).unwrap(), v2);

        // put | delete | put |
        //                    ^ rewrite
        let engine = engine.reopen();
        engine.write(&mut batch_1.clone(), true).unwrap();
        engine.write(&mut delete_batch.clone(), true).unwrap();
        engine.write(&mut batch_2.clone(), true).unwrap();
        engine.purge_manager.must_rewrite_append_queue(None, None);
        let engine = engine.reopen();
        assert_eq!(engine.get(rid, &key).unwrap(), v2);
        // Incomplete purge.
        let engine = engine.reopen();
        engine.write(&mut batch_1.clone(), true).unwrap();
        engine.write(&mut delete_batch.clone(), true).unwrap();
        engine.write(&mut batch_2.clone(), true).unwrap();
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        let engine = engine.reopen();
        assert_eq!(engine.get(rid, &key).unwrap(), v2);
    }

    #[test]
    fn test_compact_raft_group() {
        let dir = tempfile::Builder::new()
            .prefix("test_compact_raft_group")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize(1),
            ..Default::default()
        };
        let engine =
            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                .unwrap();
        let data = vec![b'x'; 1024];

        // rewrite:[1  ..10]
        // append:   [5..10]
        let mut rid = 7;
        engine.append(rid, 1, 10, Some(&data));
        // Files are not purged.
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        let mut compact_log = LogBatch::default();
        compact_log.add_command(rid, Command::Compact { index: 5 });
        engine.write(&mut compact_log, true).unwrap();
        let engine = engine.reopen();
        engine.scan_entries(rid, 5, 10, |_, q, d| {
            assert_eq!(q, LogQueue::Append);
            assert_eq!(d, &data);
        });
        assert_eq!(engine.stats.live_entries(LogQueue::Append), 6); // 5 entries + 1 kv

        // rewrite:   [20..25]
        // append: [10   ..25]
        rid += 1;
        engine.append(rid, 5, 15, Some(&data));
        let mut compact_log = LogBatch::default();
        compact_log.add_command(rid, Command::Compact { index: 10 });
        engine.write(&mut compact_log, true).unwrap();
        engine.append(rid, 15, 25, Some(&data));
        // Files are not purged.
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        // Simulate loss of buffered write.
        let mut compact_log = LogBatch::default();
        compact_log.add_command(rid, Command::Compact { index: 20 });
        engine.memtables.apply_append_writes(compact_log.drain());
        engine.purge_manager.must_rewrite_rewrite_queue();
        let engine = engine.reopen();
        engine.scan_entries(rid, 10, 25, |_, q, d| {
            assert_eq!(q, LogQueue::Append);
            assert_eq!(d, &data);
        });
        assert_eq!(engine.stats.live_entries(LogQueue::Append), 22); // 20 entries + 2 kv
        engine.clean(rid - 1);
        assert_eq!(engine.stats.live_entries(LogQueue::Append), 16);
        // rewrite: [20..25][10..25]
        // append: [10..25]
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        let engine = engine.reopen();
        engine.scan_entries(rid, 10, 25, |_, q, d| {
            assert_eq!(q, LogQueue::Append);
            assert_eq!(d, &data);
        });

        // rewrite:[10..15][15  ..25]
        // append:           [20..25]
        rid += 1;
        engine.append(rid, 5, 15, Some(&data));
        let mut compact_log = LogBatch::default();
        compact_log.add_command(rid, Command::Compact { index: 10 });
        engine.write(&mut compact_log, true).unwrap();
        engine.purge_manager.must_rewrite_append_queue(None, None);
        engine.append(rid, 15, 25, Some(&data));
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        let mut compact_log = LogBatch::default();
        compact_log.add_command(rid, Command::Compact { index: 20 });
        engine.write(&mut compact_log, true).unwrap();
        let engine = engine.reopen();
        engine.scan_entries(rid, 20, 25, |_, q, d| {
            assert_eq!(q, LogQueue::Append);
            assert_eq!(d, &data);
        });

        // rewrite:[1..5] [10..15]
        // append:        [10..15]
        rid += 1;
        engine.append(rid, 1, 5, Some(&data));
        engine.purge_manager.must_rewrite_append_queue(None, None);
        engine.append(rid, 5, 15, Some(&data));
        let mut compact_log = LogBatch::default();
        compact_log.add_command(rid, Command::Compact { index: 10 });
        engine.write(&mut compact_log, true).unwrap();
        // Files are not purged.
        engine
            .purge_manager
            .must_rewrite_append_queue(None, Some(2));
        let engine = engine.reopen();
        engine.scan_entries(rid, 10, 15, |_, q, d| {
            assert_eq!(q, LogQueue::Append);
            assert_eq!(d, &data);
        });
    }

    #[test]
    fn test_purge_triggered_by_compact() {
        let dir = tempfile::Builder::new()
            .prefix("test_purge_triggered_by_compact")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize::kb(5),
            purge_threshold: ReadableSize::kb(150),
            ..Default::default()
        };

        let engine =
            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                .unwrap();
        let data = vec![b'x'; 1024];
        for index in 0..100 {
            engine.append(1, index, index + 1, Some(&data));
        }

        // GC all log entries. Won't trigger purge because total size is not enough.
        let count = engine.compact_to(1, 100);
        assert_eq!(count, 100);
        assert!(!engine
            .purge_manager
            .needs_rewrite_log_files(LogQueue::Append));

        // Append more logs to make total size greater than `purge_threshold`.
        for index in 100..250 {
            engine.append(1, index, index + 1, Some(&data));
        }

        // GC first 101 log entries.
        assert_eq!(engine.compact_to(1, 101), 1);
        // Needs to purge because the total size is greater than `purge_threshold`.
        assert!(engine
            .purge_manager
            .needs_rewrite_log_files(LogQueue::Append));

        let old_min_file_seq = engine.file_span(LogQueue::Append).0;
        let will_force_compact = engine.purge_expired_files().unwrap();
        let new_min_file_seq = engine.file_span(LogQueue::Append).0;
        // Some entries are rewritten.
        assert!(new_min_file_seq > old_min_file_seq);
        // No regions need to be force compacted because the threshold is not reached.
        assert!(will_force_compact.is_empty());
        // After purge, entries and raft state are still available.
        assert!(engine.get_entry::<Entry>(1, 101).unwrap().is_some());

        assert_eq!(engine.compact_to(1, 102), 1);
        // Needs to purge because the total size is greater than `purge_threshold`.
        assert!(engine
            .purge_manager
            .needs_rewrite_log_files(LogQueue::Append));
        let will_force_compact = engine.purge_expired_files().unwrap();
        // The region needs to be force compacted because the threshold is reached.
        assert!(!will_force_compact.is_empty());
        assert_eq!(will_force_compact[0], 1);
    }

    #[test]
    fn test_purge_trigger_force_rewrite() {
        let dir = tempfile::Builder::new()
            .prefix("test_purge_trigger_force_write")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize::kb(1),
            purge_threshold: ReadableSize::kb(10),
            ..Default::default()
        };

        let engine =
            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                .unwrap();
        let data = vec![b'x'; 1024];
        // Write 50 small entries into regions 1~3; they should trigger force compact.
        for rid in 1..=3 {
            for index in 0..50 {
                engine.append(rid, index, index + 1, Some(&data[..10]));
            }
        }
        // Write some small entries to trigger purge.
        for rid in 4..=50 {
            engine.append(rid, 1, 2, Some(&data));
        }

        let check_purge = |pending_regions: Vec<u64>| {
            let mut compact_regions = engine.purge_expired_files().unwrap();
            // Sort region ids in order.
            compact_regions.sort_unstable();
            assert_eq!(compact_regions, pending_regions);
        };

        for _ in 0..9 {
            check_purge(vec![1, 2, 3]);
        }

        // The 10th time: entries are rewritten, but the regions still need to be compacted.
        check_purge(vec![1, 2, 3]);
        for rid in 1..=3 {
            let memtable = engine.memtables.get(rid).unwrap();
            assert_eq!(memtable.read().rewrite_count(), 50);
        }

        // Compact and write some new data to trigger compact again.
        for rid in 2..=50 {
            let last_idx = engine.last_index(rid).unwrap();
            engine.compact_to(rid, last_idx);
            engine.append(rid, last_idx, last_idx + 1, Some(&data));
        }
        // After the writes, region 1 can trigger compact again.
        check_purge(vec![1]);
    }

    #[test]
    fn test_rewrite_and_recover() {
        let dir = tempfile::Builder::new()
            .prefix("test_rewrite_and_recover")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            target_file_size: ReadableSize::kb(5),
            purge_threshold: ReadableSize::kb(80),
            ..Default::default()
        };
        let engine =
            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                .unwrap();
        let data = vec![b'x'; 1024];

        // Put 100 entries into 10 regions.
        for index in 1..=10 {
            for rid in 1..=10 {
                engine.append(rid, index, index + 1, Some(&data));
            }
        }
        engine.append(11, 1, 11, Some(&data));

        // The engine needs purge, and all old entries should be rewritten.
        assert!(engine
            .purge_manager
            .needs_rewrite_log_files(LogQueue::Append));
        assert!(engine.purge_expired_files().unwrap().is_empty());
        assert!(engine.file_span(LogQueue::Append).0 > 1);

        let rewrite_file_size = engine.pipe_log.total_size(LogQueue::Rewrite);
        assert!(rewrite_file_size > 59); // The rewrite queue isn't empty.

        // All entries should be available.
        for rid in 1..=10 {
            engine.scan_entries(rid, 1, 11, |_, _, d| {
                assert_eq!(d, &data);
            });
        }

        engine.clean(11);
        let cleaned_region_ids = engine.memtables.cleaned_region_ids();
        assert_eq!(cleaned_region_ids.len(), 1);

        let engine = engine.reopen();
        assert_eq!(engine.memtables.cleaned_region_ids(), cleaned_region_ids);

        for rid in 1..=10 {
            engine.scan_entries(rid, 1, 11, |_, _, d| {
                assert_eq!(d, &data);
            });
        }

        // Rewrite again to check the rewrite queue is healthy.
        for index in 11..=20 {
            for rid in 1..=10 {
                engine.append(rid, index, index + 1, Some(&data));
            }
        }

        assert!(engine
            .purge_manager
            .needs_rewrite_log_files(LogQueue::Append));
        assert!(engine.purge_expired_files().unwrap().is_empty());
    }

    #[test]
    fn test_empty_protobuf_message() {
        let dir = tempfile::Builder::new()
            .prefix("test_empty_protobuf_message")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            ..Default::default()
        };
        let engine =
            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                .unwrap();

        let mut log_batch = LogBatch::default();
        let empty_entry = Entry::new();
        assert_eq!(empty_entry.compute_size(), 0);
        log_batch
            .add_entries::<Entry>(0, &[empty_entry.clone()])
            .unwrap();
        engine.write(&mut log_batch, false).unwrap();
        let empty_state = RaftLocalState::new();
        assert_eq!(empty_state.compute_size(), 0);
        log_batch
            .put_message(1, b"key".to_vec(), &empty_state)
            .unwrap();
        engine.write(&mut log_batch, false).unwrap();
        log_batch
            .add_entries::<Entry>(2, &[empty_entry.clone()])
            .unwrap();
        log_batch
            .put_message(2, b"key".to_vec(), &empty_state)
            .unwrap();
        engine.write(&mut log_batch, true).unwrap();

        let engine = engine.reopen();
        assert_eq!(
            engine.get_entry::<Entry>(0, 0).unwrap().unwrap(),
            empty_entry
        );
        assert_eq!(
            engine.get_entry::<Entry>(2, 0).unwrap().unwrap(),
            empty_entry
        );
        assert_eq!(
            engine
                .get_message::<RaftLocalState>(1, b"key")
                .unwrap()
                .unwrap(),
            empty_state
        );
        assert_eq!(
            engine
                .get_message::<RaftLocalState>(2, b"key")
                .unwrap()
                .unwrap(),
            empty_state
        );
    }

    #[test]
    fn test_empty_batch() {
        let dir = tempfile::Builder::new()
            .prefix("test_empty_batch")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            ..Default::default()
        };
        let engine =
            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                .unwrap();
        let data = vec![b'x'; 16];
        let cases = [[false, false], [false, true], [true, true]];
        for (i, writes) in cases.iter().enumerate() {
            let rid = i as u64;
            let mut batch = LogBatch::default();
            for &has_data in writes {
                if has_data {
                    batch.put(rid, b"key".to_vec(), data.clone()).unwrap();
                }
                engine.write(&mut batch, true).unwrap();
                assert!(batch.is_empty());
            }
        }
    }
1459
1460    #[test]
1461    fn test_dirty_recovery() {
1462        let dir = tempfile::Builder::new()
1463            .prefix("test_dirty_recovery")
1464            .tempdir()
1465            .unwrap();
1466        let cfg = Config {
1467            dir: dir.path().to_str().unwrap().to_owned(),
1468            ..Default::default()
1469        };
1470        let engine =
1471            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1472                .unwrap();
1473        let data = vec![b'x'; 1024];
1474
1475        for rid in 1..21 {
1476            engine.append(rid, 1, 21, Some(&data));
1477        }
1478
1479        // Create an unrelated sub-directory.
1480        std::fs::create_dir(dir.path().join(Path::new("random_dir"))).unwrap();
1481        // Create an unrelated file.
1482        let _f = std::fs::File::create(dir.path().join(Path::new("random_file"))).unwrap();
1483
1484        let engine = engine.reopen();
1485        for rid in 1..21 {
1486            engine.scan_entries(rid, 1, 21, |_, _, d| {
1487                assert_eq!(d, &data);
1488            });
1489        }
1490    }
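
    // A minimal, illustrative sketch of the append / compact / reopen pattern used by
    // the surrounding tests; it relies only on helpers already exercised in this
    // module (`append`, `compact_to`, `first_index`, `last_index`, `reopen`). The
    // test name and tempdir prefix are arbitrary.
    #[test]
    fn test_append_compact_reopen_sketch() {
        let dir = tempfile::Builder::new()
            .prefix("test_append_compact_reopen_sketch")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            ..Default::default()
        };
        let fs = Arc::new(ObfuscatedFileSystem::default());
        let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
        let data = vec![b'x'; 128];
        // Write entries [1, 10] of region 1, then drop everything before index 5.
        engine.append(1, 1, 11, Some(&data));
        engine.compact_to(1, 5);
        assert_eq!(engine.first_index(1).unwrap(), 5);
        assert_eq!(engine.last_index(1).unwrap(), 10);
        // The compaction result should survive a restart.
        let engine = engine.reopen();
        assert_eq!(engine.first_index(1).unwrap(), 5);
        assert_eq!(engine.last_index(1).unwrap(), 10);
    }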
1491
1492    #[test]
1493    fn test_large_rewrite_batch() {
1494        let dir = tempfile::Builder::new()
1495            .prefix("test_large_rewrite_batch")
1496            .tempdir()
1497            .unwrap();
1498        let cfg = Config {
1499            dir: dir.path().to_str().unwrap().to_owned(),
1500            target_file_size: ReadableSize(1),
1501            ..Default::default()
1502        };
1503        let engine =
1504            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
1505                .unwrap();
1506        let data = vec![b'x'; 2 * 1024 * 1024];
1507
1508        for rid in 1..=3 {
1509            engine.append(rid, 1, 11, Some(&data));
1510        }
1511
1512        let old_active_file = engine.file_span(LogQueue::Append).1;
1513        engine.purge_manager.must_rewrite_append_queue(None, None);
1514        assert_eq!(engine.file_span(LogQueue::Append).0, old_active_file + 1);
1515        let old_active_file = engine.file_span(LogQueue::Rewrite).1;
1516        engine.purge_manager.must_rewrite_rewrite_queue();
1517        assert!(engine.file_span(LogQueue::Rewrite).0 > old_active_file);
1518
1519        for rid in engine.raft_groups() {
1520            let mut total = 0;
1521            engine
1522                .scan_raw_messages(rid, None, None, false, |k, _| {
1523                    assert!(!crate::is_internal_key(k, None));
1524                    total += 1;
1525                    true
1526                })
1527                .unwrap();
1528            assert_eq!(total, 1);
1529        }
1530        assert_eq!(engine.raft_groups().len(), 3);
1531
1532        let engine = engine.reopen();
1533        for rid in 1..=3 {
1534            engine.scan_entries(rid, 1, 11, |_, _, d| {
1535                assert_eq!(d, &data);
1536            });
1537        }
1538    }
1539
1540    #[test]
1541    fn test_combination_of_version_and_recycle() {
1542        fn test_engine_ops(cfg_v1: &Config, cfg_v2: &Config) {
1543            let rid = 1;
1544            let data = vec![b'7'; 1024];
1545            {
1546                // Open the engine with format_version = Version::V1.
1547                let engine = RaftLogEngine::open(cfg_v1.clone()).unwrap();
1548                engine.append(rid, 0, 20, Some(&data));
1549                let append_first = engine.file_span(LogQueue::Append).0;
1550                engine.compact_to(rid, 18);
1551                engine.purge_expired_files().unwrap();
1552                assert!(engine.file_span(LogQueue::Append).0 > append_first);
1553                assert_eq!(engine.first_index(rid).unwrap(), 18);
1554                assert_eq!(engine.last_index(rid).unwrap(), 19);
1555            }
1556            {
1557                // Open the engine with format_version = Version::V2.
1558                let engine = RaftLogEngine::open(cfg_v2.clone()).unwrap();
1559                assert_eq!(engine.first_index(rid).unwrap(), 18);
1560                assert_eq!(engine.last_index(rid).unwrap(), 19);
1561                engine.append(rid, 20, 40, Some(&data));
1562                let append_first = engine.file_span(LogQueue::Append).0;
1563                engine.compact_to(rid, 38);
1564                engine.purge_expired_files().unwrap();
1565                assert!(engine.file_span(LogQueue::Append).0 > append_first);
1566                assert_eq!(engine.first_index(rid).unwrap(), 38);
1567                assert_eq!(engine.last_index(rid).unwrap(), 39);
1568            }
1569            {
1570                // Reopen the engine with format_version = Version::V1.
1571                let engine = RaftLogEngine::open(cfg_v1.clone()).unwrap();
1572                assert_eq!(engine.first_index(rid).unwrap(), 38);
1573                assert_eq!(engine.last_index(rid).unwrap(), 39);
1574            }
1575        }
1576        // Test engine behavior when switching between format versions.
1577        {
1578            let dir = tempfile::Builder::new()
1579                .prefix("test_mutable_format_version")
1580                .tempdir()
1581                .unwrap();
1582            // config with v1
1583            let cfg_v1 = Config {
1584                dir: dir.path().to_str().unwrap().to_owned(),
1585                target_file_size: ReadableSize(1),
1586                purge_threshold: ReadableSize(1),
1587                format_version: Version::V1,
1588                enable_log_recycle: false,
1589                ..Default::default()
1590            };
1591            // config with v2
1592            let cfg_v2 = Config {
1593                dir: dir.path().to_str().unwrap().to_owned(),
1594                target_file_size: ReadableSize(1),
1595                purge_threshold: ReadableSize(1),
1596                format_version: Version::V2,
1597                enable_log_recycle: false,
1598                ..Default::default()
1599            };
1600            test_engine_ops(&cfg_v1, &cfg_v2);
1601        }
1602        // Test engine behavior when enable_log_recycle == true.
1603        {
1604            let dir = tempfile::Builder::new()
1605                .prefix("test_enable_log_recycle")
1606                .tempdir()
1607                .unwrap();
1608            // config with v1
1609            let cfg_v1 = Config {
1610                dir: dir.path().to_str().unwrap().to_owned(),
1611                target_file_size: ReadableSize(1),
1612                purge_threshold: ReadableSize(1),
1613                format_version: Version::V1,
1614                enable_log_recycle: false,
1615                ..Default::default()
1616            };
1617            // config with v2
1618            let cfg_v2 = Config {
1619                dir: dir.path().to_str().unwrap().to_owned(),
1620                target_file_size: ReadableSize(1),
1621                purge_threshold: ReadableSize(1),
1622                format_version: Version::V2,
1623                enable_log_recycle: true,
1624                prefill_for_recycle: true,
1625                ..Default::default()
1626            };
1627            test_engine_ops(&cfg_v1, &cfg_v2);
1628        }
1629    }
1630
1631    // Test cases related to tools //
1632
1633    #[test]
1634    fn test_dump_file_or_directory() {
1635        let dir = tempfile::Builder::new()
1636            .prefix("test_dump_file_or_directory")
1637            .tempdir()
1638            .unwrap();
1639        let entry_data = vec![b'x'; 1024];
1640        let fs = Arc::new(ObfuscatedFileSystem::default());
1641
1642        let mut batches = vec![vec![LogBatch::default()]];
1643        let mut batch = LogBatch::default();
1644        batch
1645            .add_entries::<Entry>(7, &generate_entries(1, 11, Some(&entry_data)))
1646            .unwrap();
1647        batch.add_command(7, Command::Clean);
1648        batch.put(7, b"key".to_vec(), b"value".to_vec()).unwrap();
1649        batch.delete(7, b"key2".to_vec());
1650        batches.push(vec![batch.clone()]);
1651        let mut batch2 = LogBatch::default();
1652        batch2.put(8, b"key3".to_vec(), b"value".to_vec()).unwrap();
1653        batch2
1654            .add_entries::<Entry>(8, &generate_entries(5, 15, Some(&entry_data)))
1655            .unwrap();
1656        batches.push(vec![batch, batch2]);
1657
1658        let cfg = Config {
1659            dir: dir.path().to_str().unwrap().to_owned(),
1660            ..Default::default()
1661        };
1662
1663        let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
1664        for bs in batches.iter_mut() {
1665            for batch in bs.iter_mut() {
1666                engine.write(batch, false).unwrap();
1667            }
1668
1669            engine.sync().unwrap();
1670        }
1671
1672        drop(engine);
1673        // Dump the directory containing raft groups: 8 elements in raft group 7
1674        // and 2 elements in raft group 8.
1675        let dump_it = Engine::dump_with_file_system(dir.path(), fs.clone()).unwrap();
1676        let total = dump_it
1677            .inspect(|i| {
1678                i.as_ref().unwrap();
1679            })
1680            .count();
1681        assert_eq!(total, 10);
1682
1683        // Dump a single file (the first rewrite file).
1684        let file_id = FileId {
1685            queue: LogQueue::Rewrite,
1686            seq: 1,
1687        };
1688        let dump_it = Engine::dump_with_file_system(
1689            file_id.build_file_path(dir.path()).as_path(),
1690            fs.clone(),
1691        )
1692        .unwrap();
1693        let total = dump_it
1694            .inspect(|i| {
1695                i.as_ref().unwrap();
1696            })
1697            .count();
1698        assert_eq!(total, 0);
1699
1700        // Dump a directory that does not exist.
1701        assert!(Engine::dump_with_file_system(Path::new("/not_exists_dir"), fs.clone()).is_err());
1702
1703        // Dump a file that does not exist.
1704        let mut not_exists_file = PathBuf::from(dir.as_ref());
1705        not_exists_file.push("not_exists_file");
1706        assert!(Engine::dump_with_file_system(not_exists_file.as_path(), fs).is_err());
1707    }
1708
1709    #[cfg(feature = "scripting")]
1710    #[test]
1711    fn test_repair_default() {
1712        let dir = tempfile::Builder::new()
1713            .prefix("test_repair_default")
1714            .tempdir()
1715            .unwrap();
1716        let entry_data = vec![b'x'; 128];
1717        let cfg = Config {
1718            dir: dir.path().to_str().unwrap().to_owned(),
1719            target_file_size: ReadableSize(1), // Create lots of files.
1720            ..Default::default()
1721        };
1722        let fs = Arc::new(ObfuscatedFileSystem::default());
1723
1724        let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1725        for rid in 1..=50 {
1726            engine.append(rid, 1, 6, Some(&entry_data));
1727        }
1728        for rid in 25..=50 {
1729            engine.append(rid, 6, 11, Some(&entry_data));
1730        }
1731        drop(engine);
1732
1733        let script1 = "".to_owned();
1734        RaftLogEngine::unsafe_repair_with_file_system(
1735            dir.path(),
1736            None, /* queue */
1737            script1,
1738            fs.clone(),
1739        )
1740        .unwrap();
1741        let script2 = "
1742            fn filter_append(id, first, count, rewrite_count, queue, ifirst, ilast) {
1743                0
1744            }
1745            fn filter_compact(id, first, count, rewrite_count, queue, compact_to) {
1746                0
1747            }
1748            fn filter_clean(id, first, count, rewrite_count, queue) {
1749                0
1750            }
1751        "
1752        .to_owned();
1753        RaftLogEngine::unsafe_repair_with_file_system(
1754            dir.path(),
1755            None, /* queue */
1756            script2,
1757            fs.clone(),
1758        )
1759        .unwrap();
1760
1761        let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1762        for rid in 1..25 {
1763            engine.scan_entries(rid, 1, 6, |_, _, d| {
1764                assert_eq!(d, &entry_data);
1765            });
1766        }
1767        for rid in 25..=50 {
1768            engine.scan_entries(rid, 1, 11, |_, _, d| {
1769                assert_eq!(d, &entry_data);
1770            });
1771        }
1772    }
1773
1774    #[cfg(feature = "scripting")]
1775    #[test]
1776    fn test_repair_discard_entries() {
1777        let dir = tempfile::Builder::new()
1778            .prefix("test_repair_discard")
1779            .tempdir()
1780            .unwrap();
1781        let entry_data = vec![b'x'; 128];
1782        let cfg = Config {
1783            dir: dir.path().to_str().unwrap().to_owned(),
1784            target_file_size: ReadableSize(1), // Create lots of files.
1785            ..Default::default()
1786        };
1787        let fs = Arc::new(ObfuscatedFileSystem::default());
1788
1789        let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1790        for rid in 1..=50 {
1791            engine.append(rid, 1, 6, Some(&entry_data));
1792        }
1793        for rid in 25..=50 {
1794            engine.append(rid, 6, 11, Some(&entry_data));
1795        }
1796        drop(engine);
1797
1798        let incoming_emptied = [1, 25];
1799        let existing_emptied = [2, 26];
1800        let script = "
1801            fn filter_append(id, first, count, rewrite_count, queue, ifirst, ilast) {
1802                if id == 1 {
1803                    return 1;
1804                } else if id == 2 {
1805                    return 2;
1806                } else if id == 25 {
1807                    return 1;
1808                } else if id == 26 {
1809                    return 2;
1810                }
1811                0 // default
1812            }
1813        "
1814        .to_owned();
1815        RaftLogEngine::unsafe_repair_with_file_system(
1816            dir.path(),
1817            None, /* queue */
1818            script,
1819            fs.clone(),
1820        )
1821        .unwrap();
1822
1823        let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1824        for rid in 1..25 {
1825            if existing_emptied.contains(&rid) || incoming_emptied.contains(&rid) {
1826                continue;
1827            }
1828            engine.scan_entries(rid, 1, 6, |_, _, d| {
1829                assert_eq!(d, &entry_data);
1830            });
1831        }
1832        for rid in 25..=50 {
1833            if existing_emptied.contains(&rid) || incoming_emptied.contains(&rid) {
1834                continue;
1835            }
1836            engine.scan_entries(rid, 1, 11, |_, _, d| {
1837                assert_eq!(d, &entry_data);
1838            });
1839        }
1840        for rid in existing_emptied {
1841            let first_index = if rid < 25 { 1 } else { 6 };
1842            let last_index = if rid < 25 { 5 } else { 10 };
1843            engine.scan_entries(rid, first_index, last_index + 1, |_, _, d| {
1844                assert_eq!(d, &entry_data);
1845            });
1846        }
1847        for rid in incoming_emptied {
1848            let last_index = if rid < 25 { 5 } else { 10 };
1849            assert_eq!(engine.first_index(rid), None);
1850            assert_eq!(engine.last_index(rid), None);
1851            assert_eq!(engine.decode_last_index(rid), Some(last_index));
1852        }
1853    }
1854
1855    #[test]
1856    fn test_tail_corruption() {
1857        let dir = tempfile::Builder::new()
1858            .prefix("test_tail_corruption")
1859            .tempdir()
1860            .unwrap();
1861        let entry_data = vec![b'x'; 16];
1862        let cfg = Config {
1863            dir: dir.path().to_str().unwrap().to_owned(),
1864            // One big file.
1865            target_file_size: ReadableSize::gb(10),
1866            ..Default::default()
1867        };
1868        let fs = Arc::new(ObfuscatedFileSystem::default());
1869
1870        let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1871        for rid in 1..=50 {
1872            engine.append(rid, 1, 6, Some(&entry_data));
1873        }
1874        for rid in 25..=50 {
1875            engine.append(rid, 6, 11, Some(&entry_data));
1876        }
1877        let (_, last_file_seq) = engine.file_span(LogQueue::Append);
1878        drop(engine);
1879
1880        let last_file = FileId {
1881            queue: LogQueue::Append,
1882            seq: last_file_seq,
1883        };
1884        let f = OpenOptions::new()
1885            .write(true)
1886            .open(last_file.build_file_path(dir.path()))
1887            .unwrap();
1888
1889        // Corrupt a log batch.
1890        f.set_len(f.metadata().unwrap().len() - 1).unwrap();
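        // Reopening should still succeed despite the truncated tail.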
1891        RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1892
1893        // Corrupt the file header.
1894        f.set_len(1).unwrap();
1895        RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1896    }
1897
1898    #[test]
1899    fn test_reopen_with_wrong_file_system() {
1900        let dir = tempfile::Builder::new()
1901            .prefix("test_reopen_with_wrong_file_system")
1902            .tempdir()
1903            .unwrap();
1904        let entry_data = vec![b'x'; 128];
1905        let cfg = Config {
1906            dir: dir.path().to_str().unwrap().to_owned(),
1907            target_file_size: ReadableSize(1),
1908            ..Default::default()
1909        };
1910        let fs = Arc::new(ObfuscatedFileSystem::default());
1911
1912        let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
1913        for rid in 1..=10 {
1914            engine.append(rid, 1, 11, Some(&entry_data));
1915        }
1916        drop(engine);
1917
1918        assert!(RaftLogEngine::open(cfg.clone()).is_err());
1919
1920        let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1921        for rid in 1..=10 {
1922            engine.scan_entries(rid, 1, 11, |_, _, d| {
1923                assert_eq!(d, &entry_data);
1924            });
1925        }
1926    }
1927
1928    #[cfg(feature = "nightly")]
1929    #[bench]
1930    fn bench_engine_fetch_entries(b: &mut test::Bencher) {
1931        use rand::{thread_rng, Rng};
1932
1933        let dir = tempfile::Builder::new()
1934            .prefix("bench_engine_fetch_entries")
1935            .tempdir()
1936            .unwrap();
1937        let entry_data = vec![b'x'; 1024];
1938        let cfg = Config {
1939            dir: dir.path().to_str().unwrap().to_owned(),
1940            ..Default::default()
1941        };
1942        let engine = RaftLogEngine::open(cfg).unwrap();
1943        for i in 0..10 {
1944            for rid in 1..=100 {
1945                engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data));
1946            }
1947        }
1948        let mut vec: Vec<Entry> = Vec::new();
1949        b.iter(move || {
1950            let region_id = thread_rng().gen_range(1..=100);
1951            engine
1952                .fetch_entries_to::<Entry>(region_id, 1, 101, None, &mut vec)
1953                .unwrap();
1954            vec.clear();
1955        });
1956    }
1957
1958    #[test]
1959    fn test_engine_is_empty() {
1960        let dir = tempfile::Builder::new()
1961            .prefix("test_engine_is_empty")
1962            .tempdir()
1963            .unwrap();
1964        let entry_data = vec![b'x'; 128];
1965        let cfg = Config {
1966            dir: dir.path().to_str().unwrap().to_owned(),
1967            ..Default::default()
1968        };
1969        let fs = Arc::new(ObfuscatedFileSystem::default());
1970        let rid = 1;
1971
1972        let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
1973        assert!(engine.is_empty());
1974        engine.append(rid, 1, 11, Some(&entry_data));
1975        assert!(!engine.is_empty());
1976
1977        let mut log_batch = LogBatch::default();
1978        log_batch.add_command(rid, Command::Compact { index: 11 });
1979        log_batch.delete(rid, b"last_index".to_vec());
1980        engine.write(&mut log_batch, true).unwrap();
1981        assert!(!engine.is_empty());
1982
1983        engine.clean(rid);
1984        assert!(engine.is_empty());
1985    }
1986
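    /// A test-only file system wrapper that tracks the metadata of append and
    /// reserved (prefilled) files it has seen, so tests can assert when metadata is
    /// created, reused or deleted.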
1987    pub struct DeleteMonitoredFileSystem {
1988        inner: ObfuscatedFileSystem,
1989        append_metadata: Mutex<BTreeSet<u64>>,
1990        reserved_metadata: Mutex<BTreeSet<u64>>,
1991    }
1992
1993    impl DeleteMonitoredFileSystem {
1994        fn new() -> Self {
1995            Self {
1996                inner: ObfuscatedFileSystem::default(),
1997                append_metadata: Mutex::new(BTreeSet::new()),
1998                reserved_metadata: Mutex::new(BTreeSet::new()),
1999            }
2000        }
2001
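        // Records (or, when `delete` is true, forgets) the sequence number parsed
        // from an append or reserved file name; returns whether the tracked set
        // changed.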
2002        fn update_metadata(&self, path: &Path, delete: bool) -> bool {
2003            let path = path.file_name().unwrap().to_str().unwrap();
2004            let parse_append = FileId::parse_file_name(path);
2005            let parse_reserved = parse_reserved_file_name(path);
2006            match (parse_append, parse_reserved) {
2007                (Some(id), None) if id.queue == LogQueue::Append => {
2008                    if delete {
2009                        self.append_metadata.lock().unwrap().remove(&id.seq)
2010                    } else {
2011                        self.append_metadata.lock().unwrap().insert(id.seq)
2012                    }
2013                }
2014                (None, Some(seq)) => {
2015                    if delete {
2016                        self.reserved_metadata.lock().unwrap().remove(&seq)
2017                    } else {
2018                        self.reserved_metadata.lock().unwrap().insert(seq)
2019                    }
2020                }
2021                _ => false,
2022            }
2023        }
2024    }
2025
2026    impl FileSystem for DeleteMonitoredFileSystem {
2027        type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
2028        type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
2029        type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
2030
2031        fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
2032            let handle = self.inner.create(&path)?;
2033            self.update_metadata(path.as_ref(), false);
2034            Ok(handle)
2035        }
2036
2037        fn open<P: AsRef<Path>>(&self, path: P, perm: Permission) -> std::io::Result<Self::Handle> {
2038            let handle = self.inner.open(&path, perm)?;
2039            self.update_metadata(path.as_ref(), false);
2040            Ok(handle)
2041        }
2042
2043        fn delete<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
2044            self.inner.delete(&path)?;
2045            self.update_metadata(path.as_ref(), true);
2046            Ok(())
2047        }
2048
2049        fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> std::io::Result<()> {
2050            self.inner.rename(src_path.as_ref(), dst_path.as_ref())?;
2051            self.update_metadata(src_path.as_ref(), true);
2052            self.update_metadata(dst_path.as_ref(), false);
2053            Ok(())
2054        }
2055
2056        fn reuse<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> std::io::Result<()> {
2057            self.inner.reuse(src_path.as_ref(), dst_path.as_ref())?;
2058            self.update_metadata(src_path.as_ref(), true);
2059            self.update_metadata(dst_path.as_ref(), false);
2060            Ok(())
2061        }
2062
2063        fn delete_metadata<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
2064            self.inner.delete_metadata(&path)?;
2065            self.update_metadata(path.as_ref(), true);
2066            Ok(())
2067        }
2068
2069        fn exists_metadata<P: AsRef<Path>>(&self, path: P) -> bool {
2070            if self.inner.exists_metadata(&path) {
2071                return true;
2072            }
2073            let path = path.as_ref().file_name().unwrap().to_str().unwrap();
2074            let parse_append = FileId::parse_file_name(path);
2075            let parse_reserved = parse_reserved_file_name(path);
2076            match (parse_append, parse_reserved) {
2077                (Some(id), None) if id.queue == LogQueue::Append => {
2078                    self.append_metadata.lock().unwrap().contains(&id.seq)
2079                }
2080                (None, Some(seq)) => self.reserved_metadata.lock().unwrap().contains(&seq),
2081                _ => false,
2082            }
2083        }
2084
2085        fn new_reader(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Reader> {
2086            self.inner.new_reader(h)
2087        }
2088
2089        fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
2090            self.inner.new_writer(h)
2091        }
2092    }
2093
2094    #[test]
2095    fn test_managed_file_deletion() {
2096        let dir = tempfile::Builder::new()
2097            .prefix("test_managed_file_deletion")
2098            .tempdir()
2099            .unwrap();
2100        let entry_data = vec![b'x'; 128];
2101        let cfg = Config {
2102            dir: dir.path().to_str().unwrap().to_owned(),
2103            target_file_size: ReadableSize(1),
2104            purge_threshold: ReadableSize(1),
2105            enable_log_recycle: false,
2106            ..Default::default()
2107        };
2108        let fs = Arc::new(DeleteMonitoredFileSystem::new());
2109        let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
2110        for rid in 1..=10 {
2111            engine.append(rid, 1, 11, Some(&entry_data));
2112        }
2113        for rid in 1..=5 {
2114            engine.clean(rid);
2115        }
2116        let (start, _) = engine.file_span(LogQueue::Append);
2117        engine.purge_expired_files().unwrap();
2118        // Some active files have been deleted.
2119        assert!(start < engine.file_span(LogQueue::Append).0);
2120        // The corresponding physical files have been deleted too.
2121        assert_eq!(engine.file_count(None), fs.inner.file_count());
2122        let start = engine.file_span(LogQueue::Append).0;
2123        // Metadata of the deleted files has been removed as well.
2124        assert_eq!(
2125            fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2126            &start
2127        );
2128
2129        let engine = engine.reopen();
2130        assert_eq!(engine.file_count(None), fs.inner.file_count());
2131        let (start, _) = engine.file_span(LogQueue::Append);
2132        assert_eq!(
2133            fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2134            &start
2135        );
2136
2137        // Simulate stale metadata.
2138        for i in start / 2..start {
2139            fs.append_metadata.lock().unwrap().insert(i);
2140        }
2141        let engine = engine.reopen();
2142        let (start, _) = engine.file_span(LogQueue::Append);
2143        assert_eq!(
2144            fs.append_metadata.lock().unwrap().iter().next().unwrap(),
2145            &start
2146        );
2147    }
2148
2149    #[test]
2150    fn test_managed_file_reuse() {
2151        let dir = tempfile::Builder::new()
2152            .prefix("test_managed_file_reuse")
2153            .tempdir()
2154            .unwrap();
2155        let entry_data = vec![b'x'; 16];
2156        let cfg = Config {
2157            dir: dir.path().to_str().unwrap().to_owned(),
2158            target_file_size: ReadableSize(1),
2159            purge_threshold: ReadableSize(50),
2160            format_version: Version::V2,
2161            enable_log_recycle: true,
2162            prefill_for_recycle: true,
2163            ..Default::default()
2164        };
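        // Upper bound on how many log files this configuration lets the engine
        // retain for recycling.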
2165        let recycle_capacity = cfg.recycle_capacity() as u64;
2166        let fs = Arc::new(DeleteMonitoredFileSystem::new());
2167        let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
2168
2169        let reserved_start = *fs.reserved_metadata.lock().unwrap().first().unwrap();
2170        for rid in 1..=10 {
2171            engine.append(rid, 1, 11, Some(&entry_data));
2172        }
2173        for rid in 1..=10 {
2174            engine.clean(rid);
2175        }
2176        // Purge all files.
2177        engine.purge_manager.must_rewrite_append_queue(None, None);
2178        assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
2179        // Reserved files have been reused.
2180        let reserved_start_1 = *fs.reserved_metadata.lock().unwrap().first().unwrap();
2181        assert!(reserved_start < reserved_start_1);
2182        // Reuse more.
2183        for rid in 1..=5 {
2184            engine.append(rid, 1, 11, Some(&entry_data));
2185        }
2186        let reserved_start_2 = *fs.reserved_metadata.lock().unwrap().first().unwrap();
2187        assert!(reserved_start_1 < reserved_start_2);
2188
2189        let file_count = fs.inner.file_count();
2190        let start_1 = *fs.append_metadata.lock().unwrap().first().unwrap();
2191        let engine = engine.reopen();
2192        // Recycled files are reserved, but stale append files are renamed. The total
2193        // count should stay unchanged.
2194        assert_eq!(file_count, fs.inner.file_count());
2195        let start_2 = *fs.append_metadata.lock().unwrap().first().unwrap();
2196        assert!(start_1 < start_2);
2197        let reserved_start_3 = *fs.reserved_metadata.lock().unwrap().first().unwrap();
2198        assert_eq!(reserved_start_2, reserved_start_3);
2199
2200        // Reuse all of reserved files.
2201        for rid in 1..=recycle_capacity {
2202            engine.append(rid, 1, 11, Some(&entry_data));
2203        }
2204        assert!(fs.reserved_metadata.lock().unwrap().is_empty());
2205        for rid in 1..=recycle_capacity {
2206            engine.clean(rid);
2207        }
2208        engine.purge_manager.must_rewrite_append_queue(None, None);
2209        // Then reuse a recycled append file.
2210        engine.append(1, 1, 11, Some(&entry_data));
2211        assert_eq!(engine.file_count(Some(LogQueue::Append)), 2);
2212        let start_3 = *fs.append_metadata.lock().unwrap().first().unwrap();
2213        assert!(start_2 < start_3);
2214    }
2215
2216    #[test]
2217    fn test_simple_write_perf_context() {
2218        let dir = tempfile::Builder::new()
2219            .prefix("test_simple_write_perf_context")
2220            .tempdir()
2221            .unwrap();
2222        let cfg = Config {
2223            dir: dir.path().to_str().unwrap().to_owned(),
2224            ..Default::default()
2225        };
2226        let rid = 1;
2227        let entry_size = 5120;
2228        let engine = RaftLogEngine::open(cfg).unwrap();
2229        let data = vec![b'x'; entry_size];
2230        let old_perf_context = get_perf_context();
2231        engine.append(rid, 1, 5, Some(&data));
2232        let new_perf_context = get_perf_context();
2233        assert_ne!(
2234            old_perf_context.log_populating_duration,
2235            new_perf_context.log_populating_duration
2236        );
2237        assert_ne!(
2238            old_perf_context.log_write_duration,
2239            new_perf_context.log_write_duration
2240        );
2241        assert_ne!(
2242            old_perf_context.apply_duration,
2243            new_perf_context.apply_duration
2244        );
2245    }
2246
2247    #[test]
2248    fn test_recycle_no_signing_files() {
2249        let dir = tempfile::Builder::new()
2250            .prefix("test_recycle_no_signing_files")
2251            .tempdir()
2252            .unwrap();
2253        let entry_data = vec![b'x'; 128];
2254        let fs = Arc::new(DeleteMonitoredFileSystem::new());
2255        let cfg_v1 = Config {
2256            dir: dir.path().to_str().unwrap().to_owned(),
2257            target_file_size: ReadableSize(1),
2258            purge_threshold: ReadableSize(1024),
2259            format_version: Version::V1,
2260            enable_log_recycle: false,
2261            ..Default::default()
2262        };
2263        let cfg_v2 = Config {
2264            dir: dir.path().to_str().unwrap().to_owned(),
2265            target_file_size: ReadableSize(1),
2266            purge_threshold: ReadableSize(15),
2267            format_version: Version::V2,
2268            enable_log_recycle: true,
2269            prefill_for_recycle: false,
2270            ..Default::default()
2271        };
2272        assert!(cfg_v2.recycle_capacity() > 0);
2273        // Prepare files with format_version V1
2274        {
2275            let engine = RaftLogEngine::open_with_file_system(cfg_v1.clone(), fs.clone()).unwrap();
2276            for rid in 1..=10 {
2277                engine.append(rid, 1, 11, Some(&entry_data));
2278            }
2279        }
2280        // Reopen the Engine with V2 and purge
2281        {
2282            let engine = RaftLogEngine::open_with_file_system(cfg_v2.clone(), fs.clone()).unwrap();
2283            let (start, _) = engine.file_span(LogQueue::Append);
2284            for rid in 6..=10 {
2285                engine.append(rid, 11, 20, Some(&entry_data));
2286            }
2287            // Mark region_id -> 6 obsolete.
2288            engine.clean(6);
2289            // Files [1, 12] are recycled.
2290            engine.purge_expired_files().unwrap();
2291            assert_eq!(engine.file_count(Some(LogQueue::Append)), 5);
2292            assert!(start < engine.file_span(LogQueue::Append).0);
2293        }
2294        // Reopen the Engine with V1 -> V2 and purge
2295        {
2296            let engine = RaftLogEngine::open_with_file_system(cfg_v1, fs.clone()).unwrap();
2297            let (start, _) = engine.file_span(LogQueue::Append);
2298            for rid in 6..=10 {
2299                engine.append(rid, 20, 30, Some(&entry_data));
2300            }
2301            for rid in 6..=10 {
2302                engine.append(rid, 30, 40, Some(&entry_data));
2303            }
2304            for rid in 1..=5 {
2305                engine.append(rid, 11, 20, Some(&entry_data));
2306            }
2307            assert_eq!(engine.file_span(LogQueue::Append).0, start);
2308            let file_count = engine.file_count(Some(LogQueue::Append));
2309            drop(engine);
2310            let engine = RaftLogEngine::open_with_file_system(cfg_v2, fs).unwrap();
2311            assert_eq!(engine.file_span(LogQueue::Append).0, start);
2312            assert_eq!(engine.file_count(Some(LogQueue::Append)), file_count);
2313            // Mark all regions obsolete.
2314            for rid in 1..=10 {
2315                engine.clean(rid);
2316            }
2317            let (start, _) = engine.file_span(LogQueue::Append);
2318            // Files [13, 32] are purged.
2319            engine.purge_expired_files().unwrap();
2320            assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
2321            assert!(engine.file_span(LogQueue::Append).0 > start);
2322        }
2323    }
2324
2325    #[test]
2326    fn test_start_engine_with_resize_recycle_capacity() {
2327        let dir = tempfile::Builder::new()
2328            .prefix("test_start_engine_with_resize_recycle_capacity")
2329            .tempdir()
2330            .unwrap();
2331        let path = dir.path().to_str().unwrap();
2332        let file_system = Arc::new(DeleteMonitoredFileSystem::new());
2333        let entry_data = vec![b'x'; 512];
2334
2335        // Case 1: start an engine with no-recycle.
2336        let cfg = Config {
2337            dir: path.to_owned(),
2338            enable_log_recycle: false,
2339            ..Default::default()
2340        };
2341        let engine = RaftLogEngine::open_with_file_system(cfg, file_system.clone()).unwrap();
2342        let (start, _) = engine.file_span(LogQueue::Append);
2343        // Only one valid append file exists: the last one, i.e. the active file.
2344        assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
2345        assert_eq!(file_system.inner.file_count(), engine.file_count(None));
2346        // Append data.
2347        for rid in 1..=5 {
2348            engine.append(rid, 1, 10, Some(&entry_data));
2349        }
2350        assert_eq!(engine.file_span(LogQueue::Append).0, start);
2351        assert_eq!(file_system.inner.file_count(), engine.file_count(None));
2352        drop(engine);
2353
2354        // Case 2: restart the engine with a common size of recycling capacity.
2355        let cfg = Config {
2356            dir: path.to_owned(),
2357            target_file_size: ReadableSize(1),
2358            purge_threshold: ReadableSize(80), // common size of capacity
2359            enable_log_recycle: true,
2360            prefill_for_recycle: true,
2361            ..Default::default()
2362        };
2363        let engine =
2364            RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
2365        let (start, end) = engine.file_span(LogQueue::Append);
2366        // Only one valid append file exists: the last one, i.e. the active file.
2367        assert_eq!(start, end);
2368        let recycled_count = file_system.inner.file_count() - engine.file_count(None);
2369        assert!(recycled_count > 0);
2370        // Append data. Several recycled files have been reused.
2371        for rid in 1..=5 {
2372            engine.append(rid, 10, 20, Some(&entry_data));
2373        }
2374        assert_eq!(engine.file_span(LogQueue::Append).0, start);
2375        assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
2376        let (start, end) = engine.file_span(LogQueue::Append);
2377        let recycled_count = file_system.inner.file_count() - engine.file_count(None);
2378        drop(engine);
2379
2380        // Case 3: restart the engine with a smaller capacity. Redundant recycled files
2381        // will be cleared.
2382        let cfg_v2 = Config {
2383            target_file_size: ReadableSize(1),
2384            purge_threshold: ReadableSize(50),
2385            ..cfg
2386        };
2387        let engine =
2388            RaftLogEngine::open_with_file_system(cfg_v2.clone(), file_system.clone()).unwrap();
2389        assert_eq!(engine.file_span(LogQueue::Append), (start, end));
2390        assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
2391        // Recycled files have filled the LogQueue::Append; purge_expired_files won't
2392        // truly remove files from it.
2393        engine.purge_expired_files().unwrap();
2394        assert_eq!(engine.file_span(LogQueue::Append), (start, end));
2395        for rid in 1..=10 {
2396            engine.append(rid, 20, 31, Some(&entry_data));
2397        }
2398        assert!(engine.file_span(LogQueue::Append).1 > end);
2399        let engine = engine.reopen();
2400        assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None));
2401        drop(engine);
2402
2403        // Case 4: restart the engine without log recycling. Recycled logs should be
2404        // cleared.
2405        let cfg_v3 = Config {
2406            target_file_size: ReadableSize::kb(2),
2407            purge_threshold: ReadableSize::kb(100),
2408            enable_log_recycle: false,
2409            prefill_for_recycle: false,
2410            ..cfg_v2
2411        };
2412        let engine = RaftLogEngine::open_with_file_system(cfg_v3, file_system.clone()).unwrap();
2413        assert_eq!(file_system.inner.file_count(), engine.file_count(None));
2414    }
2415
2416    #[test]
2417    fn test_rewrite_atomic_group() {
2418        let dir = tempfile::Builder::new()
2419            .prefix("test_rewrite_atomic_group")
2420            .tempdir()
2421            .unwrap();
2422        let cfg = Config {
2423            dir: dir.path().to_str().unwrap().to_owned(),
2424            // Make sure each file gets replayed individually.
2425            recovery_threads: 100,
2426            target_file_size: ReadableSize(1),
2427            ..Default::default()
2428        };
2429        let fs = Arc::new(ObfuscatedFileSystem::default());
2430        let key = vec![b'x'; 2];
2431        let value = vec![b'y'; 8];
2432
2433        let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
2434        let mut data = HashSet::new();
2435        let mut rid = 1;
2436        // Directly write to pipe log.
2437        let mut log_batch = LogBatch::default();
2438        let flush = |lb: &mut LogBatch| {
2439            lb.finish_populate(0, None).unwrap();
2440            engine.pipe_log.append(LogQueue::Rewrite, lb).unwrap();
2441            lb.drain();
2442        };
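        // Each block below lays out one atomic-group pattern across rewrite files,
        // rotating the queue in between; `data` records the region ids expected to
        // survive recovery and is checked after the final reopen.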
2443        {
2444            // begin.
2445            let mut builder = AtomicGroupBuilder::with_id(3);
2446            builder.begin(&mut log_batch);
2447            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2448            flush(&mut log_batch);
2449            engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2450        }
2451        {
2452            // begin - unrelated - end.
2453            let mut builder = AtomicGroupBuilder::with_id(3);
2454            builder.begin(&mut log_batch);
2455            rid += 1;
2456            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2457            data.insert(rid);
2458            flush(&mut log_batch);
2459            // Plug in an unrelated write.
2460            rid += 1;
2461            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2462            data.insert(rid);
2463            flush(&mut log_batch);
2464            builder.end(&mut log_batch);
2465            rid += 1;
2466            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2467            data.insert(rid);
2468            flush(&mut log_batch);
2469            engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2470        }
2471        {
2472            // begin - middle - middle - end.
2473            let mut builder = AtomicGroupBuilder::with_id(3);
2474            builder.begin(&mut log_batch);
2475            rid += 1;
2476            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2477            data.insert(rid);
2478            flush(&mut log_batch);
2479            builder.add(&mut log_batch);
2480            rid += 1;
2481            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2482            data.insert(rid);
2483            flush(&mut log_batch);
2484            builder.add(&mut log_batch);
2485            rid += 1;
2486            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2487            data.insert(rid);
2488            flush(&mut log_batch);
2489            builder.end(&mut log_batch);
2490            rid += 1;
2491            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2492            data.insert(rid);
2493            flush(&mut log_batch);
2494            engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2495        }
2496        {
2497            // begin - begin - end.
2498            let mut builder = AtomicGroupBuilder::with_id(3);
2499            builder.begin(&mut log_batch);
2500            rid += 1;
2501            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2502            flush(&mut log_batch);
2503            let mut builder = AtomicGroupBuilder::with_id(3);
2504            builder.begin(&mut log_batch);
2505            rid += 1;
2506            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2507            data.insert(rid);
2508            flush(&mut log_batch);
2509            builder.end(&mut log_batch);
2510            rid += 1;
2511            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2512            data.insert(rid);
2513            flush(&mut log_batch);
2514            engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2515        }
2516        {
2517            // end - middle - end.
2518            // We must change the id to avoid being merged with the last group.
2519            // In practice it is not possible for only the "begin" marker to be missing.
2520            let mut builder = AtomicGroupBuilder::with_id(4);
2521            builder.begin(&mut LogBatch::default());
2522            builder.end(&mut log_batch);
2523            rid += 1;
2524            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2525            flush(&mut log_batch);
2526            let mut builder = AtomicGroupBuilder::with_id(4);
2527            builder.begin(&mut LogBatch::default());
2528            builder.add(&mut log_batch);
2529            rid += 1;
2530            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2531            flush(&mut log_batch);
2532            builder.end(&mut log_batch);
2533            rid += 1;
2534            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2535            flush(&mut log_batch);
2536            engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2537        }
2538        {
2539            // end - begin - end
2540            let mut builder = AtomicGroupBuilder::with_id(5);
2541            builder.begin(&mut LogBatch::default());
2542            builder.end(&mut log_batch);
2543            rid += 1;
2544            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2545            flush(&mut log_batch);
2546            let mut builder = AtomicGroupBuilder::with_id(5);
2547            builder.begin(&mut log_batch);
2548            rid += 1;
2549            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2550            data.insert(rid);
2551            flush(&mut log_batch);
2552            builder.end(&mut log_batch);
2553            rid += 1;
2554            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2555            data.insert(rid);
2556            flush(&mut log_batch);
2557            engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2558        }
2559        {
2560            // begin - end - begin - end.
2561            let mut builder = AtomicGroupBuilder::with_id(6);
2562            builder.begin(&mut log_batch);
2563            rid += 1;
2564            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2565            data.insert(rid);
2566            flush(&mut log_batch);
2567            builder.end(&mut log_batch);
2568            flush(&mut log_batch);
2569            let mut builder = AtomicGroupBuilder::with_id(7);
2570            builder.begin(&mut log_batch);
2571            flush(&mut log_batch);
2572            builder.end(&mut log_batch);
2573            rid += 1;
2574            log_batch.put(rid, key.clone(), value.clone()).unwrap();
2575            data.insert(rid);
2576            flush(&mut log_batch);
2577            engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
2578        }
2579        engine.pipe_log.sync(LogQueue::Rewrite).unwrap();
2580
2581        let engine = engine.reopen();
2582        for rid in engine.raft_groups() {
2583            assert!(data.remove(&rid), "{}", rid);
2584            assert_eq!(engine.get(rid, &key).unwrap(), value);
2585        }
2586        assert!(data.is_empty(), "data loss {:?}", data);
2587    }
2588
2589    #[test]
2590    fn test_internal_key_filter() {
2591        let dir = tempfile::Builder::new()
2592            .prefix("test_internal_key_filter")
2593            .tempdir()
2594            .unwrap();
2595        let cfg = Config {
2596            dir: dir.path().to_str().unwrap().to_owned(),
2597            ..Default::default()
2598        };
2599        let fs = Arc::new(ObfuscatedFileSystem::default());
2600        let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
2601        let value = vec![b'y'; 8];
2602        let mut log_batch = LogBatch::default();
2603        log_batch.put_unchecked(1, crate::make_internal_key(&[1]), value.clone());
2604        log_batch.put_unchecked(2, crate::make_internal_key(&[1]), value.clone());
2605        engine.write(&mut log_batch, false).unwrap();
2606        // Apply of append filtered.
2607        assert!(engine.raft_groups().is_empty());
2608
2609        let engine = engine.reopen();
2610        // Replay of append filtered.
2611        assert!(engine.raft_groups().is_empty());
2612
2613        log_batch.put_unchecked(3, crate::make_internal_key(&[1]), value.clone());
2614        log_batch.put_unchecked(4, crate::make_internal_key(&[1]), value);
2615        log_batch.finish_populate(0, None).unwrap();
2616        let block_handle = engine
2617            .pipe_log
2618            .append(LogQueue::Rewrite, &mut log_batch)
2619            .unwrap();
2620        log_batch.finish_write(block_handle);
2621        engine
2622            .memtables
2623            .apply_rewrite_writes(log_batch.drain(), None, 0);
2624        // Apply of rewrite filtered.
2625        assert!(engine.raft_groups().is_empty());
2626
2627        let engine = engine.reopen();
2628        // Replay of rewrite filtered.
2629        assert!(engine.raft_groups().is_empty());
2630    }
2631
2632    #[test]
2633    fn test_start_engine_with_multi_dirs() {
2634        let dir = tempfile::Builder::new()
2635            .prefix("test_start_engine_with_multi_dirs_default")
2636            .tempdir()
2637            .unwrap();
2638        let spill_dir = tempfile::Builder::new()
2639            .prefix("test_start_engine_with_multi_dirs_spill")
2640            .tempdir()
2641            .unwrap();
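        // Counts on-disk log files (names starting with "000") directly under `p`,
        // independent of the engine's own bookkeeping.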
2642        fn number_of_files(p: &Path) -> usize {
2643            let mut r = 0;
2644            std::fs::read_dir(p).unwrap().for_each(|e| {
2645                if e.unwrap()
2646                    .path()
2647                    .file_name()
2648                    .unwrap()
2649                    .to_str()
2650                    .unwrap()
2651                    .starts_with("000")
2652                {
2653                    r += 1;
2654                }
2655            });
2656            r
2657        }
2658        let file_system = Arc::new(DeleteMonitoredFileSystem::new());
2659        let entry_data = vec![b'x'; 512];
2660
2661        // Preparations for multi-dirs.
2662        let cfg = Config {
2663            dir: dir.path().to_str().unwrap().to_owned(),
2664            spill_dir: Some(spill_dir.path().to_str().unwrap().to_owned()),
2665            enable_log_recycle: false,
2666            target_file_size: ReadableSize(1),
2667            ..Default::default()
2668        };
2669        {
2670            // Step 1: write data into the main directory.
2671            let engine =
2672                RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
2673            for rid in 1..=10 {
2674                engine.append(rid, 1, 10, Some(&entry_data));
2675            }
2676            drop(engine);
2677
2678            // Step 2: select several log files and move them into the `spill_dir`
2679            // directory.
2680            let mut moved = 0;
2681            for e in std::fs::read_dir(dir.path()).unwrap() {
2682                let p = e.unwrap().path();
2683                let file_name = p.file_name().unwrap().to_str().unwrap();
2684                if let Some(FileId {
2685                    queue: LogQueue::Append,
2686                    seq: _,
2687                }) = FileId::parse_file_name(file_name)
2688                {
2689                    file_system
2690                        .rename(&p, &spill_dir.path().join(file_name))
2691                        .unwrap();
2692                    moved += 1;
2693                    if moved == 4 {
2694                        break;
2695                    }
2696                }
2697            }
2698        }
2699
2700        // Restart the engine with recycle and prefill. Test reusing files from both
2701        // dirs.
2702        let cfg_2 = Config {
2703            enable_log_recycle: true,
2704            prefill_for_recycle: true,
2705            purge_threshold: ReadableSize(40),
2706            ..cfg.clone()
2707        };
2708        let recycle_capacity = cfg_2.recycle_capacity() as u64;
2709        let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system.clone()).unwrap();
2710        assert!(number_of_files(spill_dir.path()) > 0);
2711        for rid in 1..=10 {
2712            assert_eq!(engine.first_index(rid).unwrap(), 1);
2713            engine.clean(rid);
2714        }
2715        engine.purge_manager.must_rewrite_append_queue(None, None);
2716        let file_count = file_system.inner.file_count();
2717        assert_eq!(
2718            number_of_files(spill_dir.path()) + number_of_files(dir.path()),
2719            file_count
2720        );
2721        assert!(file_count > engine.file_count(None));
2722        // Append data, recycled files are reused.
2723        for rid in 1..=recycle_capacity - 10 {
2724            engine.append(rid, 20, 30, Some(&entry_data));
2725        }
2726        // No new file is created.
2727        assert_eq!(file_count, file_system.inner.file_count());
2728        assert!(number_of_files(spill_dir.path()) > 0);
2729
2730        let cfg_3 = Config {
2731            enable_log_recycle: false,
2732            purge_threshold: ReadableSize(40),
2733            ..cfg
2734        };
2735        drop(engine);
2736        let engine = RaftLogEngine::open_with_file_system(cfg_3, file_system).unwrap();
2737        assert!(number_of_files(spill_dir.path()) > 0);
2738        for rid in 1..=10 {
2739            assert_eq!(engine.first_index(rid).unwrap(), 20);
2740        }
2741
2742        // Abnormal case: duplicate FileSeq among different dirs.
2743        {
2744            // Prerequisite: choose several files and duplicate them into the main dir.
2745            let mut file_count = 0;
2746            for e in std::fs::read_dir(spill_dir.path()).unwrap() {
2747                let p = e.unwrap().path();
2748                let file_name = p.file_name().unwrap().to_str().unwrap();
2749                if let Some(FileId {
2750                    queue: LogQueue::Append,
2751                    seq: _,
2752                }) = FileId::parse_file_name(file_name)
2753                {
2754                    if file_count % 2 == 0 {
2755                        std::fs::copy(&p, dir.path().join(file_name)).unwrap();
2756                    }
2757                    file_count += 1;
2758                }
2759            }
2760        }
2761        let start = engine.file_span(LogQueue::Append).0;
2762        let engine = engine.reopen();
2763        // Duplicate log files will be skipped and cleared.
2764        assert!(engine.file_span(LogQueue::Append).0 > start);
2765    }
2766}