worterbuch 1.4.2

A message broker / database hybrid.
Documentation
pub mod error;
mod json;
mod redb;

use crate::{
    Config, PersistenceMode, Worterbuch,
    persistence::{
        error::PersistenceResult, json::PersistentJsonStorage, redb::PersistentRedbStore,
    },
    server::CloneableWbApi,
};
use lazy_static::lazy_static;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio_graceful_shutdown::SubsystemHandle;
use tracing::{info, warn};
use worterbuch_common::{Key, SYSTEM_TOPIC_ROOT_PREFIX, ValueEntry};

lazy_static! {
    static ref PERSISTENCE_LOCKED: AtomicBool = AtomicBool::new(true);
}

pub fn is_persistence_locked() -> bool {
    PERSISTENCE_LOCKED.load(Ordering::Acquire)
}

pub fn unlock_persistence() {
    PERSISTENCE_LOCKED.store(false, Ordering::Release);
}

pub trait PersistentStorage {
    async fn update_value(&self, key: &Key, value: &ValueEntry) -> PersistenceResult<()>;

    async fn delete_value(&self, key: &Key) -> PersistenceResult<()>;

    async fn flush(&mut self, worterbuch: &mut Worterbuch) -> PersistenceResult<()>;

    async fn load(&self, config: &Config) -> PersistenceResult<Worterbuch>;

    async fn clear(&self) -> PersistenceResult<()>;
}

#[derive(Default)]
pub enum PersistentStorageImpl {
    Json(Box<PersistentJsonStorage>),
    ReDB(Box<PersistentRedbStore>),
    #[default]
    Noop,
}

impl PersistentStorageImpl {
    pub async fn update_value(&self, key: &Key, value: &ValueEntry) -> PersistenceResult<()> {
        if key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
            return Ok(());
        }

        match self {
            PersistentStorageImpl::Json(s) => s.update_value(key, value).await,
            PersistentStorageImpl::ReDB(s) => s.update_value(key, value).await,
            PersistentStorageImpl::Noop => Ok(()),
        }
    }

    pub async fn delete_value(&self, key: &Key) -> PersistenceResult<()> {
        if key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
            return Ok(());
        }

        match self {
            PersistentStorageImpl::Json(s) => s.delete_value(key).await,
            PersistentStorageImpl::ReDB(s) => s.delete_value(key).await,
            PersistentStorageImpl::Noop => Ok(()),
        }
    }

    pub async fn flush(&mut self, worterbuch: &mut Worterbuch) -> PersistenceResult<()> {
        match self {
            PersistentStorageImpl::Json(s) => s.flush(worterbuch).await,
            PersistentStorageImpl::ReDB(s) => s.flush(worterbuch).await,
            PersistentStorageImpl::Noop => Ok(()),
        }
    }

    pub async fn load(&self, config: &Config) -> Worterbuch {
        let res = if config.follower {
            Ok(Worterbuch::with_config(config.clone()))
        } else {
            match self {
                PersistentStorageImpl::Json(s) => s.load(config).await,
                PersistentStorageImpl::ReDB(s) => s.load(config).await,
                PersistentStorageImpl::Noop => Ok(Worterbuch::with_config(config.clone())),
            }
        };

        match res {
            Ok(it) => it,
            Err(e) => {
                warn!("Could not restore worterbuch from persistence: {e}");
                info!("Starting empty instace.");
                Worterbuch::with_config(config.clone())
            }
        }
    }

    pub async fn clear(&self) -> PersistenceResult<()> {
        match self {
            PersistentStorageImpl::Json(s) => s.clear().await,
            PersistentStorageImpl::ReDB(s) => s.clear().await,
            PersistentStorageImpl::Noop => Ok(()),
        }
    }
}

pub(crate) async fn restore(
    subsys: &SubsystemHandle,
    config: &Config,
    api: &CloneableWbApi,
) -> PersistenceResult<Worterbuch> {
    let persistent_storage = get_storage_instance(subsys, config, api).await?;
    let mut wb = persistent_storage.load(config).await;
    wb.set_persistent_storage(persistent_storage);
    unlock_persistence();
    Ok(wb)
}

async fn get_storage_instance(
    subsys: &SubsystemHandle,
    config: &Config,
    api: &CloneableWbApi,
) -> PersistenceResult<PersistentStorageImpl> {
    if !config.use_persistence {
        return Ok(PersistentStorageImpl::Noop);
    }

    let flush_periodically = !config.follower;

    let storage =
        match config.persistence_mode {
            PersistenceMode::Json => PersistentStorageImpl::Json(Box::new(
                PersistentJsonStorage::new(subsys, config.clone(), api.clone(), flush_periodically),
            )),
            PersistenceMode::ReDB => {
                PersistentStorageImpl::ReDB(Box::new(PersistentRedbStore::new(config).await?))
            }
        };

    Ok(storage)
}