use parking_lot::Mutex;
use serde::Deserialize;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Semaphore;
use dashmap::DashMap;
/// One access record in the recency queue: the key that was touched and the
/// generation stamped on that particular access.
#[derive(Debug, Clone)]
struct LruEntry {
    key: String,
    generation: u64,
}

/// Least-recently-used bookkeeping based on lazy invalidation.
///
/// Every access appends a record to `queue` and stores the record's generation
/// in `generations`. A queued record is "live" only while its generation still
/// matches the one stored for its key; older records for the same key, and
/// records for removed keys, are stale and are skipped when encountered.
struct LruTracker {
    /// Access records, oldest first. May contain stale records.
    queue: VecDeque<LruEntry>,
    /// Generation of the most recent access for every tracked key.
    generations: HashMap<String, u64>,
    /// Generation that the next access will be stamped with.
    next_generation: u64,
}

impl LruTracker {
    /// Number of stale records tolerated on top of twice the live key count
    /// before the queue is compacted.
    const COMPACT_SLACK: usize = 16;

    /// Creates an empty tracker with room pre-allocated for `capacity` keys.
    fn new(capacity: usize) -> Self {
        Self {
            queue: VecDeque::with_capacity(capacity),
            generations: HashMap::with_capacity(capacity),
            next_generation: 0,
        }
    }

    /// Marks `key` as the most recently used entry.
    ///
    /// Returns `true` when the key was not tracked before this call.
    fn touch(&mut self, key: &str) -> bool {
        let generation = self.next_generation;
        self.next_generation = self.next_generation.wrapping_add(1);
        // `insert` reports the previous value, which tells us whether the key is new.
        let is_new = self
            .generations
            .insert(key.to_string(), generation)
            .is_none();
        self.queue.push_back(LruEntry {
            key: key.to_string(),
            generation,
        });
        self.compact_if_bloated();
        is_new
    }

    /// Drops stale records once they clearly outnumber the live ones.
    ///
    /// Without this, repeatedly touching the same key would grow `queue`
    /// without bound, because stale records are otherwise only discarded by
    /// `evict_oldest`. After a compaction the queue holds at most one record
    /// per live key, so the cost is amortised O(1) per `touch`.
    fn compact_if_bloated(&mut self) {
        let limit = self
            .generations
            .len()
            .saturating_mul(2)
            .saturating_add(Self::COMPACT_SLACK);
        if self.queue.len() > limit {
            let generations = &self.generations;
            self.queue
                .retain(|entry| generations.get(&entry.key) == Some(&entry.generation));
        }
    }

    /// Removes and returns the least recently used key, or `None` when no key
    /// is tracked. Stale records found on the way are discarded.
    fn evict_oldest(&mut self) -> Option<String> {
        while let Some(entry) = self.queue.pop_front() {
            // Only the record carrying the key's current generation is live.
            if self.generations.get(&entry.key) == Some(&entry.generation) {
                self.generations.remove(&entry.key);
                return Some(entry.key);
            }
        }
        None
    }

    /// Stops tracking `key`. Its queued records become stale and are dropped lazily.
    fn remove(&mut self, key: &str) {
        self.generations.remove(key);
    }

    /// Forgets every key and restarts the generation counter.
    fn clear(&mut self) {
        self.queue.clear();
        self.generations.clear();
        self.next_generation = 0;
    }
}
/// Tuning parameters for [`TarpitManager`]; can be deserialized with serde.
#[derive(Debug, Clone, Deserialize)]
pub struct TarpitConfig {
/// Delay, in milliseconds, charged at delay level 1.
pub base_delay_ms: u64,
/// Upper bound, in milliseconds, on any computed delay.
pub max_delay_ms: u64,
/// Factor by which the delay grows for each additional level.
pub progressive_multiplier: f64,
/// When `false`, the manager returns zero-delay decisions and tracks no state.
pub enabled: bool,
/// Number of tracked IPs at which the manager starts evicting the least
/// recently hit one.
pub max_states: usize,
/// Idle time, in milliseconds, after which a delay level starts to decay;
/// one level is dropped per full period of idleness.
pub decay_threshold_ms: u64,
/// Idle time, in milliseconds, after which the periodic sweep removes a
/// state entirely.
pub cleanup_threshold_ms: u64,
/// Number of delays that may be sleeping at the same time. Falls back to
/// `default_max_concurrent()` when the field is missing from the input.
#[serde(default = "default_max_concurrent")]
pub max_concurrent_tarpits: usize,
}
/// Fallback for `TarpitConfig::max_concurrent_tarpits`, used both by serde
/// (when the field is absent) and by `TarpitConfig::default`.
fn default_max_concurrent() -> usize {
    1_000
}
impl Default for TarpitConfig {
fn default() -> Self {
Self {
base_delay_ms: 1000, max_delay_ms: 30000, progressive_multiplier: 1.5, enabled: true,
max_states: 10_000,
decay_threshold_ms: 5 * 60 * 1000, cleanup_threshold_ms: 30 * 60 * 1000, max_concurrent_tarpits: default_max_concurrent(),
}
}
}
/// Per-IP bookkeeping kept by the tarpit.
#[derive(Debug, Clone)]
pub struct TarpitState {
    /// Address this state belongs to.
    pub ip: String,
    /// Current escalation level; a fresh state starts at 1.
    pub delay_level: u32,
    /// Number of hits recorded for this address.
    pub hit_count: u64,
    /// Timestamp of the most recent hit, in the caller's time unit.
    pub last_tarpit_at: u64,
    /// Sum of all delays charged to this address, in milliseconds.
    pub total_delay_ms: u64,
    /// Timestamp at which the state was created.
    pub first_tarpit_at: u64,
}

impl TarpitState {
    /// Creates a fresh level-1 state for `ip` with no recorded hits; both
    /// timestamps are initialised to `now`.
    pub fn new(ip: String, now: u64) -> Self {
        let (first_tarpit_at, last_tarpit_at) = (now, now);
        TarpitState {
            ip,
            delay_level: 1,
            hit_count: 0,
            last_tarpit_at,
            total_delay_ms: 0,
            first_tarpit_at,
        }
    }
}
/// Outcome of evaluating (or peeking at) the tarpit for one IP.
#[derive(Debug, Clone)]
pub struct TarpitDecision {
/// Delay, in milliseconds, associated with this decision; 0 when the tarpit
/// is disabled or when `apply_delay` skipped the sleep.
pub delay_ms: u64,
/// Delay level reported for the IP. `tarpit` reports the level after its
/// own escalation step; `peek_delay` reports the stored level.
pub level: u32,
/// Number of hits recorded for the IP so far.
pub hit_count: u64,
/// `true` when the reported level is above 1.
pub is_tarpitted: bool,
}
/// Point-in-time snapshot produced by `TarpitManager::stats`.
#[derive(Debug, Clone)]
pub struct TarpitStats {
/// Number of IP states currently stored.
pub total_states: usize,
/// Number of stored states whose delay level is above 1.
pub active_tarpits: usize,
/// Sum of `hit_count` over the stored states.
pub total_hits: u64,
/// Sum of `total_delay_ms` over the stored states.
pub total_delay_ms: u64,
/// Number of states created since the manager was built.
pub states_created: u64,
/// Number of states removed by eviction or by the cleanup sweep.
pub states_evicted: u64,
/// Number of delays skipped because no concurrency permit was available.
pub rejected_tarpits: u64,
/// Permits currently available on the delay semaphore.
pub available_slots: usize,
}
/// Tracks per-IP escalation state and hands out progressively longer delays.
pub struct TarpitManager {
/// Per-IP state, keyed by the IP string.
states: DashMap<String, TarpitState>,
/// Configuration the manager was built with.
config: TarpitConfig,
/// Recency bookkeeping used to pick an eviction victim.
lru: Mutex<LruTracker>,
/// Count of states created.
total_created: AtomicU64,
/// Count of states removed by eviction or cleanup.
total_evicted: AtomicU64,
/// Highest delay level a state can reach; derived from the config in `new`.
max_level: u32,
/// Limits how many `apply_delay` calls may sleep at the same time.
delay_semaphore: Arc<Semaphore>,
/// Count of delays skipped because the semaphore had no free permit.
rejected_tarpits: AtomicU64,
}
impl Default for TarpitManager {
fn default() -> Self {
Self::new(TarpitConfig::default())
}
}
impl TarpitManager {
pub fn new(config: TarpitConfig) -> Self {
let max_level = if config.progressive_multiplier > 1.0 && config.base_delay_ms > 0 {
let ratio = config.max_delay_ms as f64 / config.base_delay_ms as f64;
(ratio.ln() / config.progressive_multiplier.ln()).ceil() as u32 + 1
} else {
1
};
let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tarpits));
Self {
states: DashMap::with_capacity(config.max_states),
lru: Mutex::new(LruTracker::new(config.max_states)),
delay_semaphore: semaphore,
rejected_tarpits: AtomicU64::new(0),
config,
total_created: AtomicU64::new(0),
total_evicted: AtomicU64::new(0),
max_level,
}
}
pub fn config(&self) -> &TarpitConfig {
&self.config
}
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
pub fn len(&self) -> usize {
self.states.len()
}
pub fn is_empty(&self) -> bool {
self.states.is_empty()
}
pub fn peek_delay(&self, ip: &str) -> TarpitDecision {
if !self.config.enabled {
return TarpitDecision {
delay_ms: 0,
level: 0,
hit_count: 0,
is_tarpitted: false,
};
}
match self.states.get(ip) {
Some(entry) => {
let state = entry.value();
let delay_ms = self.calculate_delay_for_level(state.delay_level);
TarpitDecision {
delay_ms,
level: state.delay_level,
hit_count: state.hit_count,
is_tarpitted: state.delay_level > 1,
}
}
None => TarpitDecision {
delay_ms: self.config.base_delay_ms,
level: 1,
hit_count: 0,
is_tarpitted: false,
},
}
}
pub fn tarpit(&self, ip: &str) -> TarpitDecision {
if !self.config.enabled {
return TarpitDecision {
delay_ms: 0,
level: 0,
hit_count: 0,
is_tarpitted: false,
};
}
let now = now_ms();
self.maybe_evict();
self.lru.lock().touch(ip);
let mut entry = self.states.entry(ip.to_string()).or_insert_with(|| {
self.total_created.fetch_add(1, Ordering::Relaxed);
TarpitState::new(ip.to_string(), now)
});
let state = entry.value_mut();
self.apply_decay(state, now);
let delay_ms = self.calculate_delay_for_level(state.delay_level);
state.hit_count += 1;
state.last_tarpit_at = now;
state.total_delay_ms += delay_ms;
state.delay_level = (state.delay_level + 1).min(self.max_level);
TarpitDecision {
delay_ms,
level: state.delay_level,
hit_count: state.hit_count,
is_tarpitted: state.delay_level > 1,
}
}
pub async fn apply_delay(&self, ip: &str) -> TarpitDecision {
let decision = self.tarpit(ip);
if decision.delay_ms > 0 {
match self.delay_semaphore.try_acquire() {
Ok(permit) => {
tokio::time::sleep(tokio::time::Duration::from_millis(decision.delay_ms)).await;
drop(permit);
}
Err(_) => {
self.rejected_tarpits.fetch_add(1, Ordering::Relaxed);
return TarpitDecision {
delay_ms: 0,
level: decision.level,
hit_count: decision.hit_count,
is_tarpitted: decision.is_tarpitted,
};
}
}
}
decision
}
pub fn rejected_count(&self) -> u64 {
self.rejected_tarpits.load(Ordering::Relaxed)
}
pub fn available_slots(&self) -> usize {
self.delay_semaphore.available_permits()
}
pub fn is_tarpitted(&self, ip: &str) -> bool {
self.states
.get(ip)
.map(|entry| entry.value().delay_level > 1)
.unwrap_or(false)
}
pub fn get_state(&self, ip: &str) -> Option<TarpitState> {
self.states.get(ip).map(|entry| entry.value().clone())
}
pub fn reset(&self, ip: &str) -> bool {
self.states.remove(ip).is_some()
}
pub fn reset_all(&self) -> usize {
let count = self.states.len();
self.states.clear();
count
}
pub fn start_background_tasks(self: Arc<Self>) {
let manager = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
interval.tick().await;
manager.decay_all().await;
}
});
}
pub async fn decay_all(&self) {
let now = now_ms();
let decay_threshold = self.config.decay_threshold_ms;
let cleanup_threshold = self.config.cleanup_threshold_ms;
let mut actions = Vec::new();
for entry in self.states.iter() {
let state = entry.value();
let idle_time = now.saturating_sub(state.last_tarpit_at);
if idle_time > cleanup_threshold {
actions.push((entry.key().clone(), true));
} else if idle_time > decay_threshold && state.delay_level > 1 {
actions.push((entry.key().clone(), false));
}
}
const CHUNK_SIZE: usize = 100;
for (i, (ip, remove)) in actions.into_iter().enumerate() {
if remove {
if self.states.remove(&ip).is_some() {
self.total_evicted.fetch_add(1, Ordering::Relaxed);
}
} else {
if let Some(mut entry) = self.states.get_mut(&ip) {
let state = entry.value_mut();
let idle_time = now.saturating_sub(state.last_tarpit_at);
if idle_time > decay_threshold && state.delay_level > 1 {
let decay_periods = (idle_time / decay_threshold) as u32;
state.delay_level = state.delay_level.saturating_sub(decay_periods).max(1);
}
}
}
if (i + 1) % CHUNK_SIZE == 0 {
tokio::task::yield_now().await;
}
}
}
pub fn stats(&self) -> TarpitStats {
let mut total_hits = 0u64;
let mut total_delay_ms = 0u64;
let mut active_tarpits = 0usize;
for entry in self.states.iter() {
let state = entry.value();
total_hits += state.hit_count;
total_delay_ms += state.total_delay_ms;
if state.delay_level > 1 {
active_tarpits += 1;
}
}
TarpitStats {
total_states: self.states.len(),
active_tarpits,
total_hits,
total_delay_ms,
states_created: self.total_created.load(Ordering::Relaxed),
states_evicted: self.total_evicted.load(Ordering::Relaxed),
rejected_tarpits: self.rejected_tarpits.load(Ordering::Relaxed),
available_slots: self.delay_semaphore.available_permits(),
}
}
fn calculate_delay_for_level(&self, level: u32) -> u64 {
if level == 0 {
return 0;
}
let delay = self.config.base_delay_ms as f64
* self.config.progressive_multiplier.powi(level as i32 - 1);
(delay as u64).min(self.config.max_delay_ms)
}
fn apply_decay(&self, state: &mut TarpitState, now: u64) {
let idle_time = now.saturating_sub(state.last_tarpit_at);
let decay_threshold = self.config.decay_threshold_ms;
if idle_time > decay_threshold && state.delay_level > 1 {
let decay_periods = (idle_time / decay_threshold) as u32;
state.delay_level = state.delay_level.saturating_sub(decay_periods).max(1);
}
}
fn maybe_evict(&self) {
if self.states.len() < self.config.max_states {
return;
}
if let Some(ip) = self.lru.lock().evict_oldest() {
if self.states.remove(&ip).is_some() {
self.total_evicted.fetch_add(1, Ordering::Relaxed);
}
}
}
}
/// Current wall-clock time in milliseconds since the Unix epoch, or 0 when
/// the system clock reports a time before the epoch.
#[inline]
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Address used by the single-client tests.
    const CLIENT: &str = "192.168.1.1";

    /// A default manager starts enabled and without any state.
    #[test]
    fn test_tarpit_creation() {
        let mgr = TarpitManager::default();
        assert!(mgr.is_enabled());
        assert!(mgr.is_empty());
    }

    /// Delays grow by the default 1.5x multiplier and never exceed the cap.
    #[test]
    fn test_calculate_delay_for_level() {
        let mgr = TarpitManager::default();
        let expected = [(1u32, 1000u64), (2, 1500), (3, 2250), (4, 3375)];
        for &(level, delay_ms) in expected.iter() {
            assert_eq!(mgr.calculate_delay_for_level(level), delay_ms);
        }
        assert!(mgr.calculate_delay_for_level(20) <= 30000);
    }

    /// Each hit is charged at the current level and escalates to the next one.
    #[test]
    fn test_tarpit_progression() {
        let mgr = TarpitManager::default();

        let first = mgr.tarpit(CLIENT);
        assert_eq!(first.delay_ms, 1000);
        assert_eq!(first.level, 2);
        assert_eq!(first.hit_count, 1);
        assert!(first.is_tarpitted);

        let second = mgr.tarpit(CLIENT);
        assert_eq!(second.delay_ms, 1500);
        assert_eq!(second.level, 3);
        assert_eq!(second.hit_count, 2);

        let third = mgr.tarpit(CLIENT);
        assert_eq!(third.delay_ms, 2250);
        assert_eq!(third.level, 4);
        assert_eq!(third.hit_count, 3);
    }

    /// Peeking reports the stored state without changing it.
    #[test]
    fn test_peek_delay() {
        let mgr = TarpitManager::default();

        let unknown = mgr.peek_delay(CLIENT);
        assert_eq!(unknown.delay_ms, 1000);
        assert_eq!(unknown.level, 1);
        assert_eq!(unknown.hit_count, 0);
        assert!(!unknown.is_tarpitted);

        mgr.tarpit(CLIENT);
        // Two consecutive peeks must observe the same state.
        for _ in 0..2 {
            let peeked = mgr.peek_delay(CLIENT);
            assert_eq!(peeked.level, 2);
            assert_eq!(peeked.hit_count, 1);
        }
    }

    /// The delay stays within `max_delay_ms` no matter how many hits occur.
    #[test]
    fn test_max_delay_cap() {
        let mgr = TarpitManager::new(TarpitConfig {
            base_delay_ms: 1000,
            max_delay_ms: 5000,
            progressive_multiplier: 2.0,
            ..Default::default()
        });
        (0..20).for_each(|_| {
            mgr.tarpit(CLIENT);
        });
        let peeked = mgr.peek_delay(CLIENT);
        assert!(peeked.delay_ms <= 5000);
    }

    /// Resetting removes a single address and reports whether it existed.
    #[test]
    fn test_reset() {
        let mgr = TarpitManager::default();
        for _ in 0..2 {
            mgr.tarpit(CLIENT);
        }
        assert!(mgr.is_tarpitted(CLIENT));

        let was_present = mgr.reset(CLIENT);
        assert!(was_present);
        assert!(!mgr.is_tarpitted(CLIENT));
        assert!(!mgr.reset("192.168.1.2"));
    }

    /// Resetting everything reports the previous size and empties the map.
    #[test]
    fn test_reset_all() {
        let mgr = TarpitManager::default();
        for ip in ["192.168.1.1", "192.168.1.2", "192.168.1.3"].iter() {
            mgr.tarpit(ip);
        }
        assert_eq!(mgr.len(), 3);

        let cleared = mgr.reset_all();
        assert_eq!(cleared, 3);
        assert!(mgr.is_empty());
    }

    /// Statistics aggregate over all stored states.
    #[test]
    fn test_stats() {
        let mgr = TarpitManager::default();
        mgr.tarpit(CLIENT);
        mgr.tarpit(CLIENT);
        mgr.tarpit("192.168.1.2");

        let snapshot = mgr.stats();
        assert_eq!(snapshot.total_states, 2);
        assert_eq!(snapshot.total_hits, 3);
        assert!(snapshot.total_delay_ms > 0);
        assert_eq!(snapshot.active_tarpits, 2);
    }

    /// A disabled manager hands out a zero decision.
    #[test]
    fn test_disabled() {
        let mgr = TarpitManager::new(TarpitConfig {
            enabled: false,
            ..Default::default()
        });
        let outcome = mgr.tarpit(CLIENT);
        assert_eq!(outcome.delay_ms, 0);
        assert_eq!(outcome.level, 0);
        assert!(!outcome.is_tarpitted);
    }

    /// At capacity, a new address pushes out the least recently hit one.
    #[test]
    fn test_lru_eviction() {
        let pause = std::time::Duration::from_millis(2);
        let mgr = TarpitManager::new(TarpitConfig {
            max_states: 3,
            ..Default::default()
        });

        mgr.tarpit("1.1.1.1");
        std::thread::sleep(pause);
        mgr.tarpit("2.2.2.2");
        std::thread::sleep(pause);
        mgr.tarpit("3.3.3.3");
        assert_eq!(mgr.len(), 3);

        mgr.tarpit("4.4.4.4");
        assert_eq!(mgr.len(), 3);
        assert!(
            mgr.get_state("1.1.1.1").is_none(),
            "1.1.1.1 should have been evicted as oldest"
        );
        assert!(
            mgr.get_state("4.4.4.4").is_some(),
            "4.4.4.4 should exist"
        );
    }

    /// The derived level cap is sane and is never exceeded by repeated hits.
    #[test]
    fn test_max_level_calculation() {
        let mgr = TarpitManager::default();
        assert!((8..=12).contains(&mgr.max_level));

        for _ in 0..50 {
            mgr.tarpit(CLIENT);
        }
        let stored = mgr.get_state(CLIENT).unwrap();
        assert!(stored.delay_level <= mgr.max_level);
    }

    /// `apply_delay` really waits for (roughly) the reported delay.
    #[tokio::test]
    async fn test_apply_delay() {
        let mgr = TarpitManager::new(TarpitConfig {
            base_delay_ms: 10,
            max_delay_ms: 100,
            ..Default::default()
        });
        let started = std::time::Instant::now();
        let outcome = mgr.apply_delay(CLIENT).await;
        let waited = started.elapsed();
        assert!(waited.as_millis() >= outcome.delay_ms as u128 - 5);
    }
}