use super::{config::RateLimiterConfig, core::RateLimiter, utils::current_time_ms};
use dashmap::DashMap;
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;
use tracing::{debug, info, warn};
/// Hard upper bound on the number of IP addresses tracked at once; requests from
/// new IPs are rejected once the active counter reaches this value.
const MAX_TRACKED_IPS: usize = 10_000;
/// Tracked-IP count (90% of the maximum) at which creating a limiter for a new IP
/// first runs an emergency cleanup; above it, periodic cleanup halves its
/// inactivity threshold and the cleanup threads log a warning.
const CLEANUP_THRESHOLD: usize = (MAX_TRACKED_IPS * 90) / 100;
/// Tracked-IP count (70% of the maximum) that an emergency cleanup tries to get down to.
const CLEANUP_TARGET: usize = (MAX_TRACKED_IPS * 70) / 100;
/// Divisor applied to `inactive_duration_ms` to obtain the idle threshold used by
/// emergency cleanup.
const EMERGENCY_CLEANUP_INACTIVE_FACTOR: u64 = 2;
/// Lower bound, in milliseconds, for the emergency-cleanup idle threshold.
const EMERGENCY_CLEANUP_MIN_INACTIVE_MS: u64 = 1000;
/// Keeps one `RateLimiter` per client IP address, bounded by `MAX_TRACKED_IPS`.
///
/// The map and all counters live behind `Arc`, so a clone of the manager shares
/// the same limiters and statistics as the original.
#[derive(Clone)]
pub struct IpRateLimiterManager {
/// Per-IP limiters, stored in a sharded concurrent map.
limiters: Arc<DashMap<IpAddr, Arc<RateLimiter>, ahash::RandomState>>,
/// Number of tracked IPs; incremented on insert and re-synchronised with the
/// map length at the end of every cleanup.
active_count: Arc<AtomicUsize>,
/// Configuration cloned into every newly created limiter.
config: RateLimiterConfig,
/// Pause between two periodic cleanups, in milliseconds.
cleanup_interval_ms: u64,
/// Inactivity threshold, in milliseconds, passed to the cleanup routines.
inactive_duration_ms: u64,
/// Lifetime count of limiters created.
total_created: Arc<AtomicU64>,
/// Lifetime count of limiters removed by cleanup or `clear`.
total_cleaned: Arc<AtomicU64>,
/// Set while a cleanup is running so that only one cleanup runs at a time.
cleanup_in_progress: Arc<AtomicBool>,
}
impl IpRateLimiterManager {
/// Creates a manager with the default cleanup settings: a 60 000 ms cleanup
/// interval and a 300 000 ms inactivity window.
///
/// The shard count is the detected parallelism (8 when it cannot be
/// determined), rounded up to a power of two and clamped to `2..=64`.
///
/// NOTE(review): the capacity hint handed to `DashMap` is
/// `MAX_TRACKED_IPS / shards` (at least 128); confirm whether DashMap treats
/// that value as a total or as a per-shard capacity.
pub fn new(config: RateLimiterConfig) -> Self {
    let parallelism = match std::thread::available_parallelism() {
        Ok(cores) => cores.get(),
        Err(_) => 8,
    };
    let shard_count = parallelism.next_power_of_two().clamp(2, 64);
    let capacity_hint = std::cmp::max(MAX_TRACKED_IPS / shard_count, 128);
    let map = DashMap::with_capacity_and_hasher_and_shard_amount(
        capacity_hint,
        ahash::RandomState::new(),
        shard_count,
    );

    Self {
        limiters: Arc::new(map),
        active_count: Arc::new(AtomicUsize::new(0)),
        config,
        cleanup_interval_ms: 60_000,
        inactive_duration_ms: 300_000,
        total_created: Arc::new(AtomicU64::new(0)),
        total_cleaned: Arc::new(AtomicU64::new(0)),
        cleanup_in_progress: Arc::new(AtomicBool::new(false)),
    }
}
pub fn with_cleanup_settings(
config: RateLimiterConfig,
cleanup_interval_ms: u64,
inactive_duration_ms: u64,
) -> Self {
let mut manager = Self::new(config);
manager.cleanup_interval_ms = cleanup_interval_ms;
manager.inactive_duration_ms = inactive_duration_ms;
manager
}
/// Returns the limiter for `ip`, creating it when the IP is not tracked yet.
///
/// Returns `None` when the slow path refuses to track another IP.
#[inline]
pub fn get_limiter(&self, ip: IpAddr) -> Option<Arc<RateLimiter>> {
    // The map reference is released at the end of this statement, before the
    // slow path is entered.
    let existing = self
        .limiters
        .get(&ip)
        .map(|entry| Arc::clone(entry.value()));
    match existing {
        hit @ Some(_) => hit,
        None => self.get_limiter_slow(ip),
    }
}
/// Slow path of [`Self::get_limiter`]: inserts a limiter for an untracked IP.
///
/// Returns `None` when the tracked-IP counter is at `MAX_TRACKED_IPS`, either
/// on entry, after an emergency cleanup, or when the increment itself finds
/// the limit already reached. When another caller inserted the same IP in the
/// meantime, that existing limiter is returned instead.
#[cold]
#[inline(never)]
fn get_limiter_slow(&self, ip: IpAddr) -> Option<Arc<RateLimiter>> {
    use dashmap::mapref::entry::Entry;

    let observed = self.active_count.load(Ordering::Acquire);
    if observed >= MAX_TRACKED_IPS {
        debug!("Rate limiter capacity reached, rejecting IP: {}", ip);
        return None;
    }

    // Close to the limit: try to free slots before admitting a new IP.
    if observed >= CLEANUP_THRESHOLD {
        self.emergency_cleanup();
        let after_cleanup = self.active_count.load(Ordering::Acquire);
        if after_cleanup >= MAX_TRACKED_IPS {
            debug!(
                "Rate limiter capacity reached after cleanup, rejecting IP: {}",
                ip
            );
            return None;
        }
    }

    let slot = match self.limiters.entry(ip) {
        Entry::Occupied(existing) => return Some(Arc::clone(existing.get())),
        Entry::Vacant(slot) => slot,
    };

    // Reserve a slot first; undo the reservation if the limit was already hit.
    let previous = self.active_count.fetch_add(1, Ordering::AcqRel);
    if previous >= MAX_TRACKED_IPS {
        self.active_count.fetch_sub(1, Ordering::AcqRel);
        debug!("Rate limiter capacity race detected, rejecting IP: {}", ip);
        return None;
    }

    let created = Arc::new(RateLimiter::with_config(self.config.clone()));
    slot.insert(Arc::clone(&created));
    self.total_created.fetch_add(1, Ordering::Relaxed);
    debug!(
        "Created new rate limiter for IP: {} (total: {})",
        ip,
        previous + 1
    );
    Some(created)
}
/// Evicts the longest-idle limiters to bring the tracked-IP count down to
/// `CLEANUP_TARGET`.
///
/// Does nothing when another cleanup already holds `cleanup_in_progress` or
/// when the count is already at or below the target.
fn emergency_cleanup(&self) {
// Only one cleanup may run at a time; give up if the flag is already set.
if self
.cleanup_in_progress
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
return;
}
// Resets the flag when this function returns, on every exit path.
let _guard = CleanupGuard {
flag: &self.cleanup_in_progress,
};
let before = self.active_count.load(Ordering::Acquire);
if before <= CLEANUP_TARGET {
return; }
debug!("Starting emergency cleanup (current: {} IPs)", before);
let to_remove_count = before.saturating_sub(CLEANUP_TARGET);
let mut removed = 0;
// Test builds treat every limiter as idle; otherwise the threshold is the
// configured inactivity window divided by the emergency factor, with a floor.
let inactive_threshold = if cfg!(test) {
0 } else {
(self.inactive_duration_ms / EMERGENCY_CLEANUP_INACTIVE_FACTOR)
.max(EMERGENCY_CLEANUP_MIN_INACTIVE_MS)
};
let now = current_time_ms();
let mut candidates: Vec<(u64, IpAddr)> = Vec::with_capacity(to_remove_count.min(1000));
// First pass: collect every limiter that has been idle for at least the threshold.
for entry in self.limiters.iter() {
let last_access = entry.value().get_last_access_ms();
let idle_time = now.saturating_sub(last_access);
if idle_time >= inactive_threshold {
candidates.push((idle_time, *entry.key()));
}
}
// Test builds only: if there are still too few candidates, add the remaining entries.
if cfg!(test) && candidates.len() < to_remove_count {
for entry in self.limiters.iter() {
if candidates.iter().any(|(_, ip)| ip == entry.key()) {
continue;
}
let last_access = entry.value().get_last_access_ms();
let idle_time = now.saturating_sub(last_access);
candidates.push((idle_time, *entry.key()));
}
}
// Partition by descending idle time so the first `to_remove_count` candidates
// are the longest-idle ones (their relative order is unspecified).
if candidates.len() > to_remove_count {
candidates.select_nth_unstable_by(to_remove_count - 1, |a, b| b.0.cmp(&a.0));
}
for (_, ip) in candidates.iter().take(to_remove_count) {
if self.limiters.remove(ip).is_some() {
self.active_count.fetch_sub(1, Ordering::AcqRel);
removed += 1;
}
}
if removed > 0 {
self.total_cleaned.fetch_add(removed, Ordering::Relaxed);
debug!(
"Emergency cleanup removed {} limiters (target was {})",
removed, to_remove_count
);
}
// Re-synchronise the counter with the real map size.
// NOTE(review): a limiter being inserted concurrently (counter already
// incremented, entry not yet in the map) may be missed by this store — TODO confirm.
let actual_len = self.limiters.len();
self.active_count.store(actual_len, Ordering::Release);
if actual_len > CLEANUP_TARGET && removed < to_remove_count as u64 {
debug!(
"Emergency cleanup incomplete: removed {}/{} entries, current count: {}",
removed, to_remove_count, actual_len
);
}
}
/// Tries to take one token from the limiter belonging to `ip`.
///
/// An untracked IP is handed to the slow path, which creates its limiter
/// first; the result is `false` when no limiter can be created.
#[inline(always)]
pub fn try_acquire(&self, ip: IpAddr) -> bool {
    let fast = self
        .limiters
        .get(&ip)
        .map(|entry| entry.value().try_acquire());
    match fast {
        Some(granted) => granted,
        None => self.try_acquire_slow(ip),
    }
}
/// Slow path of [`Self::try_acquire`]: obtains (or creates) the limiter for
/// `ip` and takes one token, or returns `false` when no limiter is available.
#[cold]
#[inline(never)]
fn try_acquire_slow(&self, ip: IpAddr) -> bool {
    self.get_limiter_slow(ip)
        .map_or(false, |limiter| limiter.try_acquire())
}
/// Tries to take `n` tokens at once from the limiter belonging to `ip`.
///
/// An untracked IP gets a limiter created first; the result is `false` when
/// no limiter can be created.
#[inline]
pub fn try_acquire_n(&self, ip: IpAddr, n: u64) -> bool {
    let fast = self
        .limiters
        .get(&ip)
        .map(|entry| entry.value().try_acquire_n(n));
    if let Some(granted) = fast {
        return granted;
    }
    self.get_limiter_slow(ip)
        .map_or(false, |limiter| limiter.try_acquire_n(n))
}
/// Removes every limiter that `RateLimiter::is_inactive` reports as inactive
/// for the current threshold, then re-synchronises the counter and possibly
/// shrinks the map.
///
/// The threshold is `inactive_duration_ms`, halved while more than
/// `CLEANUP_THRESHOLD` IPs are tracked. Returns immediately when another
/// cleanup is already running.
pub fn cleanup(&self) {
    let claimed = self.cleanup_in_progress.compare_exchange(
        false,
        true,
        Ordering::Acquire,
        Ordering::Relaxed,
    );
    if claimed.is_err() {
        return;
    }
    // Releases the flag again on every exit path.
    let _guard = CleanupGuard {
        flag: &self.cleanup_in_progress,
    };

    let tracked = self.active_count.load(Ordering::Acquire);
    let divisor = if tracked > CLEANUP_THRESHOLD { 2 } else { 1 };
    let threshold = self.inactive_duration_ms / divisor;

    let mut removed = 0u64;
    self.limiters.retain(|ip, limiter| {
        let keep = !limiter.is_inactive(threshold);
        if !keep {
            debug!("Removing inactive limiter for IP: {}", ip);
            removed += 1;
        }
        keep
    });

    if removed > 0 {
        self.total_cleaned.fetch_add(removed, Ordering::Relaxed);
        debug!("Cleanup removed {} inactive limiters", removed);
    }

    // The counter is overwritten with the real map size rather than decremented.
    let remaining = self.limiters.len();
    self.active_count.store(remaining, Ordering::Release);
    self.shrink_to_fit();
}
/// Shrinks the limiter map when its capacity is both larger than 1024 and
/// more than four times the number of stored limiters.
pub fn shrink_to_fit(&self) {
    let current_size = self.limiters.len();
    let capacity = self.limiters.capacity();
    if capacity <= current_size * 4 || capacity <= 1024 {
        return;
    }
    self.limiters.shrink_to_fit();
    debug!(
        "Shrunk limiter map capacity from {} to ~{}",
        capacity, current_size
    );
}
#[inline]
pub fn active_ips(&self) -> usize {
self.active_count.load(Ordering::Acquire)
}
/// Returns a snapshot of the manager's counters.
///
/// The tracked-IP counter is read once and reused, so `active_ips` and
/// `capacity_used` in the returned value always describe the same count.
pub fn stats(&self) -> ManagerStats {
    let active_ips = self.active_ips();
    ManagerStats {
        active_ips,
        total_created: self.total_created.load(Ordering::Relaxed),
        total_cleaned: self.total_cleaned.load(Ordering::Relaxed),
        capacity_used: active_ips as f64 / MAX_TRACKED_IPS as f64,
        max_capacity: MAX_TRACKED_IPS,
    }
}
#[deprecated(
since = "0.1.2",
note = "Use start_stoppable_cleanup_thread() instead for graceful shutdown support"
)]
pub fn start_cleanup_thread(self: Arc<Self>) -> thread::JoinHandle<()> {
let manager = self.clone();
thread::Builder::new()
.name("rater-cleanup".to_string())
.spawn(move || {
info!(
"Started cleanup thread (interval: {}ms, inactive threshold: {}ms)",
manager.cleanup_interval_ms, manager.inactive_duration_ms
);
loop {
thread::sleep(Duration::from_millis(manager.cleanup_interval_ms));
manager.cleanup();
let active = manager.active_ips();
if active > CLEANUP_THRESHOLD {
warn!(
"High IP usage: {} active limiters ({}% of capacity)",
active,
(active * 100) / MAX_TRACKED_IPS
);
}
}
})
.expect("Failed to spawn cleanup thread")
}
pub fn start_stoppable_cleanup_thread(
self: Arc<Self>,
) -> (thread::JoinHandle<()>, mpsc::Sender<()>) {
let (stop_tx, stop_rx) = mpsc::channel();
let manager = self.clone();
let handle = thread::Builder::new()
.name("rater-cleanup".to_string())
.spawn(move || {
info!(
"Started stoppable cleanup thread (interval: {}ms)",
manager.cleanup_interval_ms
);
loop {
match stop_rx.recv_timeout(Duration::from_millis(manager.cleanup_interval_ms)) {
Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {
info!("Cleanup thread stopping");
break;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
manager.cleanup();
let active = manager.active_ips();
if active > CLEANUP_THRESHOLD {
warn!(
"High IP usage: {} active limiters ({}% of capacity)",
active,
(active * 100) / MAX_TRACKED_IPS
);
}
}
}
}
})
.expect("Failed to spawn cleanup thread");
(handle, stop_tx)
}
/// Drops every tracked limiter, resets the tracked-IP counter to zero and adds
/// the number of entries seen before clearing to the lifetime cleaned total.
pub fn clear(&self) {
    let dropped = self.limiters.len();
    self.limiters.clear();
    self.active_count.store(0, Ordering::Release);
    let dropped_total = dropped as u64;
    self.total_cleaned.fetch_add(dropped_total, Ordering::Relaxed);
    info!("Cleared all {} rate limiters", dropped);
}
}
/// Debug output shows the tracked-IP count and the two timing settings; the
/// limiter map itself is not printed.
impl std::fmt::Debug for IpRateLimiterManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("IpRateLimiterManager");
        out.field("active_ips", &self.active_ips());
        out.field("cleanup_interval_ms", &self.cleanup_interval_ms);
        out.field("inactive_duration_ms", &self.inactive_duration_ms);
        out.finish()
    }
}
/// Scope guard that stores `false` into the borrowed flag when it is dropped.
struct CleanupGuard<'a> {
    /// Flag to reset on drop.
    flag: &'a AtomicBool,
}

impl Drop for CleanupGuard<'_> {
    fn drop(&mut self) {
        AtomicBool::store(self.flag, false, Ordering::Release);
    }
}
/// Point-in-time statistics of an IP rate-limiter manager.
///
/// All fields are public, so the values are not guaranteed to be mutually
/// consistent (for example `active_ips` may exceed `max_capacity`).
#[derive(Debug, Clone)]
pub struct ManagerStats {
    /// Number of IPs tracked when the snapshot was taken.
    pub active_ips: usize,
    /// Lifetime number of limiters created.
    pub total_created: u64,
    /// Lifetime number of limiters removed.
    pub total_cleaned: u64,
    /// `active_ips` as a fraction of `max_capacity` (1.0 means full).
    pub capacity_used: f64,
    /// Maximum number of IPs that can be tracked.
    pub max_capacity: usize,
}

impl ManagerStats {
    /// Renders the statistics as a multi-line, human-readable report.
    ///
    /// The "Available Slots" and "Net Active" figures saturate at zero rather
    /// than underflowing when the counters are inconsistent.
    pub fn summary(&self) -> String {
        // `active_ips` can be above `max_capacity`; a plain subtraction would
        // panic in debug builds and wrap in release builds.
        let available_slots = self.max_capacity.saturating_sub(self.active_ips);
        let net_active = self.total_created.saturating_sub(self.total_cleaned);
        format!(
            "IP Rate Limiter Manager Stats:\n\
             ├─ Capacity:\n\
             │ ├─ Active IPs: {}/{}\n\
             │ ├─ Capacity Used: {:.2}%\n\
             │ └─ Available Slots: {}\n\
             └─ Lifetime:\n\
             ├─ Total Created: {}\n\
             ├─ Total Cleaned: {}\n\
             └─ Net Active: {}",
            self.active_ips,
            self.max_capacity,
            self.capacity_used * 100.0,
            available_slots,
            self.total_created,
            self.total_cleaned,
            net_active
        )
    }

    /// Returns `true` when more than 80% of the capacity is in use.
    pub fn is_near_capacity(&self) -> bool {
        self.capacity_used > 0.8
    }

    /// Returns `total_cleaned / total_created`, or `0.0` when nothing has
    /// been created yet.
    pub fn cleanup_ratio(&self) -> f64 {
        if self.total_created == 0 {
            0.0
        } else {
            self.total_cleaned as f64 / self.total_created as f64
        }
    }
}

/// Displays the same text as [`ManagerStats::summary`].
impl std::fmt::Display for ManagerStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.summary())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::MemoryOrdering;
use std::net::{IpAddr, Ipv4Addr};
// Checks that two IPs are limited independently: each can acquire exactly
// `max_tokens` (5) times, is then refused, and both end up tracked.
#[test]
#[cfg_attr(miri, ignore)]
fn test_basic_ip_limiting() {
use super::super::config::MemoryOrdering;
// Long refill interval, presumably so that no refill happens during the test.
let config = RateLimiterConfig {
max_tokens: 5,
refill_rate: 1,
refill_interval_ms: 600_000,
ordering: MemoryOrdering::AcquireRelease,
};
let manager = IpRateLimiterManager::new(config);
let ip1: IpAddr = "192.168.1.1".parse().unwrap();
let ip2: IpAddr = "192.168.1.2".parse().unwrap();
for _ in 0..5 {
assert!(manager.try_acquire(ip1));
assert!(manager.try_acquire(ip2));
}
assert!(!manager.try_acquire(ip1));
assert!(!manager.try_acquire(ip2));
assert_eq!(manager.active_ips(), 2);
}
// Checks that `cleanup` removes limiters after they have been left untouched
// for longer than the 50 ms inactivity window.
#[test]
#[cfg_attr(miri, ignore)]
fn test_manager_cleanup() {
let config = RateLimiterConfig::default();
let manager = IpRateLimiterManager::with_cleanup_settings(
config, 1000, 50, );
for i in 0..10 {
let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, i));
manager.try_acquire(ip);
}
assert_eq!(manager.active_ips(), 10);
// Sleep for twice the inactivity window before cleaning up.
thread::sleep(Duration::from_millis(100));
manager.cleanup();
assert!(manager.active_ips() < 10);
}
// Checks the statistics snapshot after five distinct IPs have been seen.
#[test]
fn test_manager_stats() {
    let manager = IpRateLimiterManager::new(RateLimiterConfig::default());
    (0..5u8)
        .map(|last| IpAddr::V4(Ipv4Addr::new(10, 0, 0, last)))
        .for_each(|ip| {
            manager.try_acquire(ip);
        });

    let snapshot = manager.stats();
    assert_eq!(snapshot.active_ips, 5);
    assert_eq!(snapshot.total_created, 5);
    assert_eq!(snapshot.total_cleaned, 0);
    assert!(snapshot.capacity_used > 0.0);
    assert!(!snapshot.is_near_capacity());

    let text = snapshot.summary();
    assert!(text.contains("Active IPs: 5"));
}
// Checks that `clear` empties the manager and counts the dropped limiters as cleaned.
#[test]
fn test_clear() {
    let manager = IpRateLimiterManager::new(RateLimiterConfig::default());
    (0..10u8)
        .map(|last| IpAddr::V4(Ipv4Addr::new(172, 16, 0, last)))
        .for_each(|ip| {
            manager.try_acquire(ip);
        });
    assert_eq!(manager.active_ips(), 10);

    manager.clear();

    assert_eq!(manager.active_ips(), 0);
    let stats = manager.stats();
    assert_eq!(stats.total_cleaned, 10);
}
// Checks that ten threads, each hammering its own IP, all get some tokens
// and that exactly ten IPs end up tracked.
#[test]
#[cfg_attr(miri, ignore)]
fn test_concurrent_ip_access() {
use super::super::config::MemoryOrdering;
let config = RateLimiterConfig {
max_tokens: 100,
refill_rate: 10,
refill_interval_ms: 1000,
ordering: MemoryOrdering::AcquireRelease,
};
let manager = Arc::new(IpRateLimiterManager::new(config));
let mut handles = vec![];
for thread_id in 0..10 {
let manager_clone = manager.clone();
let handle = thread::spawn(move || {
// One distinct IP per thread.
let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, thread_id));
let mut acquired = 0;
for _ in 0..50 {
if manager_clone.try_acquire(ip) {
acquired += 1;
}
}
acquired
});
handles.push(handle);
}
let results: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
for acquired in results {
assert!(acquired > 0);
assert!(acquired <= 50);
}
assert_eq!(manager.active_ips(), 10);
}
// Checks that a new IP is refused once the tracked-IP counter is at the maximum.
#[test]
fn test_capacity_limit() {
    let manager = IpRateLimiterManager::new(RateLimiterConfig::default());
    // Pretend the manager is full without inserting 10 000 entries.
    manager
        .active_count
        .store(MAX_TRACKED_IPS, Ordering::Release);

    let new_ip: IpAddr = Ipv4Addr::new(1, 2, 3, 4).into();
    let limiter = manager.get_limiter(new_ip);
    assert!(limiter.is_none());

    manager.active_count.store(0, Ordering::Release);
}
// Checks that filling the manager up to `CLEANUP_THRESHOLD` and then adding
// one more IP triggers an emergency cleanup down to `CLEANUP_TARGET`.
#[test]
#[cfg_attr(miri, ignore)]
fn test_emergency_cleanup() {
let config = RateLimiterConfig::default();
let manager = IpRateLimiterManager::new(config);
for i in 0..CLEANUP_THRESHOLD {
// Spread the index over the four octets to get distinct addresses.
let ip = IpAddr::V4(Ipv4Addr::new(
(i / 16777216) as u8,
((i / 65536) % 256) as u8,
((i / 256) % 256) as u8,
(i % 256) as u8,
));
manager.get_limiter(ip);
}
let before_cleanup = manager.active_ips();
assert_eq!(before_cleanup, CLEANUP_THRESHOLD);
let trigger_ip = IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255));
manager.get_limiter(trigger_ip);
let after_cleanup = manager.active_ips();
// The `+ 1` allows for the trigger IP inserted after the cleanup.
assert!(
after_cleanup <= CLEANUP_TARGET + 1,
"Expected {} IPs after cleanup, but got {}",
CLEANUP_TARGET + 1,
after_cleanup
);
}
// Checks that `shrink_to_fit` can be called after a clear/refill cycle
// without losing the limiters that are still tracked.
#[test]
fn test_shrink_to_fit() {
    let manager = IpRateLimiterManager::new(RateLimiterConfig::default());
    (0..100u8)
        .map(|last| IpAddr::V4(Ipv4Addr::new(192, 168, 1, last)))
        .for_each(|ip| {
            manager.get_limiter(ip);
        });
    manager.clear();

    (0..5u8)
        .map(|last| IpAddr::V4(Ipv4Addr::new(10, 0, 0, last)))
        .for_each(|ip| {
            manager.get_limiter(ip);
        });
    manager.shrink_to_fit();

    assert_eq!(manager.active_ips(), 5);
}
// Checks that the deprecated, non-stoppable cleanup thread removes inactive
// limiters on its own (100 ms interval, 50 ms inactivity window).
#[test]
#[allow(deprecated)]
#[cfg_attr(miri, ignore)]
fn test_cleanup_thread() {
let manager = Arc::new(IpRateLimiterManager::with_cleanup_settings(
RateLimiterConfig::default(),
100, 50, ));
for i in 0..10 {
let ip = IpAddr::V4(Ipv4Addr::new(172, 16, 0, i));
manager.try_acquire(ip);
}
let manager_clone = manager.clone();
let handle = manager_clone.start_cleanup_thread();
// Long enough for several cleanup rounds.
std::thread::sleep(Duration::from_millis(500));
assert!(
manager.active_ips() < 10,
"Expected cleanup to remove some IPs, but active_ips is still {}",
manager.active_ips()
);
// Dropping the handle detaches the thread; it is never joined.
drop(handle);
}
// Checks that the stoppable cleanup thread exits after a stop message and
// that it never increases the number of tracked IPs.
#[test]
#[cfg_attr(miri, ignore)]
fn test_stoppable_cleanup_thread() {
let manager = Arc::new(IpRateLimiterManager::with_cleanup_settings(
RateLimiterConfig::default(),
100,
50,
));
for i in 0..5 {
let ip = IpAddr::V4(Ipv4Addr::new(10, 10, 10, i));
manager.try_acquire(ip);
}
let (handle, stop_tx) = manager.clone().start_stoppable_cleanup_thread();
std::thread::sleep(Duration::from_millis(500));
// Request shutdown and wait for the thread to finish.
stop_tx.send(()).unwrap();
handle.join().unwrap();
assert!(manager.active_ips() <= 5);
}
// Checks that ten threads requesting the same IP all get a limiter and that
// the IP is only counted once.
#[test]
#[cfg_attr(miri, ignore)]
fn test_get_limiter_race_condition() {
let manager = Arc::new(IpRateLimiterManager::new(RateLimiterConfig::default()));
let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
let mut handles = vec![];
for _ in 0..10 {
let manager_clone = manager.clone();
handles.push(thread::spawn(move || {
manager_clone.get_limiter(ip).is_some()
}));
}
let results: Vec<bool> = handles.into_iter().map(|h| h.join().unwrap()).collect();
assert!(results.iter().all(|&r| r));
assert_eq!(manager.active_ips(), 1);
}
// Checks that five threads adding new IPs to a manager filled up to
// `CLEANUP_THRESHOLD` leave at most `CLEANUP_TARGET + 5` IPs tracked.
#[test]
#[cfg_attr(miri, ignore)]
fn test_concurrent_emergency_cleanup() {
let manager = Arc::new(IpRateLimiterManager::new(RateLimiterConfig::default()));
for i in 0..CLEANUP_THRESHOLD {
// Spread the index over the four octets to get distinct addresses.
let ip = IpAddr::V4(Ipv4Addr::new(
(i / 16777216) as u8,
((i / 65536) % 256) as u8,
((i / 256) % 256) as u8,
(i % 256) as u8,
));
manager.get_limiter(ip);
}
let mut handles = vec![];
for i in 0..5 {
let manager_clone = manager.clone();
handles.push(thread::spawn(move || {
let ip = IpAddr::V4(Ipv4Addr::new(255, 255, 255, 250 + i));
manager_clone.get_limiter(ip)
}));
}
for handle in handles {
handle.join().unwrap();
}
// Up to five new IPs may have been added on top of the cleanup target.
assert!(manager.active_ips() <= CLEANUP_TARGET + 5);
}
// Checks that `cleanup` keeps limiters that were used again shortly before
// the cleanup (200 ms inactivity window).
#[test]
#[cfg_attr(miri, ignore)]
fn test_cleanup_with_active_limiters() {
let manager =
IpRateLimiterManager::with_cleanup_settings(RateLimiterConfig::default(), 1000, 200);
for i in 0..10 {
let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 2, i));
if i < 5 {
manager.try_acquire(ip);
} else {
manager.get_limiter(ip);
}
}
// Let all ten limiters exceed the inactivity window...
std::thread::sleep(Duration::from_millis(350));
// ...then touch the first five again so they count as recently used.
for i in 0..5 {
let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 2, i));
manager.try_acquire(ip);
}
manager.cleanup();
let active = manager.active_ips();
assert!(
active >= 5,
"Expected at least 5 active IPs after cleanup, got {}",
active
);
assert!(active <= 10);
}
// Checks the lifetime counters and derived values after creating and then
// clearing twenty limiters.
#[test]
fn test_manager_stats_calculations() {
    let manager = IpRateLimiterManager::new(RateLimiterConfig::default());
    (0..20u8)
        .map(|last| IpAddr::V4(Ipv4Addr::new(10, 20, 30, last)))
        .for_each(|ip| {
            manager.get_limiter(ip);
        });
    manager.clear();

    let snapshot = manager.stats();
    assert_eq!(snapshot.total_created, 20);
    assert_eq!(snapshot.total_cleaned, 20);
    assert!(snapshot.cleanup_ratio() > 0.0);
    assert!(!snapshot.is_near_capacity());
}
// Checks multi-token acquisition: 5 + 3 of 10 tokens succeed, a further
// request for 5 is refused.
#[test]
#[cfg_attr(miri, ignore)]
fn test_try_acquire_n() {
let manager = IpRateLimiterManager::new(RateLimiterConfig {
max_tokens: 10,
refill_rate: 1,
refill_interval_ms: 600_000,
ordering: MemoryOrdering::AcquireRelease,
});
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
assert!(manager.try_acquire_n(ip, 5));
assert!(manager.try_acquire_n(ip, 3));
assert!(!manager.try_acquire_n(ip, 5));
}
// Checks that `CleanupGuard` leaves the flag set while alive and clears it on drop.
#[test]
fn test_cleanup_guard_drop() {
    use super::CleanupGuard;

    let in_progress = AtomicBool::new(true);
    let guard = CleanupGuard { flag: &in_progress };
    assert!(in_progress.load(Ordering::Acquire));

    drop(guard);
    assert!(!in_progress.load(Ordering::Acquire));
}
// Checks that after `cleanup` the tracked-IP counter equals the real number
// of entries in the map.
#[test]
#[cfg_attr(miri, ignore)]
fn test_active_count_reconciliation_after_cleanup() {
let manager = IpRateLimiterManager::with_cleanup_settings(
RateLimiterConfig::default(),
1000,
50, );
for i in 0..20 {
let ip = IpAddr::V4(Ipv4Addr::new(10, 10, 0, i));
manager.get_limiter(ip);
}
assert_eq!(manager.active_ips(), 20);
// Sleep for twice the 50 ms inactivity window before cleaning up.
thread::sleep(Duration::from_millis(100));
manager.cleanup();
let active = manager.active_ips();
let actual_len = manager.limiters.len();
assert_eq!(active, actual_len);
}
// Checks that repeated `try_acquire` calls for a known IP never create a
// second tracked entry.
#[test]
fn test_fast_path_try_acquire_existing_ip() {
    let config = RateLimiterConfig {
        max_tokens: 100,
        refill_rate: 10,
        refill_interval_ms: 1000,
        ordering: MemoryOrdering::AcquireRelease,
    };
    let manager = IpRateLimiterManager::new(config);
    let client: IpAddr = "10.0.0.1".parse().unwrap();

    // First call goes through the slow path and creates the limiter.
    assert!(manager.try_acquire(client));
    assert_eq!(manager.active_ips(), 1);

    (0..50).for_each(|_| {
        manager.try_acquire(client);
    });
    assert_eq!(manager.active_ips(), 1);
}
// Checks that successive `try_acquire_n` calls for one IP succeed within the
// token budget and keep a single tracked entry.
#[test]
fn test_fast_path_try_acquire_n_existing_ip() {
    let config = RateLimiterConfig {
        max_tokens: 100,
        refill_rate: 10,
        refill_interval_ms: 1000,
        ordering: MemoryOrdering::AcquireRelease,
    };
    let manager = IpRateLimiterManager::new(config);
    let client: IpAddr = "10.0.0.1".parse().unwrap();

    for amount in [5, 10, 20] {
        assert!(manager.try_acquire_n(client, amount));
    }
    assert_eq!(manager.active_ips(), 1);
}
// Checks that IPv4 and IPv6 addresses are tracked and limited independently:
// each gets 10 tokens and is then refused.
#[test]
#[cfg_attr(miri, ignore)]
fn test_ipv6_support() {
let manager = IpRateLimiterManager::new(RateLimiterConfig {
max_tokens: 10,
refill_rate: 1,
refill_interval_ms: 600_000,
ordering: MemoryOrdering::AcquireRelease,
});
let ipv4: IpAddr = "192.168.1.1".parse().unwrap();
let ipv6: IpAddr = "::1".parse().unwrap();
let ipv6_full: IpAddr = "2001:db8::1".parse().unwrap();
for _ in 0..10 {
assert!(manager.try_acquire(ipv4));
assert!(manager.try_acquire(ipv6));
assert!(manager.try_acquire(ipv6_full));
}
assert!(!manager.try_acquire(ipv4));
assert!(!manager.try_acquire(ipv6));
assert!(!manager.try_acquire(ipv6_full));
assert_eq!(manager.active_ips(), 3);
}
// Runs acquiring threads and cleanup threads at the same time and checks
// that, once all have finished, the counter matches the map size.
#[test]
#[cfg_attr(miri, ignore)]
fn test_concurrent_cleanup_and_acquire() {
let manager = Arc::new(IpRateLimiterManager::with_cleanup_settings(
RateLimiterConfig {
max_tokens: 100,
refill_rate: 10,
refill_interval_ms: 1000,
ordering: MemoryOrdering::AcquireRelease,
},
50, 100, ));
// Pre-populate fifty limiters.
for i in 0..50u8 {
let ip = IpAddr::V4(Ipv4Addr::new(10, 10, 0, i));
manager.get_limiter(ip);
}
let mut handles = vec![];
// Five threads, each acquiring for twenty new IPs.
for t in 0..5 {
let m = manager.clone();
handles.push(thread::spawn(move || {
for i in 0..20 {
let ip = IpAddr::V4(Ipv4Addr::new(20, t, 0, i));
m.try_acquire(ip);
}
}));
}
// Three threads, each running five cleanups 10 ms apart.
for _ in 0..3 {
let m = manager.clone();
handles.push(thread::spawn(move || {
for _ in 0..5 {
m.cleanup();
thread::sleep(Duration::from_millis(10));
}
}));
}
for h in handles {
h.join().unwrap();
}
let active = manager.active_ips();
let actual = manager.limiters.len();
assert_eq!(active, actual);
}
// Checks that the `Debug` output names the type and both timing fields.
#[test]
fn test_debug_impl() {
    let manager = IpRateLimiterManager::with_cleanup_settings(
        RateLimiterConfig::default(),
        30_000,
        120_000,
    );
    let rendered = format!("{:?}", manager);
    for expected in [
        "IpRateLimiterManager",
        "cleanup_interval_ms",
        "inactive_duration_ms",
    ] {
        assert!(rendered.contains(expected));
    }
}
// Checks that the `Display` output of the statistics contains the report header
// and the active-IP line.
#[test]
fn test_manager_stats_display() {
    let manager = IpRateLimiterManager::new(RateLimiterConfig::default());
    let rendered = format!("{}", manager.stats());
    for expected in ["IP Rate Limiter Manager Stats", "Active IPs"] {
        assert!(rendered.contains(expected));
    }
}
// Checks that 95 cleaned out of 100 created gives a cleanup ratio of 0.95.
#[test]
fn test_cleanup_ratio() {
    let snapshot = ManagerStats {
        max_capacity: MAX_TRACKED_IPS,
        capacity_used: 0.05,
        active_ips: 5,
        total_created: 100,
        total_cleaned: 95,
    };
    let ratio = snapshot.cleanup_ratio();
    assert!((ratio - 0.95).abs() < 0.001);
}
// Checks that the cleanup ratio is 0.0 when nothing has been created.
#[test]
fn test_cleanup_ratio_zero_created() {
    let empty = ManagerStats {
        max_capacity: MAX_TRACKED_IPS,
        capacity_used: 0.0,
        active_ips: 0,
        total_created: 0,
        total_cleaned: 0,
    };
    let ratio = empty.cleanup_ratio();
    assert_eq!(ratio, 0.0);
}
// Checks that both acquire variants refuse an unknown IP while the counter
// reports a full manager.
#[test]
fn test_manager_try_acquire_at_capacity_returns_false() {
    let manager = IpRateLimiterManager::new(RateLimiterConfig::default());
    // Pretend the manager is full without inserting 10 000 entries.
    manager
        .active_count
        .store(MAX_TRACKED_IPS, Ordering::Release);

    let unknown: IpAddr = "1.2.3.4".parse().unwrap();
    let single = manager.try_acquire(unknown);
    assert!(!single);
    let multiple = manager.try_acquire_n(unknown, 5);
    assert!(!multiple);

    manager.active_count.store(0, Ordering::Release);
}
// Checks that two lookups of the same IP share state: a token taken through
// one handle is visible through the other.
#[test]
fn test_get_limiter_returns_same_limiter() {
    let config = RateLimiterConfig {
        max_tokens: 10,
        refill_rate: 1,
        refill_interval_ms: 1000,
        ordering: MemoryOrdering::AcquireRelease,
    };
    let manager = IpRateLimiterManager::new(config);
    let client: IpAddr = "10.0.0.1".parse().unwrap();

    let first = manager.get_limiter(client).unwrap();
    let second = manager.get_limiter(client).unwrap();

    assert!(first.try_acquire());
    assert_eq!(first.available_tokens(), second.available_tokens());
}
// Checks that an IP whose tokens are exhausted gets a fresh limiter after
// `clear` and can acquire again.
#[test]
#[cfg_attr(miri, ignore)]
fn test_clear_then_reuse() {
let manager = IpRateLimiterManager::new(RateLimiterConfig {
max_tokens: 5,
refill_rate: 1,
refill_interval_ms: 600_000,
ordering: MemoryOrdering::AcquireRelease,
});
let ip: IpAddr = "10.0.0.1".parse().unwrap();
// Use up all five tokens.
for _ in 0..5 {
manager.try_acquire(ip);
}
assert!(!manager.try_acquire(ip));
manager.clear();
assert_eq!(manager.active_ips(), 0);
assert!(manager.try_acquire(ip));
assert_eq!(manager.active_ips(), 1);
}
// Checks that dropping the stop sender alone is enough to make the cleanup
// thread exit, so that `join` returns.
#[test]
#[cfg_attr(miri, ignore)]
fn test_stoppable_thread_stops_on_sender_drop() {
let manager = Arc::new(IpRateLimiterManager::with_cleanup_settings(
RateLimiterConfig::default(),
50,
50,
));
let (handle, stop_tx) = manager.clone().start_stoppable_cleanup_thread();
drop(stop_tx);
handle.join().unwrap();
}
}