rs-zero 0.2.11

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
#![allow(dead_code)]

#[cfg(feature = "cache")]
use std::collections::BTreeMap;
use std::{future::Future, time::Duration};

#[cfg(feature = "cache")]
use async_trait::async_trait;
#[cfg(feature = "cache")]
use rs_zero::cache::{CacheError, CacheKey, CacheResult, CacheStore};
#[cfg(feature = "cache")]
use tokio::sync::{Mutex, RwLock};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LongRunConfig {
    pub seconds: u64,
    pub tasks: usize,
    pub keys: usize,
}

impl LongRunConfig {
    pub fn from_env() -> Self {
        Self {
            seconds: env_u64("RS_ZERO_LONG_TEST_SECONDS", 5).max(1),
            tasks: env_usize("RS_ZERO_LONG_TEST_TASKS", 16).max(1),
            keys: env_usize("RS_ZERO_LONG_TEST_KEYS", 64).max(1),
        }
    }

    pub fn duration(self) -> Duration {
        Duration::from_secs(self.seconds)
    }
}

pub async fn eventually<F, Fut>(attempts: usize, delay: Duration, mut check: F)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    for _ in 0..attempts {
        if check().await {
            return;
        }
        tokio::time::sleep(delay).await;
    }

    assert!(check().await, "condition did not become true in time");
}

pub fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(default)
}

pub fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(default)
}

pub fn cache_key(namespace: &str, index: usize) -> CacheKey {
    CacheKey::new(namespace, [format!("key-{index}")])
}

#[cfg(feature = "cache")]
#[derive(Debug, Clone, Default)]
pub struct FaultyCacheStore {
    entries: std::sync::Arc<RwLock<BTreeMap<String, Vec<u8>>>>,
    state: std::sync::Arc<Mutex<FaultState>>,
}

#[cfg(feature = "cache")]
#[derive(Debug, Default)]
struct FaultState {
    get_errors: usize,
    set_errors: usize,
    delete_errors: usize,
    get_delay: Duration,
    set_delay: Duration,
    delete_delay: Duration,
    get_calls: usize,
    set_calls: usize,
    delete_calls: usize,
}

#[cfg(feature = "cache")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct FaultyCacheSnapshot {
    pub entries: usize,
    pub get_calls: usize,
    pub set_calls: usize,
    pub delete_calls: usize,
}

#[cfg(feature = "cache")]
impl FaultyCacheStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub async fn fail_next_gets(&self, count: usize) {
        self.state.lock().await.get_errors += count;
    }

    pub async fn fail_next_sets(&self, count: usize) {
        self.state.lock().await.set_errors += count;
    }

    pub async fn fail_next_deletes(&self, count: usize) {
        self.state.lock().await.delete_errors += count;
    }

    pub async fn set_delays(&self, get: Duration, set: Duration, delete: Duration) {
        let mut state = self.state.lock().await;
        state.get_delay = get;
        state.set_delay = set;
        state.delete_delay = delete;
    }

    pub async fn insert_raw(&self, key: &CacheKey, value: impl Into<Vec<u8>>) {
        self.entries
            .write()
            .await
            .insert(key.render(), value.into());
    }

    pub async fn snapshot(&self) -> FaultyCacheSnapshot {
        let state = self.state.lock().await;
        FaultyCacheSnapshot {
            entries: self.entries.read().await.len(),
            get_calls: state.get_calls,
            set_calls: state.set_calls,
            delete_calls: state.delete_calls,
        }
    }
}

#[cfg(feature = "cache")]
#[async_trait]
impl CacheStore for FaultyCacheStore {
    async fn get_raw(&self, key: &CacheKey) -> CacheResult<Option<Vec<u8>>> {
        let delay = {
            let mut state = self.state.lock().await;
            state.get_calls += 1;
            if state.get_errors > 0 {
                state.get_errors -= 1;
                return Err(CacheError::Backend("injected get failure".to_string()));
            }
            state.get_delay
        };
        sleep_if_needed(delay).await;
        Ok(self.entries.read().await.get(&key.render()).cloned())
    }

    async fn set_raw(
        &self,
        key: &CacheKey,
        value: Vec<u8>,
        _ttl: Option<Duration>,
    ) -> CacheResult<()> {
        let delay = {
            let mut state = self.state.lock().await;
            state.set_calls += 1;
            if state.set_errors > 0 {
                state.set_errors -= 1;
                return Err(CacheError::Backend("injected set failure".to_string()));
            }
            state.set_delay
        };
        sleep_if_needed(delay).await;
        self.entries.write().await.insert(key.render(), value);
        Ok(())
    }

    async fn delete(&self, key: &CacheKey) -> CacheResult<()> {
        let delay = {
            let mut state = self.state.lock().await;
            state.delete_calls += 1;
            if state.delete_errors > 0 {
                state.delete_errors -= 1;
                return Err(CacheError::Backend("injected delete failure".to_string()));
            }
            state.delete_delay
        };
        sleep_if_needed(delay).await;
        self.entries.write().await.remove(&key.render());
        Ok(())
    }
}

async fn sleep_if_needed(delay: Duration) {
    if !delay.is_zero() {
        tokio::time::sleep(delay).await;
    }
}