adana_db/
file_db.rs

1use std::{
2    collections::BTreeMap,
3    path::PathBuf,
4    sync::{
5        Arc, Mutex, MutexGuard,
6        mpsc::{Receiver, Sender},
7    },
8    thread::JoinHandle,
9    vec,
10};
11
12use log::{debug, error, trace};
13use serde::de::DeserializeOwned;
14
15use super::{DbOp, FileLock, InMemoryDb, Key, Op, Value};
16
17pub(super) enum Notify {
18    Update,
19    FullFlush,
20    Stop,
21}
22
23#[derive(Debug)]
24pub struct FileDb<K: Key, V: Value> {
25    __inner: Arc<Mutex<InMemoryDb<K, V>>>,
26    __event_sender: Sender<Notify>,
27    __thread_handle: Option<JoinHandle<()>>,
28    __file_lock: Arc<FileLock>,
29}
30
31#[derive(Debug)]
32pub(super) struct FileDbConfig<K: Key, V: Value> {
33    pub(super) inner: Arc<Mutex<InMemoryDb<K, V>>>,
34    pub(super) file_lock: Arc<FileLock>,
35}
36
37trait GuardedDb<K: Key, V: Value> {
38    fn get_guard(&self) -> Option<MutexGuard<InMemoryDb<K, V>>>;
39    fn get_sender(&self) -> &Sender<Notify>;
40    fn update<E, F: FnOnce(MutexGuard<InMemoryDb<K, V>>) -> Option<E>>(
41        &self,
42        f: F,
43    ) -> Option<E> {
44        let guard = self.get_guard()?;
45        let sender = self.get_sender();
46        sender.send(Notify::Update).ok()?;
47        f(guard)
48    }
49}
50
51impl<K: Key, V: Value> GuardedDb<K, V> for FileDb<K, V> {
52    fn get_guard(&self) -> Option<MutexGuard<InMemoryDb<K, V>>> {
53        match self.__inner.lock() {
54            Ok(lock) => Some(lock),
55            Err(e) => {
56                error!("Lock could not be acquired! {e}");
57                None
58            }
59        }
60    }
61
62    fn get_sender(&self) -> &Sender<Notify> {
63        &self.__event_sender
64    }
65}
66
67impl<K, V> DbOp<K, V> for FileDb<K, V>
68where
69    K: 'static + Key + DeserializeOwned + std::fmt::Debug,
70    V: 'static + Value + DeserializeOwned + std::fmt::Debug,
71{
72    fn get_current_tree(&self) -> Option<String> {
73        let guard = self.get_guard()?;
74        guard.get_current_tree()
75    }
76
77    fn flush(&self) -> anyhow::Result<&'static str> {
78        match self.get_sender().send(Notify::FullFlush) {
79            Ok(_) => Ok("notify db to update itself"),
80            Err(e) => Err(anyhow::Error::from(e)),
81        }
82    }
83
84    fn open_tree(&mut self, tree_name: &str) -> Option<bool> {
85        let mut guard = self.get_guard()?;
86        let res = guard.open_tree(tree_name)?;
87        if res {
88            self.__event_sender.send(Notify::Update).ok()?;
89        }
90        Some(res)
91    }
92
93    fn tree_names(&self) -> Vec<String> {
94        match self.get_guard() {
95            Some(guard) => guard.tree_names(),
96            _ => {
97                vec![]
98            }
99        }
100    }
101
102    fn drop_tree(&mut self, tree_name: &str) -> bool {
103        self.update(|mut guard| Some(guard.drop_tree(tree_name)))
104            .unwrap_or_default()
105    }
106
107    fn clear_tree(&mut self, tree_name: &str) -> bool {
108        self.update(|mut guard| Some(guard.clear_tree(tree_name)))
109            .unwrap_or_default()
110    }
111
112    fn merge_trees(
113        &mut self,
114        tree_name_source: &str,
115        tree_name_dest: &str,
116    ) -> Option<()> {
117        self.update(|mut guard| {
118            guard.merge_trees(tree_name_source, tree_name_dest)
119        })
120    }
121
122    fn merge_current_tree_with(
123        &mut self,
124        tree_name_source: &str,
125    ) -> Option<()> {
126        self.update(|mut guard| guard.merge_current_tree_with(tree_name_source))
127    }
128
129    fn apply_batch(&mut self, batch: super::Batch<K, V>) -> Option<()> {
130        self.update(|mut guard| guard.apply_batch(batch))
131    }
132
133    fn apply_tree(
134        &mut self,
135        tree_name: &str,
136        consumer: &mut impl FnMut(&mut super::tree::Tree<K, V>) -> Option<V>,
137    ) -> Option<V> {
138        self.update(|mut guard| guard.apply_tree(tree_name, consumer))
139    }
140}
141
142impl<K: Key, V: Value> Op<K, V> for FileDb<K, V> {
143    fn read(&self, k: impl Into<K>, r: impl Fn(&V) -> Option<V>) -> Option<V> {
144        let guard = self.get_guard()?;
145        guard.read(k, r)
146    }
147
148    fn insert(&mut self, k: impl Into<K>, v: impl Into<V>) -> Option<V> {
149        self.update(move |mut guard| guard.insert(k, v))
150    }
151
152    fn remove(&mut self, k: impl Into<K>) -> Option<V> {
153        self.update(move |mut guard| guard.remove(k))
154    }
155
156    fn clear(&mut self) {
157        self.update(|mut guard| {
158            guard.clear();
159            Some(())
160        });
161    }
162
163    fn contains(&self, k: &K) -> Option<bool> {
164        let guard = self.get_guard()?;
165        guard.contains(k)
166    }
167
168    fn len(&self) -> Option<usize> {
169        let guard = self.get_guard()?;
170        guard.len()
171    }
172
173    fn keys(&self) -> Vec<K> {
174        match self.get_guard() {
175            Some(guard) => guard.keys(),
176            _ => {
177                vec![]
178            }
179        }
180    }
181
182    fn list_all(&self) -> BTreeMap<K, V> {
183        match self.get_guard() {
184            Some(guard) => guard.list_all(),
185            _ => BTreeMap::default(),
186        }
187    }
188}
189
190impl<K, V> FileDb<K, V>
191where
192    K: 'static + Key + DeserializeOwned + std::fmt::Debug,
193    V: 'static + Value + DeserializeOwned + std::fmt::Debug,
194{
195    pub fn get_path(&self) -> &PathBuf {
196        self.__file_lock.get_path()
197    }
198
199    fn __flush(
200        inner_db: Arc<Mutex<InMemoryDb<K, V>>>,
201        file_lock: &FileLock,
202    ) -> anyhow::Result<()> {
203        trace!("syncing");
204        let db =
205            inner_db.lock().map_err(|e| anyhow::Error::msg(e.to_string()))?;
206        let bytes = bincode::serialize(&*db)?;
207        drop(db); // try to release the lock before writing to the file
208        file_lock.write(&bytes)?;
209        trace!("syncing done");
210        Ok(())
211    }
212    fn start_file_db(
213        &mut self,
214        receiver: Receiver<Notify>,
215    ) -> anyhow::Result<()> {
216        let clone = Arc::clone(&self.__inner);
217        let file_lock = self.__file_lock.clone();
218
219        let handle = std::thread::spawn(move || {
220            debug!("start syncing");
221
222            for event in receiver.iter() {
223                match event {
224                    Notify::Update => {
225                        debug!("receive update!");
226                        match Self::__flush(Arc::clone(&clone), &file_lock) {
227                            Err(e) => {
228                                error!("could not flush db. Err: '{e}'.");
229                            }
230                            _ => {
231                                trace!("sync done");
232                            }
233                        }
234                    }
235                    Notify::FullFlush => {
236                        debug!("receive full flush!");
237                        match Self::__flush(Arc::clone(&clone), &file_lock) {
238                            Err(e) => {
239                                error!("could not flush db. Err: '{e}'.");
240                            }
241                            _ => match file_lock.flush() {
242                                Err(e) => {
243                                    error!("could not write on file lock {e}");
244                                }
245                                _ => {
246                                    trace!("full flush done");
247                                }
248                            },
249                        }
250                    }
251                    Notify::Stop => {
252                        debug!("receive stop!");
253                        break;
254                    }
255                }
256            }
257
258            debug!("DROPPED");
259
260            if let Err(e) = Self::__flush(clone, &file_lock) {
261                error!("could not flush db. Err: '{e}'.");
262            }
263        });
264
265        self.__thread_handle = Some(handle);
266        Ok(())
267    }
268}
269impl<K: Key, V: Value> Drop for FileDb<K, V> {
270    fn drop(&mut self) {
271        debug!("done");
272        self.__event_sender
273            .send(Notify::Stop)
274            .expect("could not send stop event!!!");
275        if let Some(handle) = self.__thread_handle.take() {
276            handle.join().expect("Could not cleanup thread handle!!!");
277        }
278        debug!("cleanup file db success!")
279    }
280}
281
282impl<K, V> TryFrom<FileDbConfig<K, V>> for FileDb<K, V>
283where
284    K: 'static + Key + DeserializeOwned + std::fmt::Debug,
285    V: 'static + Value + DeserializeOwned + std::fmt::Debug,
286{
287    type Error = anyhow::Error;
288
289    fn try_from(config: FileDbConfig<K, V>) -> Result<Self, Self::Error> {
290        let (__event_sender, receiver) = std::sync::mpsc::channel();
291        let mut db = FileDb {
292            __file_lock: config.file_lock,
293            __inner: config.inner,
294            __event_sender,
295            __thread_handle: None,
296        };
297        db.start_file_db(receiver)?;
298        Ok(db)
299    }
300}