event_base_wal 0.1.2

An Event Driven Framework - WAL Crate
Documentation
use event_base_core::error::CoreError;
use event_base_core::error::wal::WalError;
use event_base_core::wal::wal::{Wal, WalRecord, WalRecordState};
use event_base_core::worker_registry::WorkerInfo;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::{Mutex, RwLock};

#[derive(Default, Clone)]
pub struct MemoryWal {
    records: Arc<RwLock<HashMap<String, WalRecord>>>,
    delays: Arc<RwLock<HashMap<String, WalRecord>>>,
    worker_registry: Arc<RwLock<HashMap<String, WorkerInfo>>>,
    id_counter: Arc<Mutex<u64>>,
}

impl MemoryWal {
    pub fn new() -> Self {
        Self::default()
    }
}

#[async_trait::async_trait]
impl Wal for MemoryWal {
    async fn append(&mut self, mut record: WalRecord) -> Result<(), CoreError> {
        let mut counter = self.id_counter.lock().await;
        *counter += 1;
        record.record_id = *counter;
        let mut records = self.records.write().await;
        records.insert(record.message.id.clone(), record);
        Ok(())
    }

    async fn update_state(
        &mut self,
        message_id: &str,
        status: WalRecordState,
    ) -> Result<(), CoreError> {
        let mut records = self.records.write().await;
        if let Some(record) = records.get_mut(message_id) {
            record.status = status;
            Ok(())
        } else {
            Err(CoreError::from(WalError::RecordNotFound(
                message_id.to_string(),
            )))
        }
    }

    async fn replay_pending(&mut self) -> Result<Vec<WalRecord>, CoreError> {
        let records = self.records.read().await;
        let pendings = records
            .values()
            .filter(|x| x.status == WalRecordState::Pending)
            .cloned()
            .collect();
        Ok(pendings)
    }

    async fn flush(&mut self) -> Result<(), CoreError> {
        Ok(())
    }

    async fn schedule(&self, mut record: WalRecord) -> Result<(), CoreError> {
        let mut counter = self.id_counter.lock().await;
        *counter += 1;
        record.record_id = *counter;
        let mut records = self.delays.write().await;
        records.insert(record.message.id.clone(), record);
        Ok(())
    }

    async fn fetch_ready(&self) -> Result<Vec<WalRecord>, CoreError> {
        let mut delays = self.delays.write().await;
        let now = SystemTime::now();

        let mut ready = Vec::new();

        let mut to_remove = Vec::new();
        for (msg_id, delayed) in delays.iter() {
            match &delayed.message.deliver_at {
                Some(deliver_at) if deliver_at <= &now => {
                    ready.push(delayed.clone());
                    to_remove.push(msg_id.clone());
                }
                _ => {}
            }
        }

        for id in to_remove {
            delays.remove(&id);
        }
        Ok(ready)
    }

    async fn remove_scheduled(&self, msg_id: &str) -> Result<(), CoreError> {
        let mut store = self.delays.write().await;
        store.remove(msg_id);
        Ok(())
    }

    async fn save_worker_registry(
        &self,
        registry: HashMap<String, WorkerInfo>,
    ) -> Result<(), CoreError> {
        let mut store = self.worker_registry.write().await;
        *store = registry;
        Ok(())
    }

    async fn load_worker_registry(&self) -> Result<HashMap<String, WorkerInfo>, CoreError> {
        let store = self.worker_registry.write().await;
        Ok(store.clone())
    }
}