karsher 0.8.4

karsher is a dumb cache written in rust
use std::{
    sync::mpsc::{Receiver, Sender},
    thread::JoinHandle,
    vec,
};

use serde::de::DeserializeOwned;

use crate::prelude::*;

use super::{DbOp, FileLock, InMemoryDb, Key, Op, Value};

pub(super) enum Notify {
    Update,
    Stop,
}

#[derive(Debug)]
pub struct FileDb<K: Key, V: Value> {
    __inner: Arc<Mutex<InMemoryDb<K, V>>>,
    __event_sender: Sender<Notify>,
    __thread_handle: Option<JoinHandle<()>>,
    __file_lock: Arc<FileLock>,
}

#[derive(Debug)]
pub(super) struct FileDbConfig<K: Key, V: Value> {
    pub(super) inner: Arc<Mutex<InMemoryDb<K, V>>>,
    pub(super) file_lock: Arc<FileLock>,
}

trait GuardedDb<K: Key, V: Value> {
    fn get_guard(&self) -> Option<MutexGuard<InMemoryDb<K, V>>>;
    fn get_sender(&self) -> &Sender<Notify>;
    fn update<E, F: FnOnce(MutexGuard<InMemoryDb<K, V>>) -> Option<E>>(
        &self,
        f: F,
    ) -> Option<E> {
        let guard = self.get_guard()?;
        let sender = self.get_sender();
        sender.send(Notify::Update).ok()?;
        f(guard)
    }
}

impl<K: Key, V: Value> GuardedDb<K, V> for FileDb<K, V> {
    fn get_guard(&self) -> Option<MutexGuard<InMemoryDb<K, V>>> {
        match self.__inner.lock() {
            Ok(lock) => Some(lock),
            Err(e) => {
                error!("Lock could not be acquired! {e}");
                None
            }
        }
    }

    fn get_sender(&self) -> &Sender<Notify> {
        &self.__event_sender
    }
}

impl<K: Key, V: Value> DbOp<K, V> for FileDb<K, V> {
    fn get_current_tree(&self) -> Option<String> {
        let guard = self.get_guard()?;
        guard.get_current_tree()
    }

    fn open_tree(&mut self, tree_name: &str) -> Option<bool> {
        let mut guard = self.get_guard()?;
        let res = guard.open_tree(tree_name)?;
        if res {
            self.__event_sender.send(Notify::Update).ok()?;
        }
        Some(res)
    }

    fn tree_names(&self) -> Vec<String> {
        if let Some(guard) = self.get_guard() {
            guard.tree_names()
        } else {
            vec![]
        }
    }

    fn drop_tree(&mut self, tree_name: &str) -> bool {
        if let Some(res) =
            self.update(|mut guard| Some(guard.drop_tree(tree_name)))
        {
            res
        } else {
            false
        }
    }

    fn clear_tree(&mut self, tree_name: &str) -> bool {
        if let Some(res) =
            self.update(|mut guard| Some(guard.clear_tree(tree_name)))
        {
            res
        } else {
            false
        }
    }

    fn merge_trees(
        &mut self,
        tree_name_source: &str,
        tree_name_dest: &str,
    ) -> Option<()> {
        self.update(|mut guard| {
            guard.merge_trees(tree_name_source, tree_name_dest)
        })
    }

    fn merge_current_tree_with(
        &mut self,
        tree_name_source: &str,
    ) -> Option<()> {
        self.update(|mut guard| guard.merge_current_tree_with(tree_name_source))
    }

    fn apply_batch(&mut self, batch: super::Batch<K, V>) -> Option<()> {
        self.update(|mut guard| guard.apply_batch(batch))
    }

    fn apply_tree(
        &mut self,
        tree_name: &str,
        consumer: &mut impl FnMut(&mut super::tree::Tree<K, V>) -> Option<V>,
    ) -> Option<V> {
        self.update(|mut guard| guard.apply_tree(tree_name, consumer))
    }
}

impl<K: Key, V: Value> Op<K, V> for FileDb<K, V> {
    fn read(&self, k: impl Into<K>, r: impl Fn(&V) -> Option<V>) -> Option<V> {
        let guard = self.get_guard()?;
        guard.read(k, r)
    }

    fn insert(&mut self, k: impl Into<K>, v: impl Into<V>) -> Option<V> {
        self.update(move |mut guard| guard.insert(k, v))
    }

    fn remove(&mut self, k: impl Into<K>) -> Option<V> {
        self.update(move |mut guard| guard.remove(k))
    }

    fn clear(&mut self) {
        self.update(|mut guard| {
            guard.clear();
            Some(())
        });
    }

    fn contains(&self, k: &K) -> Option<bool> {
        let guard = self.get_guard()?;
        guard.contains(k)
    }

    fn len(&self) -> Option<usize> {
        let guard = self.get_guard()?;
        guard.len()
    }

    fn keys(&self) -> Vec<K> {
        if let Some(guard) = self.get_guard() {
            guard.keys()
        } else {
            vec![]
        }
    }

    fn list_all(&self) -> BTreeMap<K, V> {
        if let Some(guard) = self.get_guard() {
            guard.list_all()
        } else {
            BTreeMap::default()
        }
    }
}

impl<K, V> FileDb<K, V>
where
    K: 'static + Key + DeserializeOwned + std::fmt::Debug,
    V: 'static + Value + DeserializeOwned + std::fmt::Debug,
{
    fn __flush(
        inner_db: Arc<Mutex<InMemoryDb<K, V>>>,
        file_lock: &FileLock,
    ) -> anyhow::Result<()> {
        trace!("syncing");
        let db =
            inner_db.lock().map_err(|e| anyhow::Error::msg(e.to_string()))?;
        let bytes = bincode::serialize(&*db)?;
        drop(db); // try to release the lock before writing to the file
        file_lock.write(&bytes)?;
        trace!("syncing done");
        Ok(())
    }
    fn start_file_db(
        &mut self,
        receiver: Receiver<Notify>,
    ) -> anyhow::Result<()> {
        let clone = Arc::clone(&self.__inner);
        let file_lock = self.__file_lock.clone();

        let handle = std::thread::spawn(move || {
            debug!("start syncing");

            for event in receiver.iter() {
                match event {
                    Notify::Update => {
                        debug!("receive update!");
                        if let Err(e) =
                            Self::__flush(Arc::clone(&clone), &file_lock)
                        {
                            error!("could not flush db. Err: '{e}'.");
                        } else {
                            trace!("sync done");
                        }
                    }
                    Notify::Stop => {
                        debug!("receive stop!");
                        break;
                    }
                }
            }

            debug!("DROPPED");

            if let Err(e) = Self::__flush(clone, &file_lock) {
                error!("could not flush db. Err: '{e}'.");
            }
        });

        self.__thread_handle = Some(handle);
        Ok(())
    }
}
impl<K: Key, V: Value> Drop for FileDb<K, V> {
    fn drop(&mut self) {
        debug!("done");
        self.__event_sender
            .send(Notify::Stop)
            .expect("could not send stop event!!!");
        if let Some(handle) = self.__thread_handle.take() {
            handle.join().expect("Could not cleanup thread handle!!!");
        }
        debug!("cleanup file db success!")
    }
}

impl<K, V> TryFrom<FileDbConfig<K, V>> for FileDb<K, V>
where
    K: 'static + Key + DeserializeOwned + std::fmt::Debug,
    V: 'static + Value + DeserializeOwned + std::fmt::Debug,
{
    type Error = anyhow::Error;

    fn try_from(config: FileDbConfig<K, V>) -> Result<Self, Self::Error> {
        let (__event_sender, receiver) = std::sync::mpsc::channel();
        let mut db = FileDb {
            __file_lock: config.file_lock,
            __inner: config.inner,
            __event_sender,
            __thread_handle: None,
        };
        db.start_file_db(receiver)?;
        Ok(db)
    }
}