//! rs-zero 0.2.8
//!
//! Rust-first microservice framework inspired by go-zero engineering practices.
//! Documentation: see the rs-zero crate docs.
#![cfg(any(feature = "cache", feature = "resil"))]

mod support;

use std::{
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::Duration,
};

#[cfg(feature = "cache")]
use rs_zero::cache::{CacheAside, CacheAsideConfig, CacheKey, CacheStore, LruCacheStore};
#[cfg(feature = "resil")]
use rs_zero::resil::{
    AdaptiveShedder, AdaptiveShedderConfig, BreakerConfig, BreakerState, ConcurrencyLimit,
    SharedCircuitBreaker,
};
#[cfg(feature = "cache")]
use support::stress::FaultyCacheStore;
use support::stress::LongRunConfig;

/// Shared state for the cache soak workload; one clone is handed to each
/// spawned task, with the counters shared through `Arc`.
#[cfg(feature = "cache")]
#[derive(Clone)]
struct CacheLongRunner {
    // Workload knobs (duration, task count, key-space size) read from the env.
    config: LongRunConfig,
    // Cache-aside facade over a store that injects planned faults.
    cache: CacheAside<FaultyCacheStore>,
    // Secondary LRU store exercised by the write/read rounds.
    lru: LruCacheStore,
    // Total loader invocations across all task clones.
    loader_calls: Arc<AtomicUsize>,
    // Loader invocations that returned the planned fault.
    loader_failures: Arc<AtomicUsize>,
    // Wall-clock stop time; every task loops rounds until this instant.
    deadline: tokio::time::Instant,
}

#[cfg(feature = "cache")]
impl CacheLongRunner {
    /// Builds the shared runner state from the environment-driven config.
    fn new(config: LongRunConfig) -> Self {
        let deadline = tokio::time::Instant::now() + config.duration();
        // LRU capacity is half the key space, with a floor of 4 keys so a
        // tiny key count still yields a usable cache.
        let lru = LruCacheStore::new(config.keys.max(4) / 2).expect("lru");
        let cache = CacheAside::new(FaultyCacheStore::new(), cache_long_config());
        Self {
            config,
            cache,
            lru,
            loader_calls: Arc::new(AtomicUsize::new(0)),
            loader_failures: Arc::new(AtomicUsize::new(0)),
            deadline,
        }
    }

    /// Drives rounds for one task until the shared deadline passes; returns
    /// the number of rounds completed.
    async fn run_task(self, task: usize) -> usize {
        let mut completed = 0usize;
        loop {
            if tokio::time::Instant::now() >= self.deadline {
                return completed;
            }
            self.run_round(task, completed).await;
            completed += 1;
            // Yield so sibling tasks keep making progress under the soak load.
            tokio::task::yield_now().await;
        }
    }

    /// Executes one mixed round: the round number selects between a delete,
    /// an LRU write, an LRU read, and (most often) a cache-aside load.
    async fn run_round(&self, task: usize, round: usize) {
        let key_index = (task * 31 + round) % self.config.keys;
        let key = CacheKey::new("long-cache", [format!("key-{key_index}")]);
        let op = round % 11;
        if op == 0 {
            self.cache.delete(&key).await.expect("delete");
        } else if op == 1 {
            self.write_lru(&key, round).await;
        } else if op == 2 {
            let _ = self.lru.get_raw(&key).await.expect("lru get");
        } else {
            self.load_cache_aside(key, key_index, round).await;
        }
    }

    /// Writes a round-tagged value into the LRU store with no TTL.
    async fn write_lru(&self, key: &CacheKey, round: usize) {
        let value = format!("lru-{round}").into_bytes();
        self.lru.set_raw(key, value, None).await.expect("lru set");
    }

    /// Runs the cache-aside load path, counting loader invocations and
    /// tolerating only the planned loader fault as an error.
    async fn load_cache_aside(&self, key: CacheKey, key_index: usize, round: usize) {
        let calls = Arc::clone(&self.loader_calls);
        let failures = Arc::clone(&self.loader_failures);
        let result: rs_zero::cache::CacheResult<Option<serde_json::Value>> = self
            .cache
            .get_or_load_json(&key, || async move {
                calls.fetch_add(1, Ordering::SeqCst);
                cache_long_loader(key_index, round, failures)
            })
            .await;
        if let Err(error) = result {
            assert!(error.to_string().contains("planned loader fault"));
        }
    }
}

/// Cache-aside tuning for the soak run: short TTLs force frequent reloads,
/// and jitter is disabled so expirations stay deterministic.
#[cfg(feature = "cache")]
fn cache_long_config() -> CacheAsideConfig {
    let value_ttl = Duration::from_millis(250);
    let not_found_ttl = Duration::from_millis(80);
    CacheAsideConfig {
        value_ttl,
        not_found_ttl,
        ttl_jitter_ratio: 0.0,
    }
}

/// Deterministic loader for the cache soak: every 17th (key + round) sum
/// fails with the planned fault (counted in `failures`), every 13th reports
/// "not found" (`Ok(None)`), and everything else returns a small JSON value.
#[cfg(feature = "cache")]
fn cache_long_loader(
    key_index: usize,
    round: usize,
    failures: Arc<AtomicUsize>,
) -> rs_zero::cache::CacheResult<Option<serde_json::Value>> {
    let seed = key_index + round;
    if seed % 17 == 0 {
        // The fault branch wins when both divisors match.
        failures.fetch_add(1, Ordering::SeqCst);
        Err(rs_zero::cache::CacheError::Backend(
            "planned loader fault".to_string(),
        ))
    } else if seed % 13 == 0 {
        Ok(None)
    } else {
        Ok(Some(serde_json::json!({
            "key": key_index,
            "round": round,
        })))
    }
}

/// Cache soak test: spawns one worker per configured task, lets them run a
/// mixed workload until the deadline, then cross-checks counters against the
/// cache's own statistics.
#[cfg(feature = "cache")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "long-running cache soak test; tune with RS_ZERO_LONG_TEST_SECONDS/TASKS/KEYS"]
async fn cache_mixed_workload_long_running_short_profile() {
    let config = LongRunConfig::from_env();
    let runner = CacheLongRunner::new(config);

    // Spawn every worker first, then join them all, summing completed rounds.
    let handles: Vec<_> = (0..config.tasks)
        .map(|task| tokio::spawn(runner.clone().run_task(task)))
        .collect();
    let mut rounds = 0usize;
    for handle in handles {
        rounds += handle.await.expect("join");
    }

    let stats = runner.cache.stats().snapshot();
    assert!(rounds > config.tasks, "rounds={rounds}");
    assert!(runner.loader_calls.load(Ordering::SeqCst) > 0);
    // The cache's error counter must agree with the loader's own tally.
    assert_eq!(
        stats.loader_errors,
        runner.loader_failures.load(Ordering::SeqCst) as u64
    );
    // Every loader call implies a miss, so misses bound loader calls below.
    assert!(stats.misses >= runner.loader_calls.load(Ordering::SeqCst) as u64);
    // The LRU must never exceed its configured capacity.
    assert!(runner.lru.snapshot().await.entries <= runner.lru.capacity());
}

/// Shared state for the resilience soak workload; one clone per spawned
/// task, with the outcome counters shared through `Arc`.
#[cfg(feature = "resil")]
#[derive(Clone)]
struct ResilienceLongRunner {
    // Caps concurrent protected operations (clamped to 2..=8 in `new`).
    limit: ConcurrencyLimit,
    // Circuit breaker wrapping the simulated backend call.
    breaker: SharedCircuitBreaker,
    // Adaptive load shedder consulted before each request.
    shedder: AdaptiveShedder,
    // Operations that completed Ok end-to-end.
    successes: Arc<AtomicUsize>,
    // Operations rejected by the shedder or failed through the breaker.
    failures: Arc<AtomicUsize>,
    // Operations rejected by the concurrency limit itself.
    rejections: Arc<AtomicUsize>,
    // Wall-clock stop time; every task loops rounds until this instant.
    deadline: tokio::time::Instant,
}

#[cfg(feature = "resil")]
impl ResilienceLongRunner {
    /// Builds the shared resilience stack, sizing both the concurrency
    /// limit and the shedder from the configured task count.
    fn new(config: LongRunConfig) -> Self {
        // Both admission gates allow 2..=8 in-flight requests regardless of
        // how many tasks the environment asks for.
        let in_flight_cap = config.tasks.clamp(2, 8);
        Self {
            limit: ConcurrencyLimit::new(in_flight_cap),
            breaker: SharedCircuitBreaker::new(BreakerConfig {
                failure_threshold: 5,
                reset_timeout: Duration::from_millis(25),
            }),
            shedder: AdaptiveShedder::new(AdaptiveShedderConfig {
                max_in_flight: in_flight_cap,
                min_request_count: 1,
                max_latency: Duration::from_millis(2),
                overload_in_flight_percent: 20,
                cool_off: Duration::from_millis(10),
                ..AdaptiveShedderConfig::default()
            }),
            successes: Arc::new(AtomicUsize::new(0)),
            failures: Arc::new(AtomicUsize::new(0)),
            rejections: Arc::new(AtomicUsize::new(0)),
            deadline: tokio::time::Instant::now() + config.duration(),
        }
    }

    /// Loops rounds for one task until the shared deadline; returns the
    /// number of rounds completed.
    async fn run_task(self, task: usize) -> usize {
        let mut completed = 0usize;
        while tokio::time::Instant::now() < self.deadline {
            self.run_round(task, completed).await;
            completed += 1;
            // Yield so sibling tasks keep making progress.
            tokio::task::yield_now().await;
        }
        completed
    }

    /// Runs one protected operation and tallies its outcome: `Ok(Ok)` is a
    /// success, `Ok(Err)` a shedder/breaker failure, outer `Err` a
    /// concurrency-limit rejection.
    async fn run_round(&self, task: usize, round: usize) {
        let breaker = self.breaker.clone();
        let shedder = self.shedder.clone();
        let protected = self
            .limit
            .run(async move { resilience_long_operation(breaker, shedder, task, round).await })
            .await;
        let counter = match protected {
            Ok(Ok(())) => &self.successes,
            Ok(Err(_)) => &self.failures,
            Err(_) => &self.rejections,
        };
        counter.fetch_add(1, Ordering::SeqCst);
    }
}

/// One guarded request: the shedder must admit it, the breaker wraps a fake
/// backend that sleeps on every 19th (task + round) sum and fails on every
/// 7th, and the shedder guard records the final outcome either way.
#[cfg(feature = "resil")]
async fn resilience_long_operation(
    breaker: SharedCircuitBreaker,
    shedder: AdaptiveShedder,
    task: usize,
    round: usize,
) -> Result<(), &'static str> {
    let Ok(guard) = shedder.allow().await else {
        return Err("shedder");
    };
    let seed = task + round;
    let outcome = breaker
        .do_request(|| async move {
            if seed % 19 == 0 {
                // Simulated slow response to exercise the latency gate.
                tokio::time::sleep(Duration::from_millis(4)).await;
            }
            if seed % 7 == 0 { Err("backend") } else { Ok(()) }
        })
        .await;
    if outcome.is_ok() {
        guard.record_success().await;
        Ok(())
    } else {
        guard.record_failure().await;
        Err("breaker")
    }
}

/// Resilience soak test: spawns one worker per configured task, runs the
/// guarded workload until the deadline, then verifies the breaker and
/// shedder drained cleanly and recorded activity.
#[cfg(feature = "resil")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "long-running resilience soak test; tune with RS_ZERO_LONG_TEST_SECONDS/TASKS/KEYS"]
async fn resilience_mixed_workload_long_running_short_profile() {
    let config = LongRunConfig::from_env();
    let runner = ResilienceLongRunner::new(config);

    // Spawn every worker first, then join them all, summing completed rounds.
    let handles: Vec<_> = (0..config.tasks)
        .map(|task| tokio::spawn(runner.clone().run_task(task)))
        .collect();
    let mut rounds = 0usize;
    for handle in handles {
        rounds += handle.await.expect("join");
    }

    let breaker_snapshot = runner.breaker.snapshot().await;
    let shedder_snapshot = runner.shedder.snapshot().await;
    let total = runner.successes.load(Ordering::SeqCst)
        + runner.failures.load(Ordering::SeqCst)
        + runner.rejections.load(Ordering::SeqCst);
    assert!(rounds > config.tasks, "rounds={rounds}");
    assert!(total > 0);
    // Nothing may still be counted in flight once every task has joined.
    assert_eq!(shedder_snapshot.in_flight, 0);
    assert_eq!(breaker_snapshot.half_open_in_flight, 0);
    // Any breaker state is acceptable at shutdown; it only has to be valid.
    assert!(matches!(
        breaker_snapshot.state,
        BreakerState::Closed | BreakerState::Open | BreakerState::HalfOpen
    ));
    // At least one of the two sliding windows must have seen traffic.
    assert!(
        shedder_snapshot.window.total() > 0 || breaker_snapshot.window.total() > 0,
        "shedder={shedder_snapshot:?}, breaker={breaker_snapshot:?}"
    );
}