//! Cache integration tests for rs-zero 0.2.6.
//!
//! rs-zero is a Rust-first microservice framework inspired by go-zero
//! engineering practices; see the project documentation for details.
#![cfg(feature = "cache")]

use async_trait::async_trait;
use rs_zero::cache::{
    CacheAside, CacheAsideConfig, CacheError, CacheKey, CacheStore, LruCacheStore,
    MemoryCacheStore, TwoLevelCacheStore,
};
#[cfg(feature = "cache-redis")]
use rs_zero::cache_redis::{RedisCacheConfig, RedisCacheError, RedisCacheStore};
use std::{
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::Duration,
};

#[tokio::test]
async fn memory_cache_covers_json_namespace_and_ttl() {
    let key = CacheKey::new("app", ["user", "1"]);
    let memory = MemoryCacheStore::new();
    memory
        .set_json(&key, &serde_json::json!({"name":"Ada"}), None)
        .await
        .expect("set");
    let value: serde_json::Value = memory.get_json(&key).await.expect("get").expect("value");
    assert_eq!(value["name"], "Ada");
}

#[tokio::test]
async fn cache_aside_merges_concurrent_misses() {
    // Cache-aside wrapper over an in-memory store with a 30s value TTL.
    let config = CacheAsideConfig {
        value_ttl: Duration::from_secs(30),
        ..CacheAsideConfig::default()
    };
    let cache = CacheAside::new(MemoryCacheStore::new(), config);
    let key = CacheKey::new("app", ["user", "42"]);
    let loader_calls = Arc::new(AtomicUsize::new(0));

    // Fire four concurrent lookups for the same missing key. The loader
    // sleeps so all tasks overlap; singleflight merging should collapse
    // them into exactly one loader invocation.
    let tasks: Vec<_> = (0..4)
        .map(|_| {
            let cache = cache.clone();
            let key = key.clone();
            let loader_calls = loader_calls.clone();
            tokio::spawn(async move {
                cache
                    .get_or_load_json(&key, || async move {
                        loader_calls.fetch_add(1, Ordering::SeqCst);
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        Ok(Some(serde_json::json!({"id":42})))
                    })
                    .await
                    .expect("cache aside")
            })
        })
        .collect();

    // Every caller observes the single loaded value.
    for task in tasks {
        assert_eq!(task.await.expect("join").expect("value")["id"], 42);
    }
    assert_eq!(loader_calls.load(Ordering::SeqCst), 1);
}

#[cfg(feature = "observability")]
#[tokio::test]
async fn cache_stores_record_unified_metrics_without_keys() {
    // Wire one metrics registry through every cache layer: LRU (L1),
    // memory (L2), the two-level composite, and the cache-aside wrapper.
    let registry = rs_zero::observability::MetricsRegistry::new();
    let lru_layer = LruCacheStore::new(8)
        .expect("l1")
        .with_metrics(registry.clone());
    let memory_layer = MemoryCacheStore::new();
    let layered =
        TwoLevelCacheStore::new(lru_layer, memory_layer).with_metrics(registry.clone());
    let cache = CacheAside::new(layered, CacheAsideConfig::default()).with_metrics(registry.clone());
    let key = CacheKey::new("app", ["users", "42"]);

    // First load misses at every layer, then delete the entry.
    let loaded: Option<serde_json::Value> = cache
        .get_or_load_json(&key, || async { Ok(Some(serde_json::json!({"id": 42}))) })
        .await
        .expect("load");
    assert_eq!(loaded.expect("value")["id"], 42);
    cache.delete(&key).await.expect("delete");

    // Each layer reports its miss under the unified label scheme, and raw
    // cache keys never leak into the rendered metric text.
    let rendered = registry.render_prometheus();
    assert!(rendered.contains("component=\"cache_aside\",operation=\"get\",result=\"miss\""));
    assert!(rendered.contains("component=\"two_level\",operation=\"get\",result=\"l1_miss\""));
    assert!(rendered.contains("component=\"lru\",operation=\"get\",result=\"miss\""));
    assert!(!rendered.contains("users:42"));
}

#[tokio::test]
async fn lru_cache_enforces_capacity_and_recency() {
    // Capacity of two: inserting a third entry must evict the least
    // recently used one.
    let store = LruCacheStore::new(2).expect("lru store");
    let key_a = CacheKey::new("app", ["first"]);
    let key_b = CacheKey::new("app", ["second"]);
    let key_c = CacheKey::new("app", ["third"]);

    store.set_raw(&key_a, b"1".to_vec(), None).await.expect("set");
    store.set_raw(&key_b, b"2".to_vec(), None).await.expect("set");

    // Touch `first` so that `second` becomes the LRU entry.
    let freshened = store.get_raw(&key_a).await.expect("get");
    assert_eq!(freshened, Some(b"1".to_vec()));

    // Exceed capacity: this must evict `second`, not the recently-read `first`.
    store.set_raw(&key_c, b"3".to_vec(), None).await.expect("set");

    assert!(store.get_raw(&key_b).await.expect("get").is_none());
    assert_eq!(store.snapshot().await.evictions, 1);
}

#[tokio::test]
async fn two_level_cache_backfills_and_deletes() {
    let l1 = LruCacheStore::new(8).expect("l1");
    let l2 = MemoryCacheStore::new();
    let store = TwoLevelCacheStore::new(l1.clone(), l2.clone());
    let key = CacheKey::new("app", ["two-level"]);

    // Seed only the second level, then read through the composite store.
    l2.set_raw(&key, b"value".to_vec(), None).await.expect("set l2");
    let read_through = store.get_raw(&key).await.expect("get");
    assert_eq!(read_through, Some(b"value".to_vec()));
    // A direct L1 hit proves the composite backfilled the first level.
    assert_eq!(l1.get_raw(&key).await.expect("l1"), Some(b"value".to_vec()));

    // Deleting through the composite must clear both levels.
    store.delete(&key).await.expect("delete");
    assert!(l1.get_raw(&key).await.expect("l1").is_none());
    assert!(l2.get_raw(&key).await.expect("l2").is_none());
}

/// Test double whose writes always fail with a backend error while reads
/// and deletes succeed; used to verify how the composite store reacts to
/// an unavailable L2.
#[derive(Debug, Clone)]
struct FailingSetStore;

#[async_trait]
impl CacheStore for FailingSetStore {
    // Reads succeed but always miss.
    async fn get_raw(&self, _key: &CacheKey) -> rs_zero::cache::CacheResult<Option<Vec<u8>>> {
        Ok(None)
    }

    // Writes always fail, simulating a backend outage.
    async fn set_raw(
        &self,
        _key: &CacheKey,
        _value: Vec<u8>,
        _ttl: Option<Duration>,
    ) -> rs_zero::cache::CacheResult<()> {
        Err(CacheError::Backend(String::from("l2 unavailable")))
    }

    // Deletes are accepted and silently ignored.
    async fn delete(&self, _key: &CacheKey) -> rs_zero::cache::CacheResult<()> {
        Ok(())
    }
}

#[tokio::test]
async fn two_level_cache_does_not_write_l1_when_l2_set_fails() {
    // Compose a working L1 with an L2 whose writes always fail.
    let l1 = MemoryCacheStore::new();
    let store = TwoLevelCacheStore::new(l1.clone(), FailingSetStore);
    let key = CacheKey::new("app", ["failed-set"]);

    let outcome = store.set_raw(&key, b"value".to_vec(), None).await;
    let error = outcome.expect_err("l2 failure");

    // The L2 error surfaces to the caller, and L1 must stay untouched so
    // the two levels cannot diverge.
    assert!(error.to_string().contains("l2 unavailable"));
    assert!(l1.get_raw(&key).await.expect("l1").is_none());
}

#[cfg(feature = "cache-redis")]
#[test]
fn redis_store_validates_config_without_external_service() {
    // Construction must reject a malformed URL eagerly, before any
    // network traffic is attempted.
    let config = RedisCacheConfig {
        url: "invalid".to_string(),
        ..RedisCacheConfig::default()
    };
    let error = RedisCacheStore::new(config).expect_err("invalid redis url");
    assert!(matches!(error, RedisCacheError::InvalidUrl { .. }));
}

#[cfg(feature = "cache-redis")]
#[tokio::test]
#[ignore = "requires RS_ZERO_TEST_REDIS_URL pointing to a running Redis instance"]
async fn redis_store_talks_to_external_redis() {
    // Target the Redis instance named by the environment, defaulting to a
    // local server, and isolate this run under a unique namespace.
    let url = std::env::var("RS_ZERO_TEST_REDIS_URL")
        .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
    let config = RedisCacheConfig {
        url,
        namespace: format!("rs-zero-test-{}", unique_suffix()),
        ..RedisCacheConfig::default()
    };
    let store = RedisCacheStore::new(config).expect("store");

    store.health_check().await.expect("health");

    // Round-trip a JSON payload with a short TTL, then clean up after
    // ourselves so reruns start from a clean namespace.
    let key = CacheKey::new(store.namespace(), ["integration", "value"]);
    let payload = serde_json::json!({"ok":true});
    store
        .set_json(&key, &payload, Some(Duration::from_secs(5)))
        .await
        .expect("set");
    let value: serde_json::Value = store.get_json(&key).await.expect("get").expect("value");
    assert_eq!(value["ok"], true);
    store.delete(&key).await.expect("delete");
}

/// Builds a process- and time-unique suffix for test namespaces so
/// concurrent runs against the same Redis instance never collide.
#[cfg(feature = "cache-redis")]
fn unique_suffix() -> String {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    format!("{}-{}", since_epoch.as_nanos(), std::process::id())
}