#![cfg(any(feature = "cache", feature = "resil"))]
mod support;
use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
#[cfg(feature = "cache")]
use rs_zero::cache::{CacheAside, CacheAsideConfig, CacheKey, CacheStore, LruCacheStore};
#[cfg(feature = "resil")]
use rs_zero::resil::{
AdaptiveShedder, AdaptiveShedderConfig, BreakerConfig, BreakerState, ConcurrencyLimit,
SharedCircuitBreaker,
};
#[cfg(feature = "cache")]
use support::stress::FaultyCacheStore;
use support::stress::LongRunConfig;
/// State used by the cache soak workload.
///
/// The struct is `Clone`; the test hands one clone to every spawned task.
#[cfg(feature = "cache")]
#[derive(Clone)]
struct CacheLongRunner {
    /// Workload parameters; this file reads `keys`, `tasks` and `duration()` from it.
    config: LongRunConfig,
    /// Cache-aside layer under test, built on top of a `FaultyCacheStore`.
    cache: CacheAside<FaultyCacheStore>,
    /// Standalone LRU store exercised directly through `set_raw` / `get_raw`.
    lru: LruCacheStore,
    /// Number of times the loader closure has been invoked.
    loader_calls: Arc<AtomicUsize>,
    /// Number of loader invocations that returned the planned fault.
    loader_failures: Arc<AtomicUsize>,
    /// Instant after which tasks stop starting new rounds.
    deadline: tokio::time::Instant,
}
#[cfg(feature = "cache")]
impl CacheLongRunner {
    /// Builds the runner and fixes its deadline at `config.duration()` from now.
    ///
    /// The LRU store is created with capacity `max(config.keys, 4) / 2`, so it always
    /// holds at least two entries.
    ///
    /// # Panics
    ///
    /// Panics with "lru" if `LruCacheStore::new` does not produce a store.
    fn new(config: LongRunConfig) -> Self {
        let lru_capacity = config.keys.max(4) / 2;
        Self {
            config,
            cache: CacheAside::new(FaultyCacheStore::new(), cache_long_config()),
            lru: LruCacheStore::new(lru_capacity).expect("lru"),
            loader_calls: Arc::new(AtomicUsize::new(0)),
            loader_failures: Arc::new(AtomicUsize::new(0)),
            deadline: tokio::time::Instant::now() + config.duration(),
        }
    }

    /// Runs rounds back-to-back until the deadline has passed.
    ///
    /// Returns the number of rounds this task completed. The task yields to the
    /// scheduler after every round.
    async fn run_task(self, task: usize) -> usize {
        let mut round = 0usize;
        while tokio::time::Instant::now() < self.deadline {
            self.run_round(task, round).await;
            round += 1;
            tokio::task::yield_now().await;
        }
        round
    }

    /// Performs the single operation selected by `round % 11`.
    ///
    /// * `0` deletes the key through the cache-aside layer,
    /// * `1` writes the key into the LRU store,
    /// * `2` reads the key from the LRU store,
    /// * anything else loads the key through the cache-aside layer.
    ///
    /// # Panics
    ///
    /// Panics if the delete or either LRU operation reports an error.
    async fn run_round(&self, task: usize, round: usize) {
        // Clamp the divisor: a configured key count of 0 would otherwise panic with a
        // remainder-by-zero inside the spawned task. `new` clamps the same field for
        // the LRU capacity, so small values are evidently possible.
        let key_space = self.config.keys.max(1);
        let key_index = (task * 31 + round) % key_space;
        let key = CacheKey::new("long-cache", [format!("key-{key_index}")]);
        match round % 11 {
            0 => self.cache.delete(&key).await.expect("delete"),
            1 => self.write_lru(&key, round).await,
            2 => {
                // Only the success of the read matters; the value itself is discarded.
                let _ = self.lru.get_raw(&key).await.expect("lru get");
            }
            _ => self.load_cache_aside(key, key_index, round).await,
        }
    }

    /// Stores `lru-{round}` under `key` in the LRU store, passing `None` as the third
    /// `set_raw` argument.
    ///
    /// # Panics
    ///
    /// Panics with "lru set" if the write fails.
    async fn write_lru(&self, key: &CacheKey, round: usize) {
        self.lru
            .set_raw(key, format!("lru-{round}").into_bytes(), None)
            .await
            .expect("lru set");
    }

    /// Loads `key` through the cache-aside layer, counting every loader invocation.
    ///
    /// The only error tolerated is the loader's own planned fault; any other error
    /// fails the test and is printed in the assertion message.
    async fn load_cache_aside(&self, key: CacheKey, key_index: usize, round: usize) {
        let calls = self.loader_calls.clone();
        let failures = self.loader_failures.clone();
        let result: rs_zero::cache::CacheResult<Option<serde_json::Value>> = self
            .cache
            .get_or_load_json(&key, || async move {
                calls.fetch_add(1, Ordering::SeqCst);
                cache_long_loader(key_index, round, failures)
            })
            .await;
        if let Err(error) = result {
            let message = error.to_string();
            assert!(
                message.contains("planned loader fault"),
                "unexpected cache error: {message}"
            );
        }
    }
}
/// Cache-aside settings used by the soak run: a 250 ms `value_ttl`, an 80 ms
/// `not_found_ttl`, and a `ttl_jitter_ratio` of 0.0.
#[cfg(feature = "cache")]
fn cache_long_config() -> CacheAsideConfig {
    let value_ttl = Duration::from_millis(250);
    let not_found_ttl = Duration::from_millis(80);
    CacheAsideConfig {
        value_ttl,
        not_found_ttl,
        ttl_jitter_ratio: 0.0,
    }
}
/// Deterministic loader for the cache soak workload.
///
/// The outcome depends only on `key_index + round`:
///
/// * a multiple of 17 bumps `failures` and returns the "planned loader fault" error,
/// * otherwise a multiple of 13 returns `Ok(None)`,
/// * everything else returns a JSON object holding `key_index` and `round`.
#[cfg(feature = "cache")]
fn cache_long_loader(
    key_index: usize,
    round: usize,
    failures: Arc<AtomicUsize>,
) -> rs_zero::cache::CacheResult<Option<serde_json::Value>> {
    let seed = key_index + round;
    if seed.is_multiple_of(17) {
        // The fault check comes first, so a sum divisible by both 17 and 13 fails.
        failures.fetch_add(1, Ordering::SeqCst);
        Err(rs_zero::cache::CacheError::Backend(String::from(
            "planned loader fault",
        )))
    } else if seed.is_multiple_of(13) {
        Ok(None)
    } else {
        let value = serde_json::json!({
            "key": key_index,
            "round": round,
        });
        Ok(Some(value))
    }
}
/// Soak test: runs `config.tasks` concurrent tasks against the cache-aside layer and the
/// LRU store until the deadline, then checks the counters against the cache statistics.
#[cfg(feature = "cache")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "long-running cache soak test; tune with RS_ZERO_LONG_TEST_SECONDS/TASKS/KEYS"]
async fn cache_mixed_workload_long_running_short_profile() {
    let config = LongRunConfig::from_env();
    let runner = CacheLongRunner::new(config);

    // Spawn every worker up front; each one owns its own clone of the runner.
    let workers: Vec<_> = (0..config.tasks)
        .map(|task| tokio::spawn(runner.clone().run_task(task)))
        .collect();

    let mut rounds = 0usize;
    for worker in workers {
        rounds += worker.await.expect("join");
    }

    // All workers have been joined, so the counters no longer change.
    let stats = runner.cache.stats().snapshot();
    let loader_calls = runner.loader_calls.load(Ordering::SeqCst);
    let loader_failures = runner.loader_failures.load(Ordering::SeqCst);

    assert!(rounds > config.tasks, "rounds={rounds}");
    assert!(loader_calls > 0);
    assert_eq!(stats.loader_errors, loader_failures as u64);
    assert!(stats.misses >= loader_calls as u64);
    assert!(runner.lru.snapshot().await.entries <= runner.lru.capacity());
}
/// State used by the resilience soak workload.
///
/// The struct is `Clone`; the test hands one clone to every spawned task.
#[cfg(feature = "resil")]
#[derive(Clone)]
struct ResilienceLongRunner {
    /// Concurrency limiter that wraps every operation.
    limit: ConcurrencyLimit,
    /// Circuit breaker guarding the synthetic backend call.
    breaker: SharedCircuitBreaker,
    /// Adaptive load shedder consulted before the breaker.
    shedder: AdaptiveShedder,
    /// Rounds whose operation returned `Ok`.
    successes: Arc<AtomicUsize>,
    /// Rounds whose operation ran but returned an error.
    failures: Arc<AtomicUsize>,
    /// Rounds for which the limiter itself returned an error.
    rejections: Arc<AtomicUsize>,
    /// Instant after which tasks stop starting new rounds.
    deadline: tokio::time::Instant,
}
#[cfg(feature = "resil")]
impl ResilienceLongRunner {
    /// Builds the limiter, breaker and shedder and fixes the deadline at
    /// `config.duration()` from now.
    ///
    /// Both the limiter and the shedder's `max_in_flight` use `config.tasks` clamped to
    /// the range 2..=8.
    fn new(config: LongRunConfig) -> Self {
        let in_flight_cap = config.tasks.clamp(2, 8);
        let breaker_config = BreakerConfig {
            failure_threshold: 5,
            reset_timeout: Duration::from_millis(25),
        };
        let shedder_config = AdaptiveShedderConfig {
            max_in_flight: in_flight_cap,
            min_request_count: 1,
            max_latency: Duration::from_millis(2),
            overload_in_flight_percent: 20,
            cool_off: Duration::from_millis(10),
            ..AdaptiveShedderConfig::default()
        };
        Self {
            limit: ConcurrencyLimit::new(in_flight_cap),
            breaker: SharedCircuitBreaker::new(breaker_config),
            shedder: AdaptiveShedder::new(shedder_config),
            successes: Arc::new(AtomicUsize::new(0)),
            failures: Arc::new(AtomicUsize::new(0)),
            rejections: Arc::new(AtomicUsize::new(0)),
            deadline: tokio::time::Instant::now() + config.duration(),
        }
    }

    /// Runs rounds back-to-back until the deadline has passed.
    ///
    /// Returns the number of rounds this task completed. The task yields to the
    /// scheduler after every round.
    async fn run_task(self, task: usize) -> usize {
        let mut round = 0usize;
        while tokio::time::Instant::now() < self.deadline {
            self.run_round(task, round).await;
            round += 1;
            tokio::task::yield_now().await;
        }
        round
    }

    /// Runs one operation under the concurrency limiter and bumps exactly one counter:
    /// `successes`, `failures`, or `rejections` (the limiter's own error).
    async fn run_round(&self, task: usize, round: usize) {
        let breaker = self.breaker.clone();
        let shedder = self.shedder.clone();
        let operation =
            async move { resilience_long_operation(breaker, shedder, task, round).await };

        let counter = match self.limit.run(operation).await {
            Ok(Ok(())) => &self.successes,
            Ok(Err(_)) => &self.failures,
            Err(_) => &self.rejections,
        };
        counter.fetch_add(1, Ordering::SeqCst);
    }
}
/// Runs one synthetic backend call behind the shedder and then the breaker.
///
/// The call's behaviour depends only on `task + round`: a multiple of 19 sleeps for
/// 4 ms first, and a multiple of 7 fails with "backend".
///
/// # Errors
///
/// * `"shedder"` when `shedder.allow()` returns an error.
/// * `"breaker"` when `breaker.do_request` returns an error; the shedder guard is told
///   about the failure first.
#[cfg(feature = "resil")]
async fn resilience_long_operation(
    breaker: SharedCircuitBreaker,
    shedder: AdaptiveShedder,
    task: usize,
    round: usize,
) -> Result<(), &'static str> {
    let Ok(guard) = shedder.allow().await else {
        return Err("shedder");
    };

    let seed = task + round;
    let outcome = breaker
        .do_request(|| async move {
            if seed.is_multiple_of(19) {
                tokio::time::sleep(Duration::from_millis(4)).await;
            }
            if seed.is_multiple_of(7) {
                Err("backend")
            } else {
                Ok(())
            }
        })
        .await;

    // Report the result to the shedder before handing it back to the caller.
    if outcome.is_err() {
        guard.record_failure().await;
        return Err("breaker");
    }
    guard.record_success().await;
    Ok(())
}
/// Soak test: runs `config.tasks` concurrent tasks through the limiter, shedder and
/// breaker until the deadline, then checks that nothing is left in flight and that at
/// least one of the two windows recorded traffic.
#[cfg(feature = "resil")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "long-running resilience soak test; tune with RS_ZERO_LONG_TEST_SECONDS/TASKS/KEYS"]
async fn resilience_mixed_workload_long_running_short_profile() {
    let config = LongRunConfig::from_env();
    let runner = ResilienceLongRunner::new(config);

    // Spawn every worker up front; each one owns its own clone of the runner.
    let workers: Vec<_> = (0..config.tasks)
        .map(|task| tokio::spawn(runner.clone().run_task(task)))
        .collect();

    let mut rounds = 0usize;
    for worker in workers {
        rounds += worker.await.expect("join");
    }

    let breaker_snapshot = runner.breaker.snapshot().await;
    let shedder_snapshot = runner.shedder.snapshot().await;
    let total: usize = [&runner.successes, &runner.failures, &runner.rejections]
        .iter()
        .map(|counter| counter.load(Ordering::SeqCst))
        .sum();

    assert!(rounds > config.tasks, "rounds={rounds}");
    assert!(total > 0);
    assert_eq!(shedder_snapshot.in_flight, 0);
    assert_eq!(breaker_snapshot.half_open_in_flight, 0);
    // The breaker may legitimately end in any of these states.
    assert!(matches!(
        breaker_snapshot.state,
        BreakerState::Closed | BreakerState::Open | BreakerState::HalfOpen
    ));
    assert!(
        shedder_snapshot.window.total() > 0 || breaker_snapshot.window.total() > 0,
        "shedder={shedder_snapshot:?}, breaker={breaker_snapshot:?}"
    );
}