use std::{collections::HashMap, time::Duration};
use redis::AsyncCommands;
use super::common::{key, key_gen, redis_url, unique_prefix, wait_for_hybrid_sync};
use super::runtime;
use crate::common::RateType;
use crate::hybrid::SyncIntervalMs;
use crate::{
HardLimitFactor, LocalRateLimiterOptions, RateGroupSizeMs, RateLimit, RateLimitDecision,
RateLimiter, RateLimiterOptions, RedisKey, RedisRateLimiterOptions, SuppressionFactorCacheMs,
WindowSizeSeconds,
};
/// Build an `Arc`-wrapped hybrid rate limiter talking to the Redis instance
/// at `url`, configuring the local and Redis sides with identical window,
/// grouping, hard-limit, and suppression-cache settings.
async fn build_limiter(
    url: &str,
    window_size_seconds: u64,
    rate_group_size_ms: u64,
    hard_limit_factor: f64,
    suppression_factor_cache_ms: u64,
    sync_interval_ms: u64,
    prefix: RedisKey,
) -> std::sync::Arc<RateLimiter> {
    // Each limiter owns its own connection manager; tests never share one.
    let connection_manager = redis::Client::open(url)
        .unwrap()
        .get_connection_manager()
        .await
        .unwrap();
    let local = LocalRateLimiterOptions {
        window_size_seconds: WindowSizeSeconds::try_from(window_size_seconds).unwrap(),
        rate_group_size_ms: RateGroupSizeMs::try_from(rate_group_size_ms).unwrap(),
        hard_limit_factor: HardLimitFactor::try_from(hard_limit_factor).unwrap(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::try_from(
            suppression_factor_cache_ms,
        )
        .unwrap(),
    };
    let redis = RedisRateLimiterOptions {
        connection_manager,
        prefix: Some(prefix),
        window_size_seconds: WindowSizeSeconds::try_from(window_size_seconds).unwrap(),
        rate_group_size_ms: RateGroupSizeMs::try_from(rate_group_size_ms).unwrap(),
        hard_limit_factor: HardLimitFactor::try_from(hard_limit_factor).unwrap(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::try_from(
            suppression_factor_cache_ms,
        )
        .unwrap(),
        sync_interval_ms: SyncIntervalMs::try_from(sync_interval_ms).unwrap(),
    };
    std::sync::Arc::new(RateLimiter::new(RateLimiterOptions { local, redis }))
}
/// Resolve the full Redis key for `user_key` under the hybrid-suppressed rate
/// type, selected by a short suffix code: `h` = bucket hash, `hd` = declined
/// bucket hash, `a` = active keys, `w` = window limit, `t` = total count,
/// `d` = total declined, `sf` = suppression factor. Panics on anything else.
fn redis_key(prefix: &RedisKey, user_key: &RedisKey, suffix: &str) -> String {
    let generator = key_gen(prefix, RateType::HybridSuppressed);
    match suffix {
        "a" => generator.get_active_keys(user_key),
        "d" => generator.get_total_declined_key(user_key),
        "h" => generator.get_hash_key(user_key),
        "hd" => generator.get_hash_declined_key(user_key),
        "sf" => generator.get_suppression_factor_key(user_key),
        "t" => generator.get_total_count_key(user_key),
        "w" => generator.get_window_limit_key(user_key),
        _ => panic!("unknown suffix for hybrid_suppressed rate type: {suffix}"),
    }
}
// Requests strictly under the soft cap should be handled locally: no total
// counter and no bucket hash may appear in Redis.
#[test]
fn redis_state_hybrid_suppressed_no_redis_keys_before_soft_limit_overflow() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 1_u64;
        let hard_limit_factor = 2.0_f64;
        // Long sync interval so no background sync fires during the test.
        let sync_interval_ms = 2000_u64;
        let limiter = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            100,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        let user = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let soft_cap = (window_size_seconds as f64 * *rate_limit) as u64;
        // Issue exactly soft_cap requests — never overflowing.
        for _ in 0..soft_cap {
            let decision = limiter
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
            assert!(
                matches!(
                    decision,
                    RateLimitDecision::Allowed
                        | RateLimitDecision::Suppressed {
                            is_allowed: true,
                            ..
                        }
                ),
                "unexpected: {decision:?}"
            );
        }
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let total: Option<u64> = conn.get(redis_key(&prefix, &user, "t")).await.unwrap();
        assert!(
            total.is_none(),
            "total count key must not exist before soft-limit overflow"
        );
        let hash_len: u64 = conn.hlen(redis_key(&prefix, &user, "h")).await.unwrap();
        assert_eq!(hash_len, 0, "hash must be empty before soft-limit overflow");
    });
}
// One request past the soft cap must trigger a commit: the total counter and
// at least one bucket must then be visible in Redis after a sync.
#[test]
fn redis_state_hybrid_suppressed_commit_writes_total_count_after_overflow() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 1_u64;
        let hard_limit_factor = 1.0_f64;
        let sync_interval_ms = 25_u64;
        let cache_ms = 5_u64;
        let limiter = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        let user = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let soft_cap = (window_size_seconds as f64 * *rate_limit) as u64;
        // Fill the window up to the soft cap; these stay local.
        for _ in 0..soft_cap {
            let decision = limiter
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
            assert!(
                matches!(
                    decision,
                    RateLimitDecision::Allowed
                        | RateLimitDecision::Suppressed {
                            is_allowed: true,
                            ..
                        }
                ),
                "unexpected: {decision:?}"
            );
        }
        // The (soft_cap + 1)-th request pushes past the cap.
        let _ = limiter
            .hybrid()
            .suppressed()
            .inc(&user, &rate_limit, 1)
            .await
            .unwrap();
        wait_for_hybrid_sync(sync_interval_ms).await;
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let total: u64 = conn.get(redis_key(&prefix, &user, "t")).await.unwrap();
        assert!(
            total > 0,
            "total count should be > 0 after overflow commit, got {total}"
        );
        assert!(
            total <= soft_cap + 1,
            "total count ({total}) should not exceed soft_cap + 1 ({soft_cap} + 1)"
        );
        let hash: HashMap<String, u64> =
            conn.hgetall(redis_key(&prefix, &user, "h")).await.unwrap();
        assert!(
            !hash.is_empty(),
            "hash must have at least one bucket after commit"
        );
    });
}
// After committing a hard-limit-saturated window, a fresh limiter must read a
// suppression factor of exactly 1.0, and the cached sf key in Redis must hold
// the same value.
#[test]
fn redis_state_hybrid_suppressed_sf_is_one_at_hard_limit_after_commit() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 1_u64;
        let hard_limit_factor = 2.0_f64;
        let sync_interval_ms = 25_u64;
        let cache_ms = 5_u64;
        let user = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let hard_cap = (window_size_seconds as f64 * *rate_limit * hard_limit_factor) as u64;
        let writer = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Drive the counter to (and one past) the hard limit.
        for _ in 0..=hard_cap {
            let _ = writer
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        // Wait out the suppression-factor cache so the fresh limiter below
        // reads state from Redis rather than a stale local value.
        runtime::async_sleep(Duration::from_millis(cache_ms + 50)).await;
        let reader = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        let sf = reader
            .hybrid()
            .suppressed()
            .get_suppression_factor(&user)
            .await
            .unwrap();
        assert!(
            (sf - 1.0).abs() < 1e-12,
            "suppression factor must be 1.0 at hard limit, got {sf}"
        );
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let sf_raw: Option<String> = conn.get(redis_key(&prefix, &user, "sf")).await.unwrap();
        assert!(
            sf_raw.is_some(),
            "sf cache key must be set after read_state at hard limit"
        );
        let sf_val: f64 = sf_raw.unwrap().parse().expect("sf must be a valid float");
        assert!(
            (sf_val - 1.0).abs() < 1e-12,
            "sf cache must equal 1.0 at hard limit, got {sf_val}"
        );
    });
}
// Reading the suppression factor must set the sf cache key with a TTL (PX)
// bounded by the configured cache duration.
#[test]
fn redis_state_hybrid_suppressed_sf_cache_key_has_ttl() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 10_u64;
        let hard_limit_factor = 2.0_f64;
        let sync_interval_ms = 25_u64;
        let cache_ms = 500_u64;
        let user = key("k");
        let rate_limit = RateLimit::try_from(1f64).unwrap();
        let soft_cap = (window_size_seconds as f64 * *rate_limit) as u64;
        let writer = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Overflow the soft cap so state is committed to Redis.
        for _ in 0..=soft_cap {
            let _ = writer
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        let reader = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // This read should populate the sf cache key.
        let _ = reader
            .hybrid()
            .suppressed()
            .get_suppression_factor(&user)
            .await
            .unwrap();
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let pttl: i64 = conn.pttl(redis_key(&prefix, &user, "sf")).await.unwrap();
        assert!(
            pttl > 0,
            "sf cache key must have a positive TTL (PX set), got {pttl}"
        );
        assert!(
            pttl <= cache_ms as i64,
            "TTL must be <= cache_ms={cache_ms}, got {pttl}"
        );
    });
}
// The per-bucket hashes must stay consistent with the scalar counters even
// when a second limiter adds traffic (some of which can be declined) on top
// of an already hard-limited entity.
#[test]
fn redis_state_hybrid_suppressed_hash_sums_match_counters_after_commit_with_denials() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 1_u64;
        let hard_limit_factor = 2.0_f64;
        let sync_interval_ms = 25_u64;
        let cache_ms = 5_u64;
        let user = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let hard_cap = (window_size_seconds as f64 * *rate_limit * hard_limit_factor) as u64;
        let first = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Saturate the hard limit with the first limiter.
        for _ in 0..=hard_cap {
            let _ = first
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        let second = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Extra traffic from a fresh limiter against the committed state.
        for _ in 0..5 {
            let _ = second
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let total: u64 = conn.get(redis_key(&prefix, &user, "t")).await.unwrap();
        // The declined counter may be absent when nothing was declined.
        let declined: u64 = conn
            .get(redis_key(&prefix, &user, "d"))
            .await
            .unwrap_or(0u64);
        let buckets: HashMap<String, u64> =
            conn.hgetall(redis_key(&prefix, &user, "h")).await.unwrap();
        let declined_buckets: HashMap<String, u64> =
            conn.hgetall(redis_key(&prefix, &user, "hd")).await.unwrap();
        let hash_sum: u64 = buckets.values().sum();
        let hash_d_sum: u64 = declined_buckets.values().sum();
        assert_eq!(
            hash_sum, total,
            "hash sum ({hash_sum}) must equal total count ({total})"
        );
        assert_eq!(
            hash_d_sum, declined,
            "declined hash sum ({hash_d_sum}) must equal declined count ({declined})"
        );
        assert!(
            total >= declined,
            "total ({total}) must be >= declined ({declined})"
        );
    });
}
// Scenario: overflow a 1-second window and commit it to Redis, then wait out
// the whole window; the next read from a fresh limiter is expected to see
// the expired buckets evicted (sf == 0.0, total == 0).
#[test]
fn redis_state_hybrid_suppressed_evicts_expired_buckets_on_next_read_state() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 1_u64;
        let hard_limit_factor = 1.0_f64; let sync_interval_ms = 25_u64;
        let cache_ms = 5_u64;
        let k = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        // With hard_limit_factor = 1.0 the soft and hard caps coincide.
        let cap = (window_size_seconds as f64 * *rate_limit) as u64;
        let rl = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // 0..=cap issues cap + 1 requests, overflowing the cap to force a commit.
        for _ in 0..=cap {
            let _ = rl
                .hybrid()
                .suppressed()
                .inc(&k, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        // Sleep past the full window so every bucket written above is expired...
        runtime::async_sleep(Duration::from_millis(window_size_seconds * 1000 + 100)).await;
        // ...and past the sf cache so the next read hits Redis, not the cache.
        runtime::async_sleep(Duration::from_millis(cache_ms + 50)).await;
        let rl2 = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        let sf = rl2
            .hybrid()
            .suppressed()
            .get_suppression_factor(&k)
            .await
            .unwrap();
        assert!(
            (sf - 0.0).abs() < 1e-12,
            "suppression factor must be 0.0 after window expiry, got {sf}"
        );
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        // unwrap_or(0): the total key may have been removed entirely.
        let total: u64 = conn.get(redis_key(&prefix, &k, "t")).await.unwrap_or(0);
        assert_eq!(
            total, 0,
            "total count must be 0 after window eviction, got {total}"
        );
    });
}
// Committing state under one prefix must leave a second, unrelated prefix's
// keyspace completely empty.
#[test]
fn redis_state_hybrid_suppressed_different_prefixes_are_isolated() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix_a = unique_prefix();
        let prefix_b = unique_prefix();
        let window_size_seconds = 1_u64;
        let sync_interval_ms = 25_u64;
        let hard_limit_factor = 1.0_f64;
        let cache_ms = 5_u64;
        let limiter_a = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix_a.clone(),
        )
        .await;
        let user = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let cap = (window_size_seconds as f64 * *rate_limit) as u64;
        // Overflow under prefix A only; prefix B never receives traffic.
        for _ in 0..=cap {
            let _ = limiter_a
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let total_b: Option<u64> = conn.get(redis_key(&prefix_b, &user, "t")).await.unwrap();
        assert!(
            total_b.is_none(),
            "prefix B should not have any state after prefix A's commit"
        );
    });
}
// Hybrid-suppressed traffic must never write into the plain (non-hybrid)
// suppressed keyspace for the same prefix and user key.
#[test]
fn redis_state_hybrid_suppressed_does_not_contaminate_redis_suppressed_keyspace() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 1_u64;
        let hard_limit_factor = 2.0_f64;
        // Long sync interval: no background sync fires during the test.
        let sync_interval_ms = 2000_u64;
        let cache_ms = 100_u64;
        let limiter = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        let user = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let soft_cap = (window_size_seconds as f64 * *rate_limit) as u64;
        // Stay strictly under the soft cap.
        for _ in 0..soft_cap {
            let decision = limiter
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
            assert!(
                matches!(
                    decision,
                    RateLimitDecision::Allowed
                        | RateLimitDecision::Suppressed {
                            is_allowed: true,
                            ..
                        }
                ),
                "unexpected: {decision:?}"
            );
        }
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        // Hand-built key matching the non-hybrid suppressed total counter.
        let suppressed_total_key = format!("{}:{}:suppressed:t", prefix.as_str(), user.as_str());
        let suppressed_total: Option<u64> = conn.get(&suppressed_total_key).await.unwrap();
        assert!(
            suppressed_total.is_none(),
            "redis suppressed keyspace must not be contaminated by hybrid suppressed writes"
        );
    });
}
// Scenario: commit state for one entity, verify all of its Redis keys and its
// active_entities registration exist, wait past stale_after_ms, then run
// cleanup and verify every key plus the registry entry is gone.
#[test]
fn redis_state_hybrid_suppressed_cleanup_removes_all_redis_keys_for_stale_entity() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 5_u64;
        let hard_limit_factor = 2.0_f64;
        let sync_interval_ms = 25_u64;
        // Long sf cache (60s) so cleanup — not TTL expiry — must be what
        // removes the sf key; a 150ms staleness threshold passes quickly.
        let cache_ms = 60_000_u64; let stale_after_ms = 150_u64;
        let k = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let hard_cap = (window_size_seconds as f64 * *rate_limit * hard_limit_factor) as u64;
        let rl = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Exceed the hard cap so state is committed for the entity.
        for _ in 0..hard_cap + 2 {
            let _ = rl
                .hybrid()
                .suppressed()
                .inc(&k, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        let rl2 = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Reading the suppression factor populates the sf cache key as well.
        let _ = rl2
            .hybrid()
            .suppressed()
            .get_suppression_factor(&k)
            .await
            .unwrap();
        wait_for_hybrid_sync(sync_interval_ms * 3).await;
        let kg = key_gen(&prefix, RateType::HybridSuppressed);
        let active_entities_key = kg.get_active_entities_key();
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        // Precondition: every per-entity key exists before cleanup runs.
        for entity_key in kg.get_all_entity_keys(&k) {
            let exists: bool = conn.exists(&entity_key).await.unwrap();
            assert!(exists, "key {entity_key} must exist before cleanup");
        }
        let score: Option<f64> = conn.zscore(&active_entities_key, k.as_str()).await.unwrap();
        assert!(
            score.is_some(),
            "entity must be in active_entities before cleanup"
        );
        // Let the entity cross the staleness threshold, then sweep it.
        runtime::async_sleep(Duration::from_millis(stale_after_ms + 50)).await;
        rl2.hybrid()
            .suppressed()
            .cleanup(stale_after_ms)
            .await
            .unwrap();
        for entity_key in kg.get_all_entity_keys(&k) {
            let exists: bool = conn.exists(&entity_key).await.unwrap();
            assert!(!exists, "key {entity_key} must be deleted after cleanup");
        }
        let score_after: Option<f64> = conn.zscore(&active_entities_key, k.as_str()).await.unwrap();
        assert!(
            score_after.is_none(),
            "entity must be removed from active_entities after cleanup"
        );
    });
}
// Cleanup with a generous staleness threshold must leave a freshly-active
// entity's state and its active_entities registration untouched.
#[test]
fn redis_state_hybrid_suppressed_cleanup_does_not_remove_active_entity() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 5_u64;
        let hard_limit_factor = 1.0_f64;
        let sync_interval_ms = 25_u64;
        let cache_ms = 5_u64;
        // Far longer than the test runs — the entity can never go stale here.
        let stale_after_ms = 5_000_u64;
        let user = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let soft_cap = (window_size_seconds as f64 * *rate_limit) as u64;
        let limiter = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Overflow the soft cap so state is committed to Redis.
        for _ in 0..=soft_cap {
            let _ = limiter
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        // Run cleanup immediately, while the entity is still active.
        limiter
            .hybrid()
            .suppressed()
            .cleanup(stale_after_ms)
            .await
            .unwrap();
        let active_entities_key =
            key_gen(&prefix, RateType::HybridSuppressed).get_active_entities_key();
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let t_exists: bool = conn.exists(redis_key(&prefix, &user, "t")).await.unwrap();
        assert!(
            t_exists,
            "total count key must still exist for active entity"
        );
        let score: Option<f64> = conn
            .zscore(&active_entities_key, user.as_str())
            .await
            .unwrap();
        assert!(
            score.is_some(),
            "active entity must remain in active_entities after cleanup"
        );
    });
}
// After a stale entity is swept away, a brand-new request for the same key
// must be let through again.
#[test]
fn redis_state_hybrid_suppressed_cleanup_allows_fresh_requests_after_cleanup() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 5_u64;
        let hard_limit_factor = 2.0_f64;
        let sync_interval_ms = 25_u64;
        let cache_ms = 5_u64;
        let stale_after_ms = 150_u64;
        let user = key("k");
        let rate_limit = RateLimit::try_from(2f64).unwrap();
        let soft_cap = (window_size_seconds as f64 * *rate_limit) as u64;
        let limiter = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Overflow the soft cap so state lands in Redis.
        for _ in 0..=soft_cap {
            let _ = limiter
                .hybrid()
                .suppressed()
                .inc(&user, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        // Let the entity cross the staleness threshold, then sweep it.
        runtime::async_sleep(Duration::from_millis(stale_after_ms + 50)).await;
        limiter
            .hybrid()
            .suppressed()
            .cleanup(stale_after_ms)
            .await
            .unwrap();
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let t_exists: bool = conn.exists(redis_key(&prefix, &user, "t")).await.unwrap();
        assert!(!t_exists, "total count key must be deleted after cleanup");
        // With all prior state removed, this request should pass.
        let decision = limiter
            .hybrid()
            .suppressed()
            .inc(&user, &rate_limit, 1)
            .await
            .unwrap();
        let is_allowed = matches!(
            decision,
            RateLimitDecision::Allowed
                | RateLimitDecision::Suppressed {
                    is_allowed: true,
                    ..
                }
        );
        assert!(
            is_allowed,
            "expected Allowed or Suppressed{{is_allowed:true}} after cleanup but got {decision:?}"
        );
    });
}
// Scenario: the sf key is created with a long TTL (cache_ms = 60s); cleanup
// must delete it explicitly rather than leaving it to expire on its own.
#[test]
fn redis_state_hybrid_suppressed_sf_key_deleted_by_cleanup() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 5_u64;
        let hard_limit_factor = 2.0_f64;
        let sync_interval_ms = 25_u64;
        // Long sf cache vs. short staleness threshold — see scenario comment.
        let cache_ms = 60_000_u64; let stale_after_ms = 150_u64;
        let k = key("k");
        let rate_limit = RateLimit::try_from(5f64).unwrap();
        let hard_cap = (window_size_seconds as f64 * *rate_limit * hard_limit_factor) as u64;
        let rl = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Reach the hard cap so state is committed to Redis.
        for _ in 0..=hard_cap {
            let _ = rl
                .hybrid()
                .suppressed()
                .inc(&k, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        let rl2 = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Reading the suppression factor materializes the sf cache key.
        let _ = rl2
            .hybrid()
            .suppressed()
            .get_suppression_factor(&k)
            .await
            .unwrap();
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        // Sanity check: the key is nowhere near expiring by itself.
        let pttl: i64 = conn.pttl(redis_key(&prefix, &k, "sf")).await.unwrap();
        assert!(
            pttl > 1_000,
            "sf key must have a long TTL before cleanup, got {pttl}ms"
        );
        // Let the entity cross the staleness threshold, then sweep it.
        runtime::async_sleep(Duration::from_millis(stale_after_ms + 50)).await;
        rl2.hybrid()
            .suppressed()
            .cleanup(stale_after_ms)
            .await
            .unwrap();
        let sf_exists: bool = conn.exists(redis_key(&prefix, &k, "sf")).await.unwrap();
        assert!(
            !sf_exists,
            "sf key must be deleted by cleanup, not left to expire via TTL"
        );
    });
}
// After an overflow commit, the stored window-limit key must equal the hard
// limit derived from window size, rate, and hard_limit_factor.
#[test]
fn redis_state_hybrid_suppressed_window_limit_key_equals_hard_limit_after_commit() {
    let url = redis_url();
    runtime::block_on(async {
        let prefix = unique_prefix();
        let window_size_seconds = 5_u64;
        let rate_limit_value = 2f64;
        let hard_limit_factor = 3.0_f64;
        // Derive the expectation (5s window * rate 2/s * factor 3.0 = 30)
        // from the same inputs handed to the limiter, instead of the former
        // hard-coded 30 that would silently go stale if a parameter changed.
        let expected_hard_limit =
            (window_size_seconds as f64 * rate_limit_value * hard_limit_factor) as u64;
        let sync_interval_ms = 25_u64;
        let cache_ms = 5_u64;
        let k = key("k");
        let rate_limit = RateLimit::try_from(rate_limit_value).unwrap();
        let soft_cap = (window_size_seconds as f64 * *rate_limit) as u64;
        let rl = build_limiter(
            &url,
            window_size_seconds,
            1000,
            hard_limit_factor,
            cache_ms,
            sync_interval_ms,
            prefix.clone(),
        )
        .await;
        // Overflow the soft cap (0..=soft_cap is soft_cap + 1 requests) so
        // the limiter commits state — including the window limit — to Redis.
        for _ in 0..=soft_cap {
            let _ = rl
                .hybrid()
                .suppressed()
                .inc(&k, &rate_limit, 1)
                .await
                .unwrap();
        }
        wait_for_hybrid_sync(sync_interval_ms).await;
        let mut conn = redis::Client::open(url.as_str())
            .unwrap()
            .get_multiplexed_async_connection()
            .await
            .unwrap();
        let stored_limit: u64 = conn.get(redis_key(&prefix, &k, "w")).await.unwrap();
        assert_eq!(
            stored_limit, expected_hard_limit,
            "stored window limit should equal hard_limit"
        );
    });
}