event_base_wal 0.1.2

An Event Driven Framework - WAL Crate
Documentation
use bincode::{Decode, Encode, config};
use event_base_core::error::CoreError;
use event_base_core::error::serialize::SerializeError::SerializeError;
use event_base_core::error::wal::WalError;
use event_base_core::error::wal::WalError::RecordNotFound;
use event_base_core::wal::wal::{Wal, WalRecord, WalRecordState};
use event_base_core::worker_registry::WorkerInfo;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::fs;
use tokio::sync::{Mutex, RwLock};

/// Snapshot of the whole WAL state in the form that is written to disk.
///
/// `PersistentWal::flush` builds one of these from the live maps and encodes it
/// with bincode; `PersistentWal::new` decodes it again when the file exists.
// NOTE(review): the bincode layout presumably depends on field order, so
// reordering or inserting fields would likely make older files undecodable —
// confirm before changing this struct.
#[derive(Serialize, Deserialize, Encode, Decode)]
struct WalStore {
    /// Records added through `append`, keyed by `record.message.id`.
    records: HashMap<String, WalRecord>,
    /// Records added through `schedule`, keyed by `record.message.id`.
    delays: HashMap<String, WalRecord>,
    /// Registry last stored through `save_worker_registry`.
    worker_registry: HashMap<String, WorkerInfo>,
    /// Value of the id counter at snapshot time (the last `record_id` handed out).
    id_counter: u64,
}

/// WAL that keeps its state in memory and writes a snapshot of it to
/// `file_path` when `flush` is called.
///
/// All state lives behind `Arc`s, so the derived `Clone` produces another handle
/// to the *same* maps and counter rather than an independent copy. The derived
/// `Default` leaves `file_path` empty; use `new` to bind the WAL to a file.
#[derive(Default, Clone)]
pub struct PersistentWal {
    /// Records added by `append`, keyed by message id.
    records: Arc<RwLock<HashMap<String, WalRecord>>>,
    /// Delayed records added by `schedule`, keyed by message id; drained by
    /// `fetch_ready` and `remove_scheduled`.
    delays: Arc<RwLock<HashMap<String, WalRecord>>>,
    /// Registry replaced wholesale by `save_worker_registry`.
    worker_registry: Arc<RwLock<HashMap<String, WorkerInfo>>>,
    /// Last `record_id` handed out; incremented by both `append` and `schedule`.
    id_counter: Arc<Mutex<u64>>,
    /// Path of the snapshot file read by `new` and replaced by `flush`.
    file_path: String,
}

impl PersistentWal {
    /// Opens the WAL stored at `file_path`.
    ///
    /// If the file does not exist, an empty WAL bound to that path is returned;
    /// nothing is created on disk by this call. If the file exists, it is read in
    /// full and decoded with bincode's standard configuration.
    ///
    /// # Errors
    ///
    /// * The existence check or the read fails (I/O error converted into
    ///   `CoreError`).
    /// * The file contents cannot be decoded (`WalError::Backend`).
    pub async fn new(file_path: String) -> Result<Self, CoreError> {
        // A failing existence check (e.g. permission denied) must not be treated
        // as "no file yet": doing so would start from an empty WAL and let a later
        // flush replace whatever is actually stored at this path.
        if !fs::try_exists(&file_path).await? {
            return Ok(Self {
                file_path,
                records: Arc::new(Default::default()),
                delays: Arc::new(Default::default()),
                worker_registry: Arc::new(Default::default()),
                id_counter: Arc::new(Default::default()),
            });
        }

        let data = fs::read(&file_path).await?;

        // The second tuple element is the number of bytes consumed; it is not needed.
        let (store, _): (WalStore, _) = bincode::decode_from_slice(&data, config::standard())
            .map_err(|e| WalError::Backend(format!("Fail to decode the file:{}", e)))?;

        Ok(Self {
            records: Arc::new(RwLock::new(store.records)),
            delays: Arc::new(RwLock::new(store.delays)),
            worker_registry: Arc::new(RwLock::new(store.worker_registry)),
            id_counter: Arc::new(Mutex::new(store.id_counter)),
            file_path,
        })
    }
}

// Lock order used throughout this impl: whenever more than one lock is held at
// the same time, `id_counter` is acquired first, then `records`, `delays`,
// `worker_registry`. Handles created with `Clone` share these locks, so every
// method must follow the same order to stay deadlock-free.
#[async_trait::async_trait]
impl Wal for PersistentWal {
    /// Assigns the next `record_id` to `record` and stores it under its message id.
    ///
    /// A record already stored under the same message id is replaced.
    async fn append(&mut self, mut record: WalRecord) -> Result<(), CoreError> {
        let mut counter = self.id_counter.lock().await;
        *counter += 1;
        record.record_id = *counter;
        let mut records = self.records.write().await;
        records.insert(record.message.id.clone(), record);
        Ok(())
    }

    /// Sets the status of the record stored under `message_id`.
    ///
    /// # Errors
    ///
    /// Returns `WalError::RecordNotFound` (as a `CoreError`) when no record is
    /// stored under `message_id`.
    async fn update_state(
        &mut self,
        message_id: &str,
        status: WalRecordState,
    ) -> Result<(), CoreError> {
        let mut records = self.records.write().await;
        let record = records
            .get_mut(message_id)
            .ok_or_else(|| CoreError::from(RecordNotFound(message_id.to_string())))?;
        record.status = status;
        Ok(())
    }

    /// Returns clones of all records whose status is `WalRecordState::Pending`.
    ///
    /// The stored records are left untouched. The order of the result follows the
    /// map's iteration order and is therefore unspecified.
    async fn replay_pending(&mut self) -> Result<Vec<WalRecord>, CoreError> {
        let records = self.records.read().await;
        let pendings = records
            .values()
            .filter(|x| x.status == WalRecordState::Pending)
            .cloned()
            .collect();
        Ok(pendings)
    }

    /// Writes a snapshot of the whole state to `file_path`.
    ///
    /// The snapshot is encoded with bincode, written to a sibling file whose
    /// extension is replaced by `tmp`, and then renamed over `file_path`.
    ///
    /// # Errors
    ///
    /// Returns a serialization error when encoding fails, or the converted I/O
    /// error when writing or renaming fails.
    async fn flush(&mut self) -> Result<(), CoreError> {
        let store = {
            // Take the counter first, exactly like `append` and `schedule` do.
            // Acquiring the map locks first and the counter last (while the map
            // guards are still alive) inverts that order and can deadlock against
            // a concurrent `append`/`schedule` on a cloned handle.
            let counter = self.id_counter.lock().await;
            let records = self.records.read().await;
            let delays = self.delays.read().await;
            let worker_registry = self.worker_registry.read().await;
            WalStore {
                records: records.clone(),
                delays: delays.clone(),
                worker_registry: worker_registry.clone(),
                id_counter: *counter,
            }
            // All guards are released here, before encoding and file I/O.
        };

        let bytes = bincode::encode_to_vec(&store, config::standard())
            .map_err(|e| SerializeError(e.to_string()))?;

        // Write to a temporary file and rename it so that `file_path` is replaced
        // in one step instead of being rewritten in place.
        // NOTE(review): the temporary file is not explicitly synced before the
        // rename; confirm whether crash durability requires an fsync here.
        let temp_path = PathBuf::from(&self.file_path).with_extension("tmp");
        fs::write(&temp_path, bytes).await?;
        fs::rename(&temp_path, &self.file_path).await?;
        Ok(())
    }

    /// Assigns the next `record_id` to `record` and stores it in the delayed set
    /// under its message id.
    ///
    /// A delayed record already stored under the same message id is replaced.
    async fn schedule(&self, mut record: WalRecord) -> Result<(), CoreError> {
        let mut counter = self.id_counter.lock().await;
        *counter += 1;
        record.record_id = *counter;
        let mut delays = self.delays.write().await;
        delays.insert(record.message.id.clone(), record);
        Ok(())
    }

    /// Removes and returns every delayed record whose `deliver_at` is set and not
    /// later than the current system time.
    ///
    /// Records without a `deliver_at` are never returned by this method.
    async fn fetch_ready(&self) -> Result<Vec<WalRecord>, CoreError> {
        let mut delays = self.delays.write().await;
        let now = SystemTime::now();

        // First pass: collect the keys of due records (the map cannot be mutated
        // while it is being iterated).
        let due_ids: Vec<String> = delays
            .iter()
            .filter(|(_, delayed)| {
                matches!(&delayed.message.deliver_at, Some(deliver_at) if deliver_at <= &now)
            })
            .map(|(msg_id, _)| msg_id.clone())
            .collect();

        // Second pass: move the due records out of the map instead of cloning
        // them and removing the originals afterwards.
        let ready = due_ids
            .iter()
            .filter_map(|msg_id| delays.remove(msg_id))
            .collect();
        Ok(ready)
    }

    /// Removes the delayed record stored under `msg_id`, if any.
    ///
    /// Succeeds even when no such record exists.
    async fn remove_scheduled(&self, msg_id: &str) -> Result<(), CoreError> {
        let mut delays = self.delays.write().await;
        delays.remove(msg_id);
        Ok(())
    }

    /// Replaces the stored worker registry with `registry`.
    async fn save_worker_registry(
        &self,
        registry: HashMap<String, WorkerInfo>,
    ) -> Result<(), CoreError> {
        let mut store = self.worker_registry.write().await;
        *store = registry;
        Ok(())
    }

    /// Returns a copy of the stored worker registry.
    async fn load_worker_registry(&self) -> Result<HashMap<String, WorkerInfo>, CoreError> {
        let store = self.worker_registry.read().await;
        Ok(store.clone())
    }
}