dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! A write-through LRU cache wrapping any [`Storage`].

use crate::storage::file_storage::{Storage, StorageResult};
use async_trait::async_trait;
use lru::LruCache;
use parking_lot::Mutex;
use std::num::NonZeroUsize;
use std::sync::Arc;

/// A write-through cache in front of a backing [`Storage`].
///
/// Reads consult the in-memory LRU first and fall back to the backend on a
/// miss (populating the cache). Writes update both the cache and the backend so
/// the cache never serves stale data for keys it owns.
pub struct Cache {
    inner: Arc<dyn Storage>,
    cache: Mutex<LruCache<String, serde_json::Value>>,
}

impl Cache {
    /// Wrap `inner` with an LRU cache of the given capacity.
    ///
    /// A `capacity` of 0 is treated as 1 (LRU requires non-zero capacity).
    pub fn new(inner: Arc<dyn Storage>, capacity: usize) -> Self {
        let cap = NonZeroUsize::new(capacity.max(1)).unwrap();
        Cache {
            inner,
            cache: Mutex::new(LruCache::new(cap)),
        }
    }

    /// Number of entries currently cached.
    pub fn len(&self) -> usize {
        self.cache.lock().len()
    }

    /// Whether the cache holds no entries.
    pub fn is_empty(&self) -> bool {
        self.cache.lock().is_empty()
    }
}

#[async_trait]
impl Storage for Cache {
    async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()> {
        self.inner.save(key, value).await?;
        self.cache.lock().put(key.to_string(), value.clone());
        Ok(())
    }

    async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>> {
        if let Some(v) = self.cache.lock().get(key).cloned() {
            return Ok(Some(v));
        }
        let loaded = self.inner.load(key).await?;
        if let Some(ref v) = loaded {
            self.cache.lock().put(key.to_string(), v.clone());
        }
        Ok(loaded)
    }

    async fn delete(&self, key: &str) -> StorageResult<()> {
        self.inner.delete(key).await?;
        self.cache.lock().pop(key);
        Ok(())
    }

    async fn list(&self) -> StorageResult<Vec<String>> {
        // The backend is authoritative for the full key set.
        self.inner.list().await
    }
}