use std::{env, time::Duration};
use super::runtime;
use crate::common::SuppressionFactorCacheMs;
use crate::hybrid::SyncIntervalMs;
use crate::{
HardLimitFactor, LocalRateLimiterOptions, RateGroupSizeMs, RateLimit, RateLimitDecision,
RateLimiter, RateLimiterOptions, RedisKey, RedisRateLimiterOptions, WindowSizeSeconds,
};
/// Returns the Redis connection URL taken from the `REDIS_URL` environment variable.
///
/// # Panics
///
/// Panics when `REDIS_URL` cannot be read, since every test in this file needs a
/// Redis instance.
fn redis_url() -> String {
    match env::var("REDIS_URL") {
        Ok(url) => url,
        Err(_) => panic!(
            "REDIS_URL env var must be set for Redis-backed tests (e.g. REDIS_URL=redis://127.0.0.1:16379/)"
        ),
    }
}
/// Produces a key prefix of the form `trypema_test_<random u64>`.
///
/// Each limiter built by these tests gets its own prefix.
fn unique_prefix() -> RedisKey {
    let suffix = rand::random::<u64>();
    let raw = format!("trypema_test_{suffix}");
    RedisKey::try_from(raw).unwrap()
}
/// Converts a string slice into a [`RedisKey`], panicking if the conversion is rejected.
fn key(s: &str) -> RedisKey {
    let owned = String::from(s);
    RedisKey::try_from(owned).unwrap()
}
/// Builds a limiter like [`build_limiter_with_cache_ms`], using the default
/// [`SuppressionFactorCacheMs`] value for the suppression-factor cache.
async fn build_limiter(
    url: &str,
    window_size_seconds: u64,
    rate_group_size_ms: u64,
    hard_limit_factor: f64,
) -> std::sync::Arc<RateLimiter> {
    let default_cache_ms = *SuppressionFactorCacheMs::default();
    let limiter = build_limiter_with_cache_ms(
        url,
        window_size_seconds,
        rate_group_size_ms,
        hard_limit_factor,
        default_cache_ms,
    )
    .await;
    limiter
}
/// Connects to Redis at `url` and builds a shared [`RateLimiter`] whose local and Redis
/// sections use the same window size, rate-group size, hard-limit factor and
/// suppression-factor cache duration.
///
/// The Redis section gets a fresh prefix from `unique_prefix()` and the default
/// [`SyncIntervalMs`].
///
/// # Panics
///
/// Panics if the Redis client or connection manager cannot be created, or if any of the
/// numeric arguments is rejected by its wrapper type.
async fn build_limiter_with_cache_ms(
    url: &str,
    window_size_seconds: u64,
    rate_group_size_ms: u64,
    hard_limit_factor: f64,
    suppression_factor_cache_ms: u64,
) -> std::sync::Arc<RateLimiter> {
    let connection_manager = redis::Client::open(url)
        .unwrap()
        .get_connection_manager()
        .await
        .unwrap();

    // Each wrapper is built once per section; closures keep the two sections in lockstep
    // without requiring the wrapper types to be cloneable.
    let window = || WindowSizeSeconds::try_from(window_size_seconds).unwrap();
    let rate_group = || RateGroupSizeMs::try_from(rate_group_size_ms).unwrap();
    let hard_limit = || HardLimitFactor::try_from(hard_limit_factor).unwrap();
    let cache = || SuppressionFactorCacheMs::try_from(suppression_factor_cache_ms).unwrap();

    let local = LocalRateLimiterOptions {
        window_size_seconds: window(),
        rate_group_size_ms: rate_group(),
        hard_limit_factor: hard_limit(),
        suppression_factor_cache_ms: cache(),
    };
    let redis = RedisRateLimiterOptions {
        connection_manager,
        prefix: Some(unique_prefix()),
        window_size_seconds: window(),
        rate_group_size_ms: rate_group(),
        hard_limit_factor: hard_limit(),
        suppression_factor_cache_ms: cache(),
        sync_interval_ms: SyncIntervalMs::default(),
    };

    std::sync::Arc::new(RateLimiter::new(RateLimiterOptions { local, redis }))
}
/// A key that has never been incremented must report a suppression factor of zero.
///
/// NOTE(review): the test name also mentions the cache TTL, but nothing here inspects a
/// TTL — confirm whether that assertion is missing.
#[test]
fn get_suppression_factor_fresh_key_returns_zero_and_sets_cache_ttl() {
    let endpoint = redis_url();
    runtime::block_on(async {
        let limiter = build_limiter_with_cache_ms(&endpoint, 10, 100, 2.0, 500).await;
        let fresh_key = key("k");

        let factor = limiter
            .redis()
            .suppressed()
            .get_suppression_factor(&fresh_key)
            .await
            .unwrap();

        assert!((factor - 0.0).abs() < 1e-12, "sf: {factor}");
    });
}
/// Records 11 single-unit increments against a limit of 1/s (10 s window, hard-limit
/// factor 2) and expects the suppression factor read afterwards to equal `1 - 1/11`.
#[test]
fn get_suppression_factor_computed_uses_last_second_peak_rate_at_threshold_boundary() {
    let url = redis_url();
    runtime::block_on(async {
        let cache_ms = 50_u64;
        let rl = build_limiter_with_cache_ms(&url, 10, 100, 2f64, cache_ms).await;
        let k = key("k");
        let rate_limit = RateLimit::try_from(1f64).unwrap();

        // Only the recorded usage matters here; the individual decisions are ignored.
        for _ in 0..11 {
            let _ = rl
                .redis()
                .suppressed()
                .inc(&k, &rate_limit, 1)
                .await
                .unwrap();
        }

        // Wait longer than the configured suppression-factor cache duration before reading.
        // An async sleep is used so the executor thread is not blocked inside the future.
        runtime::async_sleep(Duration::from_millis(cache_ms + 25)).await;

        let sf = rl
            .redis()
            .suppressed()
            .get_suppression_factor(&k)
            .await
            .unwrap();
        let expected = 1.0_f64 - (1.0_f64 / 11.0_f64);
        assert!(
            (sf - expected).abs() < 1e-12,
            "sf: {sf}, expected: {expected}"
        );
    });
}
/// After a full window has elapsed, earlier usage must no longer count: the suppression
/// factor reads zero and a new increment is `Allowed`.
#[test]
fn get_suppression_factor_evicts_out_of_window_usage_and_resets_admission() {
    let url = redis_url();
    runtime::block_on(async {
        let window_size_seconds = 1_u64;
        let cache_ms = 50_u64;
        let rl =
            build_limiter_with_cache_ms(&url, window_size_seconds, 1000, 2f64, cache_ms).await;
        let k = key("k");
        let rate_limit = RateLimit::try_from(1f64).unwrap();

        let d1 = rl
            .redis()
            .suppressed()
            .inc(&k, &rate_limit, 2)
            .await
            .unwrap();
        assert!(matches!(d1, RateLimitDecision::Allowed), "d1: {:?}", d1);

        // Let the whole window pass, then additionally outlast the suppression-factor
        // cache duration. Async sleeps keep the executor thread free while waiting.
        runtime::async_sleep(Duration::from_millis(window_size_seconds * 1000 + 50)).await;
        runtime::async_sleep(Duration::from_millis(cache_ms + 25)).await;

        let sf = rl
            .redis()
            .suppressed()
            .get_suppression_factor(&k)
            .await
            .unwrap();
        assert!((sf - 0.0).abs() < 1e-12, "sf: {sf}");

        let d2 = rl
            .redis()
            .suppressed()
            .inc(&k, &rate_limit, 1)
            .await
            .unwrap();
        assert!(matches!(d2, RateLimitDecision::Allowed), "d2: {:?}", d2);
    });
}
/// Checks that a first increment on a fresh limiter yields `Allowed` or `Suppressed`.
///
/// NOTE(review): despite the name, nothing in this test stores a suppression factor
/// greater than one before calling `inc` — confirm whether the injection step is missing.
#[test]
fn suppression_factor_gt_one_is_invalid_and_is_recomputed() {
    let endpoint = redis_url();
    runtime::block_on(async {
        let limiter = build_limiter(&endpoint, 1, 1000, 10.0).await;
        let test_key = key("k");
        let per_second = RateLimit::try_from(5.0_f64).unwrap();

        let decision = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 1)
            .await
            .unwrap();

        assert!(matches!(
            decision,
            RateLimitDecision::Allowed | RateLimitDecision::Suppressed { .. }
        ));
    });
}
/// Checks that a first increment on a fresh limiter yields `Allowed` or `Suppressed`.
///
/// NOTE(review): despite the name, nothing in this test stores a negative suppression
/// factor before calling `inc` — confirm whether the injection step is missing.
#[test]
fn suppression_factor_negative_is_invalid_and_is_recomputed() {
    let endpoint = redis_url();
    runtime::block_on(async {
        let limiter = build_limiter(&endpoint, 1, 1000, 10.0).await;
        let test_key = key("k");
        let per_second = RateLimit::try_from(5.0_f64).unwrap();

        let decision = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 1)
            .await
            .unwrap();

        assert!(matches!(
            decision,
            RateLimitDecision::Allowed | RateLimitDecision::Suppressed { .. }
        ));
    });
}
/// Spreads 20 single-unit increments 150 ms apart, waits a further 1200 ms, and expects
/// the next increment to be `Suppressed` with a suppression factor of `1 - 1/2`.
#[test]
fn verify_suppression_factor_calculation_spread_redis() {
    let endpoint = redis_url();
    runtime::block_on(async {
        let limiter = build_limiter(&endpoint, 10, 100, 10.0).await;
        let test_key = key("k");
        let per_second = RateLimit::try_from(1.0_f64).unwrap();

        // 3000 ms divided across 20 calls.
        let pause = Duration::from_millis(3000 / 20);
        for _ in 0..20 {
            let _ = limiter
                .redis()
                .suppressed()
                .inc(&test_key, &per_second, 1)
                .await
                .unwrap();
            runtime::async_sleep(pause).await;
        }
        runtime::async_sleep(Duration::from_millis(1200)).await;

        let expected = 1.0_f64 - (1.0_f64 / 2.0_f64);
        let decision = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 1)
            .await
            .unwrap();
        eprintln!("decision: {decision:?}");

        assert!(
            matches!(
                decision,
                RateLimitDecision::Suppressed {
                    suppression_factor,
                    ..
                } if (suppression_factor - expected).abs() < 1e-12
            ),
            "decision: {decision:?}"
        );
    });
}
/// Records 10 units, waits just over a second, records 20 more, waits 101 ms, and expects
/// the next increment to be `Suppressed` with a suppression factor of `1 - 1/20`.
#[test]
fn verify_suppression_factor_calculation_last_second_redis() {
    let endpoint = redis_url();
    runtime::block_on(async {
        let limiter = build_limiter(&endpoint, 10, 100, 10.0).await;
        let test_key = key("k");
        let per_second = RateLimit::try_from(1.0_f64).unwrap();

        // First batch of usage.
        let _ = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 10)
            .await
            .unwrap();
        runtime::async_sleep(Duration::from_millis(1001)).await;

        // Second, larger batch recorded more than one second later.
        let _ = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 20)
            .await
            .unwrap();
        runtime::async_sleep(Duration::from_millis(101)).await;

        let expected = 1.0_f64 - (1.0_f64 / 20.0_f64);
        let decision = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 1)
            .await
            .unwrap();

        assert!(
            matches!(
                decision,
                RateLimitDecision::Suppressed {
                    suppression_factor,
                    ..
                } if (suppression_factor - expected).abs() < 1e-12
            ),
            "decision: {decision:?}, expected sf: {expected}"
        );
    });
}
/// Records 100 units, waits just over a second, records 20 more, and expects the next
/// increment to be `Suppressed` with a factor of exactly `1.0` and `is_allowed == false`.
#[test]
fn verify_hard_limit_rejects() {
    let endpoint = redis_url();
    runtime::block_on(async {
        let limiter = build_limiter(&endpoint, 10, 100, 10.0).await;
        let test_key = key("k");
        let per_second = RateLimit::try_from(1.0_f64).unwrap();

        let _ = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 100)
            .await
            .unwrap();
        runtime::async_sleep(Duration::from_millis(1001)).await;

        let _ = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 20)
            .await
            .unwrap();

        let decision = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 1)
            .await
            .unwrap();

        assert!(
            matches!(
                decision,
                RateLimitDecision::Suppressed {
                    suppression_factor,
                    is_allowed: false,
                } if suppression_factor == 1.0_f64
            ),
            "decision: {decision:?}"
        );
    });
}
/// With a limit of 1/s over a 10 s window, an increment of `window - 1` units followed by
/// one more unit must both be `Allowed`.
#[test]
fn suppressed_is_deterministically_allowed_until_base_capacity_boundary_redis() {
    let endpoint = redis_url();
    runtime::block_on(async {
        let window_secs: u64 = 10;
        let hard_factor: f64 = 2.0;
        let limiter = build_limiter(&endpoint, window_secs, 1000, hard_factor).await;
        let test_key = key("k_base");
        let per_second = RateLimit::try_from(1.0_f64).unwrap();

        // The test treats the window length (in seconds) as the base capacity at 1/s.
        let base_capacity = window_secs;

        let first = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, base_capacity - 1)
            .await
            .unwrap();
        assert!(matches!(first, RateLimitDecision::Allowed), "d1: {first:?}");

        let second = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, 1)
            .await
            .unwrap();
        assert!(
            matches!(second, RateLimitDecision::Allowed),
            "d2: {second:?}"
        );
    });
}
/// After one increment of `window * 2` units (accepted as `Allowed`), each of the next
/// five single-unit increments must be `Suppressed` with factor `1.0` and not allowed.
#[test]
fn suppressed_is_fully_denied_after_hard_limit_observed_redis() {
    let endpoint = redis_url();
    runtime::block_on(async {
        let window_secs: u64 = 10;
        let hard_factor: f64 = 2.0;
        // Cache duration of 1 ms, followed below by a 5 ms pause.
        let limiter =
            build_limiter_with_cache_ms(&endpoint, window_secs, 1000, hard_factor, 1).await;
        let test_key = key("k_hard");
        let per_second = RateLimit::try_from(1.0_f64).unwrap();
        let hard_capacity = window_secs * 2;

        let first = limiter
            .redis()
            .suppressed()
            .inc(&test_key, &per_second, hard_capacity)
            .await
            .unwrap();
        assert!(matches!(first, RateLimitDecision::Allowed), "d1: {first:?}");

        runtime::async_sleep(Duration::from_millis(5)).await;

        for attempt in 0..5u64 {
            let outcome = limiter
                .redis()
                .suppressed()
                .inc(&test_key, &per_second, 1)
                .await
                .unwrap();
            assert!(
                matches!(
                    outcome,
                    RateLimitDecision::Suppressed {
                        suppression_factor,
                        is_allowed: false,
                    } if (suppression_factor - 1.0).abs() < 1e-12
                ),
                "i={attempt} d={outcome:?}"
            );
        }
    });
}
/// Fills the window in one burst, checks the suppression factor is `1.0`, then waits for
/// the window to expire and checks the factor is back to `0.0` and a new increment is
/// `Allowed`.
#[test]
fn suppressed_redis_window_eviction_allows_fresh_burst_after_expiry() {
    let url = redis_url();
    runtime::block_on(async {
        let window_size_seconds = 1_u64;
        let hard_limit_factor = 1.0_f64;
        let cache_ms = 5_u64;
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        // window (s) * rate (per s) * hard-limit factor, truncated to whole units.
        let window_limit = (window_size_seconds as f64 * *rate_limit * hard_limit_factor) as u64;
        let rl = build_limiter_with_cache_ms(
            &url,
            window_size_seconds,
            1_000,
            hard_limit_factor,
            cache_ms,
        )
        .await;
        let k = key("k_evict");

        let d1 = rl
            .redis()
            .suppressed()
            .inc(&k, &rate_limit, window_limit)
            .await
            .unwrap();
        assert!(matches!(d1, RateLimitDecision::Allowed), "d1: {d1:?}");

        // Outlast the suppression-factor cache duration before the first read.
        runtime::async_sleep(Duration::from_millis(cache_ms + 50)).await;
        let sf_before = rl
            .redis()
            .suppressed()
            .get_suppression_factor(&k)
            .await
            .unwrap();
        assert!(
            (sf_before - 1.0).abs() < 1e-12,
            "expected sf=1.0 before window expiry, got {sf_before}"
        );

        // Let the whole window pass, then outlast the cache duration again. Both waits are
        // async so the executor thread is never blocked inside the future.
        runtime::async_sleep(Duration::from_millis(window_size_seconds * 1_000 + 50)).await;
        runtime::async_sleep(Duration::from_millis(cache_ms + 50)).await;

        let sf_after = rl
            .redis()
            .suppressed()
            .get_suppression_factor(&k)
            .await
            .unwrap();
        assert!(
            (sf_after - 0.0).abs() < 1e-12,
            "expected sf=0.0 after window expiry but got {sf_after} — \
             old buckets were not evicted (window_size_ms passed instead of window_size_seconds?)"
        );

        let d2 = rl
            .redis()
            .suppressed()
            .inc(&k, &rate_limit, 1)
            .await
            .unwrap();
        assert!(
            matches!(d2, RateLimitDecision::Allowed),
            "expected Allowed after window expiry, got {d2:?}"
        );
    });
}
/// Sends a burst of ten times the soft limit in each of three consecutive windows and
/// checks that the total number of admitted calls is at least `soft_limit * num_windows`.
///
/// Only a lower bound is asserted; `hard_limit` appears solely in the failure message.
#[test]
fn suppressed_redis_throughput_over_multiple_windows_stays_at_rate_limit() {
    let url = redis_url();
    runtime::block_on(async {
        let window_size_seconds = 1_u64;
        let hard_limit_factor = 1.5_f64;
        let cache_ms = 5_u64;
        let num_windows = 3_u64;
        let rate_limit = RateLimit::try_from(10f64).unwrap();
        let soft_limit = (window_size_seconds as f64 * *rate_limit) as u64;
        let hard_limit = (soft_limit as f64 * hard_limit_factor) as u64;
        let rl = build_limiter_with_cache_ms(
            &url,
            window_size_seconds,
            100,
            hard_limit_factor,
            cache_ms,
        )
        .await;
        let k = key("k_multi");

        let mut total_allowed: u64 = 0;
        for _window in 0..num_windows {
            let burst = soft_limit * 10;
            for _ in 0..burst {
                let d = rl
                    .redis()
                    .suppressed()
                    .inc(&k, &rate_limit, 1)
                    .await
                    .unwrap();
                // A call counts as admitted when it is Allowed outright or Suppressed but
                // still let through.
                match d {
                    RateLimitDecision::Allowed
                    | RateLimitDecision::Suppressed {
                        is_allowed: true, ..
                    } => total_allowed += 1,
                    RateLimitDecision::Suppressed { .. } => {}
                    RateLimitDecision::Rejected { .. } => {
                        panic!("suppressed strategy must never return Rejected")
                    }
                }
            }

            // Wait for the window to expire and then outlast the cache duration. Async
            // sleeps are used so the executor thread is not blocked between bursts.
            runtime::async_sleep(Duration::from_millis(window_size_seconds * 1_000 + 50)).await;
            runtime::async_sleep(Duration::from_millis(cache_ms + 50)).await;
        }

        let expected_min = soft_limit * num_windows;
        assert!(
            total_allowed >= expected_min,
            "total_allowed={total_allowed} but expected >= {expected_min} over {num_windows} windows \
             (hard_limit={hard_limit}) — window eviction is likely broken"
        );
    });
}