1use std::{
2 collections::BTreeMap,
3 path::PathBuf,
4 sync::{
5 Arc, Mutex, MutexGuard,
6 mpsc::{Receiver, Sender},
7 },
8 thread::JoinHandle,
9 vec,
10};
11
12use log::{debug, error, trace};
13use serde::de::DeserializeOwned;
14
15use super::{DbOp, FileLock, InMemoryDb, Key, Op, Value};
16
17pub(super) enum Notify {
18 Update,
19 FullFlush,
20 Stop,
21}
22
23#[derive(Debug)]
24pub struct FileDb<K: Key, V: Value> {
25 __inner: Arc<Mutex<InMemoryDb<K, V>>>,
26 __event_sender: Sender<Notify>,
27 __thread_handle: Option<JoinHandle<()>>,
28 __file_lock: Arc<FileLock>,
29}
30
31#[derive(Debug)]
32pub(super) struct FileDbConfig<K: Key, V: Value> {
33 pub(super) inner: Arc<Mutex<InMemoryDb<K, V>>>,
34 pub(super) file_lock: Arc<FileLock>,
35}
36
37trait GuardedDb<K: Key, V: Value> {
38 fn get_guard(&self) -> Option<MutexGuard<InMemoryDb<K, V>>>;
39 fn get_sender(&self) -> &Sender<Notify>;
40 fn update<E, F: FnOnce(MutexGuard<InMemoryDb<K, V>>) -> Option<E>>(
41 &self,
42 f: F,
43 ) -> Option<E> {
44 let guard = self.get_guard()?;
45 let sender = self.get_sender();
46 sender.send(Notify::Update).ok()?;
47 f(guard)
48 }
49}
50
51impl<K: Key, V: Value> GuardedDb<K, V> for FileDb<K, V> {
52 fn get_guard(&self) -> Option<MutexGuard<InMemoryDb<K, V>>> {
53 match self.__inner.lock() {
54 Ok(lock) => Some(lock),
55 Err(e) => {
56 error!("Lock could not be acquired! {e}");
57 None
58 }
59 }
60 }
61
62 fn get_sender(&self) -> &Sender<Notify> {
63 &self.__event_sender
64 }
65}
66
67impl<K, V> DbOp<K, V> for FileDb<K, V>
68where
69 K: 'static + Key + DeserializeOwned + std::fmt::Debug,
70 V: 'static + Value + DeserializeOwned + std::fmt::Debug,
71{
72 fn get_current_tree(&self) -> Option<String> {
73 let guard = self.get_guard()?;
74 guard.get_current_tree()
75 }
76
77 fn flush(&self) -> anyhow::Result<&'static str> {
78 match self.get_sender().send(Notify::FullFlush) {
79 Ok(_) => Ok("notify db to update itself"),
80 Err(e) => Err(anyhow::Error::from(e)),
81 }
82 }
83
84 fn open_tree(&mut self, tree_name: &str) -> Option<bool> {
85 let mut guard = self.get_guard()?;
86 let res = guard.open_tree(tree_name)?;
87 if res {
88 self.__event_sender.send(Notify::Update).ok()?;
89 }
90 Some(res)
91 }
92
93 fn tree_names(&self) -> Vec<String> {
94 match self.get_guard() {
95 Some(guard) => guard.tree_names(),
96 _ => {
97 vec![]
98 }
99 }
100 }
101
102 fn drop_tree(&mut self, tree_name: &str) -> bool {
103 self.update(|mut guard| Some(guard.drop_tree(tree_name)))
104 .unwrap_or_default()
105 }
106
107 fn clear_tree(&mut self, tree_name: &str) -> bool {
108 self.update(|mut guard| Some(guard.clear_tree(tree_name)))
109 .unwrap_or_default()
110 }
111
112 fn merge_trees(
113 &mut self,
114 tree_name_source: &str,
115 tree_name_dest: &str,
116 ) -> Option<()> {
117 self.update(|mut guard| {
118 guard.merge_trees(tree_name_source, tree_name_dest)
119 })
120 }
121
122 fn merge_current_tree_with(
123 &mut self,
124 tree_name_source: &str,
125 ) -> Option<()> {
126 self.update(|mut guard| guard.merge_current_tree_with(tree_name_source))
127 }
128
129 fn apply_batch(&mut self, batch: super::Batch<K, V>) -> Option<()> {
130 self.update(|mut guard| guard.apply_batch(batch))
131 }
132
133 fn apply_tree(
134 &mut self,
135 tree_name: &str,
136 consumer: &mut impl FnMut(&mut super::tree::Tree<K, V>) -> Option<V>,
137 ) -> Option<V> {
138 self.update(|mut guard| guard.apply_tree(tree_name, consumer))
139 }
140}
141
142impl<K: Key, V: Value> Op<K, V> for FileDb<K, V> {
143 fn read(&self, k: impl Into<K>, r: impl Fn(&V) -> Option<V>) -> Option<V> {
144 let guard = self.get_guard()?;
145 guard.read(k, r)
146 }
147
148 fn insert(&mut self, k: impl Into<K>, v: impl Into<V>) -> Option<V> {
149 self.update(move |mut guard| guard.insert(k, v))
150 }
151
152 fn remove(&mut self, k: impl Into<K>) -> Option<V> {
153 self.update(move |mut guard| guard.remove(k))
154 }
155
156 fn clear(&mut self) {
157 self.update(|mut guard| {
158 guard.clear();
159 Some(())
160 });
161 }
162
163 fn contains(&self, k: &K) -> Option<bool> {
164 let guard = self.get_guard()?;
165 guard.contains(k)
166 }
167
168 fn len(&self) -> Option<usize> {
169 let guard = self.get_guard()?;
170 guard.len()
171 }
172
173 fn keys(&self) -> Vec<K> {
174 match self.get_guard() {
175 Some(guard) => guard.keys(),
176 _ => {
177 vec![]
178 }
179 }
180 }
181
182 fn list_all(&self) -> BTreeMap<K, V> {
183 match self.get_guard() {
184 Some(guard) => guard.list_all(),
185 _ => BTreeMap::default(),
186 }
187 }
188}
189
190impl<K, V> FileDb<K, V>
191where
192 K: 'static + Key + DeserializeOwned + std::fmt::Debug,
193 V: 'static + Value + DeserializeOwned + std::fmt::Debug,
194{
195 pub fn get_path(&self) -> &PathBuf {
196 self.__file_lock.get_path()
197 }
198
199 fn __flush(
200 inner_db: Arc<Mutex<InMemoryDb<K, V>>>,
201 file_lock: &FileLock,
202 ) -> anyhow::Result<()> {
203 trace!("syncing");
204 let db =
205 inner_db.lock().map_err(|e| anyhow::Error::msg(e.to_string()))?;
206 let bytes = bincode::serialize(&*db)?;
207 drop(db); file_lock.write(&bytes)?;
209 trace!("syncing done");
210 Ok(())
211 }
212 fn start_file_db(
213 &mut self,
214 receiver: Receiver<Notify>,
215 ) -> anyhow::Result<()> {
216 let clone = Arc::clone(&self.__inner);
217 let file_lock = self.__file_lock.clone();
218
219 let handle = std::thread::spawn(move || {
220 debug!("start syncing");
221
222 for event in receiver.iter() {
223 match event {
224 Notify::Update => {
225 debug!("receive update!");
226 match Self::__flush(Arc::clone(&clone), &file_lock) {
227 Err(e) => {
228 error!("could not flush db. Err: '{e}'.");
229 }
230 _ => {
231 trace!("sync done");
232 }
233 }
234 }
235 Notify::FullFlush => {
236 debug!("receive full flush!");
237 match Self::__flush(Arc::clone(&clone), &file_lock) {
238 Err(e) => {
239 error!("could not flush db. Err: '{e}'.");
240 }
241 _ => match file_lock.flush() {
242 Err(e) => {
243 error!("could not write on file lock {e}");
244 }
245 _ => {
246 trace!("full flush done");
247 }
248 },
249 }
250 }
251 Notify::Stop => {
252 debug!("receive stop!");
253 break;
254 }
255 }
256 }
257
258 debug!("DROPPED");
259
260 if let Err(e) = Self::__flush(clone, &file_lock) {
261 error!("could not flush db. Err: '{e}'.");
262 }
263 });
264
265 self.__thread_handle = Some(handle);
266 Ok(())
267 }
268}
269impl<K: Key, V: Value> Drop for FileDb<K, V> {
270 fn drop(&mut self) {
271 debug!("done");
272 self.__event_sender
273 .send(Notify::Stop)
274 .expect("could not send stop event!!!");
275 if let Some(handle) = self.__thread_handle.take() {
276 handle.join().expect("Could not cleanup thread handle!!!");
277 }
278 debug!("cleanup file db success!")
279 }
280}
281
282impl<K, V> TryFrom<FileDbConfig<K, V>> for FileDb<K, V>
283where
284 K: 'static + Key + DeserializeOwned + std::fmt::Debug,
285 V: 'static + Value + DeserializeOwned + std::fmt::Debug,
286{
287 type Error = anyhow::Error;
288
289 fn try_from(config: FileDbConfig<K, V>) -> Result<Self, Self::Error> {
290 let (__event_sender, receiver) = std::sync::mpsc::channel();
291 let mut db = FileDb {
292 __file_lock: config.file_lock,
293 __inner: config.inner,
294 __event_sender,
295 __thread_handle: None,
296 };
297 db.start_file_db(receiver)?;
298 Ok(db)
299 }
300}