db_rs/
compacter.rs

1use crate::{Db, DbResult};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4use std::thread;
5use std::thread::JoinHandle;
6use std::time::Duration;
7
/// A cheaply cloneable cancellation flag.
///
/// Every clone shares the same underlying `AtomicBool`, so calling [`CancelSig::cancel`] on any
/// clone is observed by [`CancelSig::is_canceled`] on all of them. A value created through
/// `Default` starts out not canceled.
#[derive(Debug, Default, Clone)]
pub struct CancelSig(Arc<AtomicBool>);

impl CancelSig {
    /// Raises the cancellation flag for this signal and all of its clones.
    ///
    /// There is no way to lower the flag again through this type.
    pub fn cancel(&self) {
        // `Relaxed` is enough here: the flag is the only state this type carries, so no other
        // memory accesses need to be ordered relative to it.
        self.0.store(true, Ordering::Relaxed);
    }

    /// Returns `true` once [`CancelSig::cancel`] has been called on this signal or any clone.
    pub fn is_canceled(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}
20
/// Adds background log compaction to a shared database handle.
pub trait BackgroundCompacter {
    /// Periodically compact the database log in a separate thread.
    /// You can call this function if your db is wrapped in an `Arc<Mutex>`.
    ///
    /// `freq` determines how often the background thread will acquire the mutex
    /// and call `compact_log()` on your db.
    ///
    /// `cancel` is a [`CancelSig`] (a shared `AtomicBool` flag) which can be passed in to signal
    /// that compaction should cease (the thread could take up to `freq` to return).
    ///
    /// The returned [`JoinHandle`] yields, once joined, the number of times compaction took
    /// place, or the error that stopped the compaction thread.
    fn begin_compacter(&self, freq: Duration, cancel: CancelSig) -> JoinHandle<DbResult<usize>>;
}
34
35impl<D> BackgroundCompacter for Arc<Mutex<D>>
36where
37    D: Db + Send + Sync + 'static,
38{
39    fn begin_compacter(&self, freq: Duration, cancel: CancelSig) -> JoinHandle<DbResult<usize>> {
40        let db = self.clone();
41        thread::spawn(move || {
42            let mut count = 0;
43            loop {
44                thread::sleep(freq);
45
46                if cancel.is_canceled() {
47                    return Ok(count);
48                }
49
50                db.lock()?.compact_log()?;
51                count += 1;
52            }
53        })
54    }
55}