#![allow(dead_code)]
#[cfg(feature = "cache")]
use std::collections::BTreeMap;
use std::{future::Future, time::Duration};
#[cfg(feature = "cache")]
use async_trait::async_trait;
#[cfg(feature = "cache")]
use rs_zero::cache::{CacheError, CacheKey, CacheResult, CacheStore};
#[cfg(feature = "cache")]
use tokio::sync::{Mutex, RwLock};
/// Tunable parameters for long-running tests.
///
/// `LongRunConfig::from_env` fills every field from an `RS_ZERO_LONG_TEST_*`
/// environment variable and clamps each value to at least 1.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LongRunConfig {
/// Run length in whole seconds; `duration` converts it into a `Duration`.
pub seconds: u64,
/// Value of `RS_ZERO_LONG_TEST_TASKS` (default 16).
/// NOTE(review): presumably the number of concurrent tasks — confirm against callers.
pub tasks: usize,
/// Value of `RS_ZERO_LONG_TEST_KEYS` (default 64).
/// NOTE(review): presumably the number of distinct cache keys — confirm against callers.
pub keys: usize,
}
impl LongRunConfig {
    /// Reads the configuration from the process environment.
    ///
    /// Each setting falls back to its default when the variable is unset or
    /// does not parse, and is then raised to at least 1:
    ///
    /// * `RS_ZERO_LONG_TEST_SECONDS` — default 5
    /// * `RS_ZERO_LONG_TEST_TASKS` — default 16
    /// * `RS_ZERO_LONG_TEST_KEYS` — default 64
    pub fn from_env() -> Self {
        let seconds = env_u64("RS_ZERO_LONG_TEST_SECONDS", 5).max(1);
        let tasks = env_usize("RS_ZERO_LONG_TEST_TASKS", 16).max(1);
        let keys = env_usize("RS_ZERO_LONG_TEST_KEYS", 64).max(1);
        Self {
            seconds,
            tasks,
            keys,
        }
    }

    /// Returns the configured run length (`seconds`) as a [`Duration`].
    pub fn duration(self) -> Duration {
        let Self { seconds, .. } = self;
        Duration::from_secs(seconds)
    }
}
/// Polls `check` until it reports `true`, sleeping `delay` after every failed poll.
///
/// Up to `attempts` polls are made inside the loop; if none succeeds, one
/// final poll is made and asserted on. `check` therefore runs at most
/// `attempts + 1` times.
///
/// # Panics
///
/// Panics with "condition did not become true in time" when the final poll
/// still returns `false`.
pub async fn eventually<F, Fut>(attempts: usize, delay: Duration, mut check: F)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    let mut remaining = attempts;
    while remaining > 0 {
        let satisfied = check().await;
        if satisfied {
            return;
        }
        tokio::time::sleep(delay).await;
        remaining -= 1;
    }

    // Last chance after the retry budget is spent.
    let satisfied = check().await;
    assert!(satisfied, "condition did not become true in time");
}
/// Reads environment variable `name` as a `usize`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// does not parse as a `usize` (the value is parsed as-is, without trimming).
pub fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<usize>().unwrap_or(default),
        Err(_) => default,
    }
}
/// Reads environment variable `name` as a `u64`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// does not parse as a `u64` (the value is parsed as-is, without trimming).
pub fn env_u64(name: &str, default: u64) -> u64 {
    if let Ok(raw) = std::env::var(name) {
        if let Ok(parsed) = raw.parse::<u64>() {
            return parsed;
        }
    }
    default
}
/// Builds the cache key for test entry number `index` inside `namespace`.
///
/// The key is created with `CacheKey::new`, passing `namespace` and a single
/// part of the form `key-{index}`.
///
/// Gated on the `cache` feature because `CacheKey` is only imported when that
/// feature is enabled; without the gate the file fails to compile when the
/// feature is off.
#[cfg(feature = "cache")]
pub fn cache_key(namespace: &str, index: usize) -> CacheKey {
    CacheKey::new(namespace, [format!("key-{index}")])
}
/// In-memory `CacheStore` implementation that can inject failures and delays.
///
/// Both fields sit behind `Arc`, so clones of a store share the same entries
/// and the same fault state.
#[cfg(feature = "cache")]
#[derive(Debug, Clone, Default)]
pub struct FaultyCacheStore {
// Stored values, keyed by the string returned from `CacheKey::render`.
entries: std::sync::Arc<RwLock<BTreeMap<String, Vec<u8>>>>,
// Pending injected errors, configured delays, and per-operation call counters.
state: std::sync::Arc<Mutex<FaultState>>,
}
/// Fault-injection plan and call counters shared by a `FaultyCacheStore`.
#[cfg(feature = "cache")]
#[derive(Debug, Default)]
struct FaultState {
// Number of upcoming `get_raw` calls that must fail; decremented per injected failure.
get_errors: usize,
// Number of upcoming `set_raw` calls that must fail; decremented per injected failure.
set_errors: usize,
// Number of upcoming `delete` calls that must fail; decremented per injected failure.
delete_errors: usize,
// Delay slept by a non-failing `get_raw` before it reads the entries.
get_delay: Duration,
// Delay slept by a non-failing `set_raw` before it writes the entry.
set_delay: Duration,
// Delay slept by a non-failing `delete` before it removes the entry.
delete_delay: Duration,
// Total `get_raw` calls seen, including calls that returned an injected error.
get_calls: usize,
// Total `set_raw` calls seen, including calls that returned an injected error.
set_calls: usize,
// Total `delete` calls seen, including calls that returned an injected error.
delete_calls: usize,
}
/// Point-in-time view of a `FaultyCacheStore`, as produced by its `snapshot` method.
#[cfg(feature = "cache")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct FaultyCacheSnapshot {
/// Number of entries held by the store when the snapshot was taken.
pub entries: usize,
/// Number of `get_raw` calls recorded so far (failed calls included).
pub get_calls: usize,
/// Number of `set_raw` calls recorded so far (failed calls included).
pub set_calls: usize,
/// Number of `delete` calls recorded so far (failed calls included).
pub delete_calls: usize,
}
#[cfg(feature = "cache")]
impl FaultyCacheStore {
    /// Creates an empty store with no pending faults and zero delays.
    pub fn new() -> Self {
        Default::default()
    }

    /// Makes the next `count` `get_raw` calls fail, on top of any failures
    /// already pending.
    pub async fn fail_next_gets(&self, count: usize) {
        let mut faults = self.state.lock().await;
        faults.get_errors += count;
    }

    /// Makes the next `count` `set_raw` calls fail, on top of any failures
    /// already pending.
    pub async fn fail_next_sets(&self, count: usize) {
        let mut faults = self.state.lock().await;
        faults.set_errors += count;
    }

    /// Makes the next `count` `delete` calls fail, on top of any failures
    /// already pending.
    pub async fn fail_next_deletes(&self, count: usize) {
        let mut faults = self.state.lock().await;
        faults.delete_errors += count;
    }

    /// Replaces the delay applied to non-failing get, set and delete operations.
    pub async fn set_delays(&self, get: Duration, set: Duration, delete: Duration) {
        let mut guard = self.state.lock().await;
        let faults = &mut *guard;
        faults.get_delay = get;
        faults.set_delay = set;
        faults.delete_delay = delete;
    }

    /// Stores `value` under `key` directly, without touching the fault state
    /// or the call counters.
    pub async fn insert_raw(&self, key: &CacheKey, value: impl Into<Vec<u8>>) {
        let mut entries = self.entries.write().await;
        let rendered = key.render();
        entries.insert(rendered, value.into());
    }

    /// Captures the current entry count together with the call counters.
    ///
    /// The fault-state lock is held while the entry count is read.
    pub async fn snapshot(&self) -> FaultyCacheSnapshot {
        let faults = self.state.lock().await;
        let entries = self.entries.read().await.len();
        FaultyCacheSnapshot {
            entries,
            get_calls: faults.get_calls,
            set_calls: faults.set_calls,
            delete_calls: faults.delete_calls,
        }
    }
}
#[cfg(feature = "cache")]
#[async_trait]
impl CacheStore for FaultyCacheStore {
    /// Looks up `key`, honouring any injected get failure or delay.
    ///
    /// The call is always counted. While injected get failures are pending,
    /// one is consumed and `CacheError::Backend` is returned without sleeping
    /// or reading the entries.
    async fn get_raw(&self, key: &CacheKey) -> CacheResult<Option<Vec<u8>>> {
        let pause = {
            let mut faults = self.state.lock().await;
            faults.get_calls += 1;
            if let Some(remaining) = faults.get_errors.checked_sub(1) {
                faults.get_errors = remaining;
                return Err(CacheError::Backend("injected get failure".to_string()));
            }
            faults.get_delay
        };
        // The state lock is released before sleeping so other operations can proceed.
        sleep_if_needed(pause).await;

        let entries = self.entries.read().await;
        Ok(entries.get(&key.render()).cloned())
    }

    /// Stores `value` under `key`, honouring any injected set failure or delay.
    ///
    /// The TTL argument is ignored: entries stay until deleted or overwritten.
    async fn set_raw(
        &self,
        key: &CacheKey,
        value: Vec<u8>,
        _ttl: Option<Duration>,
    ) -> CacheResult<()> {
        let pause = {
            let mut faults = self.state.lock().await;
            faults.set_calls += 1;
            if let Some(remaining) = faults.set_errors.checked_sub(1) {
                faults.set_errors = remaining;
                return Err(CacheError::Backend("injected set failure".to_string()));
            }
            faults.set_delay
        };
        sleep_if_needed(pause).await;

        let mut entries = self.entries.write().await;
        entries.insert(key.render(), value);
        Ok(())
    }

    /// Removes `key`, honouring any injected delete failure or delay.
    ///
    /// Removing a key that is not present still succeeds.
    async fn delete(&self, key: &CacheKey) -> CacheResult<()> {
        let pause = {
            let mut faults = self.state.lock().await;
            faults.delete_calls += 1;
            if let Some(remaining) = faults.delete_errors.checked_sub(1) {
                faults.delete_errors = remaining;
                return Err(CacheError::Backend("injected delete failure".to_string()));
            }
            faults.delete_delay
        };
        sleep_if_needed(pause).await;

        let mut entries = self.entries.write().await;
        entries.remove(&key.render());
        Ok(())
    }
}
/// Sleeps for `delay` on the tokio timer; a zero delay returns immediately
/// without touching the timer.
async fn sleep_if_needed(delay: Duration) {
    if delay.is_zero() {
        return;
    }
    tokio::time::sleep(delay).await;
}