1use parking_lot::Mutex;
6use serde::Deserialize;
7use std::collections::{HashMap, VecDeque};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11use tokio::sync::Semaphore;
12
13use dashmap::DashMap;
14
/// One recorded access in the LRU queue.
///
/// An entry is *live* only while `generation` equals the generation currently
/// stored for `key` in `LruTracker::generations`; any other entry is a stale
/// leftover from an earlier touch (or from a removed key) and is skipped.
#[derive(Debug, Clone)]
struct LruEntry {
    /// Key that was touched.
    key: String,
    /// Value of the tracker's generation counter at the time of the touch.
    generation: u64,
}

/// Least-recently-used tracker with lazy deletion.
///
/// Every touch appends to `queue` and records the newest generation for the
/// key in `generations`. Older queue entries for the same key are not removed
/// eagerly; they are recognised as stale by their outdated generation and
/// dropped either during eviction or by periodic compaction.
struct LruTracker {
    /// Access log, oldest first. May contain stale entries.
    queue: VecDeque<LruEntry>,
    /// Latest generation per tracked key; the source of truth for liveness.
    generations: HashMap<String, u64>,
    /// Generation handed out to the next touch.
    next_generation: u64,
}

impl LruTracker {
    /// Stale entries tolerated on top of `2 * live keys` before the queue is
    /// compacted. Keeps tiny trackers from compacting on every touch.
    const COMPACTION_SLACK: usize = 32;

    /// Creates an empty tracker with room pre-allocated for `capacity` keys.
    fn new(capacity: usize) -> Self {
        Self {
            queue: VecDeque::with_capacity(capacity),
            generations: HashMap::with_capacity(capacity),
            next_generation: 0,
        }
    }

    /// Marks `key` as most recently used.
    ///
    /// Returns `true` when the key was not tracked before this call.
    fn touch(&mut self, key: &str) -> bool {
        let generation = self.next_generation;
        self.next_generation = self.next_generation.wrapping_add(1);

        // `insert` returns the previous generation, so `None` means "new key".
        let is_new = self
            .generations
            .insert(key.to_string(), generation)
            .is_none();
        self.queue.push_back(LruEntry {
            key: key.to_string(),
            generation,
        });

        // Without this, repeatedly touching the same few keys would grow the
        // queue forever because stale entries are otherwise only discarded
        // while evicting.
        self.compact_if_bloated();

        is_new
    }

    /// Removes and returns the least recently used key, or `None` when no
    /// live key is tracked. Stale queue entries met on the way are discarded.
    fn evict_oldest(&mut self) -> Option<String> {
        while let Some(entry) = self.queue.pop_front() {
            if let Some(&current_gen) = self.generations.get(&entry.key) {
                if current_gen == entry.generation {
                    self.generations.remove(&entry.key);
                    return Some(entry.key);
                }
            }
        }
        None
    }

    /// Stops tracking `key`. Its queue entries become stale and are dropped
    /// lazily.
    fn remove(&mut self, key: &str) {
        self.generations.remove(key);
        self.compact_if_bloated();
    }

    /// Forgets every key and resets the generation counter.
    fn clear(&mut self) {
        self.queue.clear();
        self.generations.clear();
        self.next_generation = 0;
    }

    /// Drops stale queue entries once they clearly outnumber the live ones.
    ///
    /// The pass is O(queue length) but only runs after at least
    /// `live + COMPACTION_SLACK` stale entries have accumulated, so the cost
    /// per touch stays amortised O(1) and the queue length stays bounded by
    /// `2 * live + COMPACTION_SLACK`.
    fn compact_if_bloated(&mut self) {
        let limit = self
            .generations
            .len()
            .saturating_mul(2)
            .saturating_add(Self::COMPACTION_SLACK);
        if self.queue.len() <= limit {
            return;
        }

        let generations = &self.generations;
        self.queue
            .retain(|entry| generations.get(&entry.key) == Some(&entry.generation));
    }
}
85
86#[derive(Debug, Clone, Deserialize)]
88pub struct TarpitConfig {
89 pub base_delay_ms: u64,
91 pub max_delay_ms: u64,
93 pub progressive_multiplier: f64,
95 pub enabled: bool,
97 pub max_states: usize,
99 pub decay_threshold_ms: u64,
101 pub cleanup_threshold_ms: u64,
103 #[serde(default = "default_max_concurrent")]
106 pub max_concurrent_tarpits: usize,
107}
108
109fn default_max_concurrent() -> usize {
110 1000
111}
112
113impl Default for TarpitConfig {
114 fn default() -> Self {
115 Self {
116 base_delay_ms: 1000, max_delay_ms: 30000, progressive_multiplier: 1.5, enabled: true,
120 max_states: 10_000,
121 decay_threshold_ms: 5 * 60 * 1000, cleanup_threshold_ms: 30 * 60 * 1000, max_concurrent_tarpits: default_max_concurrent(),
124 }
125 }
126}
127
/// Tarpit bookkeeping for a single IP address.
#[derive(Debug, Clone)]
pub struct TarpitState {
    /// IP address this state belongs to.
    pub ip: String,
    /// Current delay level; starts at 1.
    pub delay_level: u32,
    /// Number of recorded tarpit hits.
    pub hit_count: u64,
    /// Timestamp (ms) of the most recent hit.
    pub last_tarpit_at: u64,
    /// Sum of all delays (ms) computed for this IP.
    pub total_delay_ms: u64,
    /// Timestamp (ms) at which the state was created.
    pub first_tarpit_at: u64,
}

impl TarpitState {
    /// Builds a fresh state for `ip` at level 1 with zeroed counters; both
    /// timestamps are set to `now`.
    pub fn new(ip: String, now: u64) -> Self {
        let created_at = now;

        Self {
            ip,
            first_tarpit_at: created_at,
            last_tarpit_at: created_at,
            delay_level: 1,
            hit_count: 0,
            total_delay_ms: 0,
        }
    }
}
158
/// Outcome of a tarpit lookup or hit for one IP.
#[derive(Debug, Clone)]
pub struct TarpitDecision {
    /// Delay in milliseconds associated with this decision; 0 when the tarpit
    /// is disabled or the delay was skipped.
    pub delay_ms: u64,
    /// Delay level reported for the IP; 0 when the tarpit is disabled.
    pub level: u32,
    /// Number of hits recorded for the IP so far.
    pub hit_count: u64,
    /// Whether the reported level is above 1.
    pub is_tarpitted: bool,
}
171
/// Point-in-time snapshot of the manager's counters.
#[derive(Debug, Clone)]
pub struct TarpitStats {
    /// Number of IPs that currently have state.
    pub total_states: usize,
    /// Number of states whose delay level is above 1.
    pub active_tarpits: usize,
    /// Sum of `hit_count` over all current states.
    pub total_hits: u64,
    /// Sum of `total_delay_ms` over all current states.
    pub total_delay_ms: u64,
    /// Number of states created since the manager was built.
    pub states_created: u64,
    /// Number of states removed by eviction or cleanup.
    pub states_evicted: u64,
    /// Number of delays skipped because no semaphore permit was available.
    pub rejected_tarpits: u64,
    /// Semaphore permits currently available for delays.
    pub available_slots: usize,
}
192
193pub struct TarpitManager {
198 states: DashMap<String, TarpitState>,
200 config: TarpitConfig,
202 lru: Mutex<LruTracker>,
204 total_created: AtomicU64,
206 total_evicted: AtomicU64,
208 max_level: u32,
210 delay_semaphore: Arc<Semaphore>,
212 rejected_tarpits: AtomicU64,
214}
215
216impl Default for TarpitManager {
217 fn default() -> Self {
218 Self::new(TarpitConfig::default())
219 }
220}
221
222impl TarpitManager {
223 pub fn new(config: TarpitConfig) -> Self {
225 let max_level = if config.progressive_multiplier > 1.0 && config.base_delay_ms > 0 {
228 let ratio = config.max_delay_ms as f64 / config.base_delay_ms as f64;
229 (ratio.ln() / config.progressive_multiplier.ln()).ceil() as u32 + 1
230 } else {
231 1
232 };
233
234 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tarpits));
235
236 Self {
237 states: DashMap::with_capacity(config.max_states),
238 lru: Mutex::new(LruTracker::new(config.max_states)),
239 delay_semaphore: semaphore,
240 rejected_tarpits: AtomicU64::new(0),
241 config,
242 total_created: AtomicU64::new(0),
243 total_evicted: AtomicU64::new(0),
244 max_level,
245 }
246 }
247
248 pub fn config(&self) -> &TarpitConfig {
250 &self.config
251 }
252
253 pub fn is_enabled(&self) -> bool {
255 self.config.enabled
256 }
257
258 pub fn len(&self) -> usize {
260 self.states.len()
261 }
262
263 pub fn is_empty(&self) -> bool {
265 self.states.is_empty()
266 }
267
268 pub fn peek_delay(&self, ip: &str) -> TarpitDecision {
272 if !self.config.enabled {
273 return TarpitDecision {
274 delay_ms: 0,
275 level: 0,
276 hit_count: 0,
277 is_tarpitted: false,
278 };
279 }
280
281 match self.states.get(ip) {
282 Some(entry) => {
283 let state = entry.value();
284 let delay_ms = self.calculate_delay_for_level(state.delay_level);
285 TarpitDecision {
286 delay_ms,
287 level: state.delay_level,
288 hit_count: state.hit_count,
289 is_tarpitted: state.delay_level > 1,
290 }
291 }
292 None => TarpitDecision {
293 delay_ms: self.config.base_delay_ms,
294 level: 1,
295 hit_count: 0,
296 is_tarpitted: false,
297 },
298 }
299 }
300
301 pub fn tarpit(&self, ip: &str) -> TarpitDecision {
305 if !self.config.enabled {
306 return TarpitDecision {
307 delay_ms: 0,
308 level: 0,
309 hit_count: 0,
310 is_tarpitted: false,
311 };
312 }
313
314 let now = now_ms();
315
316 self.maybe_evict();
318
319 self.lru.lock().touch(ip);
321
322 let mut entry = self.states.entry(ip.to_string()).or_insert_with(|| {
324 self.total_created.fetch_add(1, Ordering::Relaxed);
325 TarpitState::new(ip.to_string(), now)
326 });
327
328 let state = entry.value_mut();
329
330 self.apply_decay(state, now);
332
333 let delay_ms = self.calculate_delay_for_level(state.delay_level);
335
336 state.hit_count += 1;
338 state.last_tarpit_at = now;
339 state.total_delay_ms += delay_ms;
340
341 state.delay_level = (state.delay_level + 1).min(self.max_level);
343
344 TarpitDecision {
345 delay_ms,
346 level: state.delay_level,
347 hit_count: state.hit_count,
348 is_tarpitted: state.delay_level > 1,
349 }
350 }
351
352 pub async fn apply_delay(&self, ip: &str) -> TarpitDecision {
360 let decision = self.tarpit(ip);
361
362 if decision.delay_ms > 0 {
363 match self.delay_semaphore.try_acquire() {
365 Ok(permit) => {
366 tokio::time::sleep(tokio::time::Duration::from_millis(decision.delay_ms)).await;
367 drop(permit);
369 }
370 Err(_) => {
371 self.rejected_tarpits.fetch_add(1, Ordering::Relaxed);
373 return TarpitDecision {
375 delay_ms: 0,
376 level: decision.level,
377 hit_count: decision.hit_count,
378 is_tarpitted: decision.is_tarpitted,
379 };
380 }
381 }
382 }
383
384 decision
385 }
386
387 pub fn rejected_count(&self) -> u64 {
389 self.rejected_tarpits.load(Ordering::Relaxed)
390 }
391
392 pub fn available_slots(&self) -> usize {
394 self.delay_semaphore.available_permits()
395 }
396
397 pub fn is_tarpitted(&self, ip: &str) -> bool {
399 self.states
400 .get(ip)
401 .map(|entry| entry.value().delay_level > 1)
402 .unwrap_or(false)
403 }
404
405 pub fn get_state(&self, ip: &str) -> Option<TarpitState> {
407 self.states.get(ip).map(|entry| entry.value().clone())
408 }
409
410 pub fn reset(&self, ip: &str) -> bool {
412 self.states.remove(ip).is_some()
413 }
414
415 pub fn reset_all(&self) -> usize {
417 let count = self.states.len();
418 self.states.clear();
419 count
420 }
421
422 pub fn start_background_tasks(self: Arc<Self>) {
426 let manager = self.clone();
427
428 tokio::spawn(async move {
429 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
431
432 loop {
433 interval.tick().await;
434 manager.decay_all().await;
435 }
436 });
437 }
438
439 pub async fn decay_all(&self) {
443 let now = now_ms();
444 let decay_threshold = self.config.decay_threshold_ms;
445 let cleanup_threshold = self.config.cleanup_threshold_ms;
446
447 let mut actions = Vec::new();
450
451 for entry in self.states.iter() {
453 let state = entry.value();
454 let idle_time = now.saturating_sub(state.last_tarpit_at);
455
456 if idle_time > cleanup_threshold {
457 actions.push((entry.key().clone(), true));
458 } else if idle_time > decay_threshold && state.delay_level > 1 {
459 actions.push((entry.key().clone(), false));
460 }
461 }
462
463 const CHUNK_SIZE: usize = 100;
465 for (i, (ip, remove)) in actions.into_iter().enumerate() {
466 if remove {
467 if self.states.remove(&ip).is_some() {
468 self.total_evicted.fetch_add(1, Ordering::Relaxed);
469 }
470 } else {
471 if let Some(mut entry) = self.states.get_mut(&ip) {
473 let state = entry.value_mut();
474 let idle_time = now.saturating_sub(state.last_tarpit_at);
475 if idle_time > decay_threshold && state.delay_level > 1 {
476 let decay_periods = (idle_time / decay_threshold) as u32;
477 state.delay_level = state.delay_level.saturating_sub(decay_periods).max(1);
478 }
479 }
480 }
481
482 if (i + 1) % CHUNK_SIZE == 0 {
484 tokio::task::yield_now().await;
485 }
486 }
487 }
488
489 pub fn stats(&self) -> TarpitStats {
491 let mut total_hits = 0u64;
492 let mut total_delay_ms = 0u64;
493 let mut active_tarpits = 0usize;
494
495 for entry in self.states.iter() {
496 let state = entry.value();
497 total_hits += state.hit_count;
498 total_delay_ms += state.total_delay_ms;
499 if state.delay_level > 1 {
500 active_tarpits += 1;
501 }
502 }
503
504 TarpitStats {
505 total_states: self.states.len(),
506 active_tarpits,
507 total_hits,
508 total_delay_ms,
509 states_created: self.total_created.load(Ordering::Relaxed),
510 states_evicted: self.total_evicted.load(Ordering::Relaxed),
511 rejected_tarpits: self.rejected_tarpits.load(Ordering::Relaxed),
512 available_slots: self.delay_semaphore.available_permits(),
513 }
514 }
515
516 fn calculate_delay_for_level(&self, level: u32) -> u64 {
520 if level == 0 {
521 return 0;
522 }
523
524 let delay = self.config.base_delay_ms as f64
526 * self.config.progressive_multiplier.powi(level as i32 - 1);
527
528 (delay as u64).min(self.config.max_delay_ms)
529 }
530
531 fn apply_decay(&self, state: &mut TarpitState, now: u64) {
533 let idle_time = now.saturating_sub(state.last_tarpit_at);
534 let decay_threshold = self.config.decay_threshold_ms;
535
536 if idle_time > decay_threshold && state.delay_level > 1 {
538 let decay_periods = (idle_time / decay_threshold) as u32;
539 state.delay_level = state.delay_level.saturating_sub(decay_periods).max(1);
540 }
541 }
542
543 fn maybe_evict(&self) {
545 if self.states.len() < self.config.max_states {
546 return;
547 }
548
549 if let Some(ip) = self.lru.lock().evict_oldest() {
551 if self.states.remove(&ip).is_some() {
552 self.total_evicted.fetch_add(1, Ordering::Relaxed);
553 }
554 }
555 }
556}
557
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or 0 when the system clock reports a time before the epoch.
#[inline]
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
566
// Unit tests for the tarpit manager. Most tests run against the default
// configuration (1000 ms base delay, 1.5x multiplier, 30000 ms cap).
#[cfg(test)]
mod tests {
    use super::*;

    // A default manager starts enabled and without any state.
    #[test]
    fn test_tarpit_creation() {
        let manager = TarpitManager::default();
        assert!(manager.is_enabled());
        assert!(manager.is_empty());
    }

    // Delay per level follows base * multiplier^(level - 1), capped at the maximum.
    #[test]
    fn test_calculate_delay_for_level() {
        let manager = TarpitManager::default();

        // Level 1: base delay.
        assert_eq!(manager.calculate_delay_for_level(1), 1000);

        // Level 2: 1000 * 1.5.
        assert_eq!(manager.calculate_delay_for_level(2), 1500);

        // Level 3: 1000 * 1.5^2.
        assert_eq!(manager.calculate_delay_for_level(3), 2250);

        // Level 4: 1000 * 1.5^3.
        assert_eq!(manager.calculate_delay_for_level(4), 3375);

        // High levels never exceed the configured maximum.
        assert!(manager.calculate_delay_for_level(20) <= 30000);
    }

    // Each hit reports the delay of the level on arrival and the level after escalation.
    #[test]
    fn test_tarpit_progression() {
        let manager = TarpitManager::default();

        // First hit: delay of level 1, level becomes 2.
        let d1 = manager.tarpit("192.168.1.1");
        assert_eq!(d1.delay_ms, 1000);
        assert_eq!(d1.level, 2);
        assert_eq!(d1.hit_count, 1);
        assert!(d1.is_tarpitted);

        // Second hit: delay of level 2, level becomes 3.
        let d2 = manager.tarpit("192.168.1.1");
        assert_eq!(d2.delay_ms, 1500);
        assert_eq!(d2.level, 3);
        assert_eq!(d2.hit_count, 2);

        // Third hit: delay of level 3, level becomes 4.
        let d3 = manager.tarpit("192.168.1.1");
        assert_eq!(d3.delay_ms, 2250);
        assert_eq!(d3.level, 4);
        assert_eq!(d3.hit_count, 3);
    }

    // Peeking reports the current state without recording a hit.
    #[test]
    fn test_peek_delay() {
        let manager = TarpitManager::default();

        // Unknown IP: level 1, base delay, not tarpitted.
        let d1 = manager.peek_delay("192.168.1.1");
        assert_eq!(d1.delay_ms, 1000);
        assert_eq!(d1.level, 1);
        assert_eq!(d1.hit_count, 0);
        assert!(!d1.is_tarpitted);

        // Record one real hit.
        manager.tarpit("192.168.1.1");

        // Peek now sees the escalated level.
        let d2 = manager.peek_delay("192.168.1.1");
        assert_eq!(d2.level, 2);
        assert_eq!(d2.hit_count, 1);

        // A second peek returns the same values: peeking does not mutate.
        let d3 = manager.peek_delay("192.168.1.1");
        assert_eq!(d3.level, 2);
        assert_eq!(d3.hit_count, 1);
    }

    // Repeated hits never push the delay past max_delay_ms.
    #[test]
    fn test_max_delay_cap() {
        let config = TarpitConfig {
            base_delay_ms: 1000,
            max_delay_ms: 5000,
            progressive_multiplier: 2.0,
            ..Default::default()
        };
        let manager = TarpitManager::new(config);

        // Far more hits than levels exist.
        for _ in 0..20 {
            manager.tarpit("192.168.1.1");
        }

        let decision = manager.peek_delay("192.168.1.1");
        assert!(decision.delay_ms <= 5000);
    }

    // Resetting an IP removes its state; resetting an unknown IP reports false.
    #[test]
    fn test_reset() {
        let manager = TarpitManager::default();

        manager.tarpit("192.168.1.1");
        manager.tarpit("192.168.1.1");
        assert!(manager.is_tarpitted("192.168.1.1"));

        let removed = manager.reset("192.168.1.1");
        assert!(removed);
        assert!(!manager.is_tarpitted("192.168.1.1"));

        // This IP never had state.
        assert!(!manager.reset("192.168.1.2"));
    }

    // reset_all drops every state and returns how many there were.
    #[test]
    fn test_reset_all() {
        let manager = TarpitManager::default();

        manager.tarpit("192.168.1.1");
        manager.tarpit("192.168.1.2");
        manager.tarpit("192.168.1.3");
        assert_eq!(manager.len(), 3);

        let count = manager.reset_all();
        assert_eq!(count, 3);
        assert!(manager.is_empty());
    }

    // Stats aggregate over all tracked IPs.
    #[test]
    fn test_stats() {
        let manager = TarpitManager::default();

        manager.tarpit("192.168.1.1");
        manager.tarpit("192.168.1.1");
        manager.tarpit("192.168.1.2");

        let stats = manager.stats();
        assert_eq!(stats.total_states, 2);
        assert_eq!(stats.total_hits, 3);
        assert!(stats.total_delay_ms > 0);
        // Both IPs are above level 1 after their first hit.
        assert_eq!(stats.active_tarpits, 2);
    }

    // A disabled manager hands out zero-delay, level-0 decisions.
    #[test]
    fn test_disabled() {
        let config = TarpitConfig {
            enabled: false,
            ..Default::default()
        };
        let manager = TarpitManager::new(config);

        let decision = manager.tarpit("192.168.1.1");
        assert_eq!(decision.delay_ms, 0);
        assert_eq!(decision.level, 0);
        assert!(!decision.is_tarpitted);
    }

    // With room for three states, a fourth IP evicts the least recently used one.
    #[test]
    fn test_lru_eviction() {
        let config = TarpitConfig {
            max_states: 3,
            ..Default::default()
        };
        let manager = TarpitManager::new(config);

        // The short sleeps give the three states distinct timestamps; the
        // eviction order itself comes from the LRU tracker's touch order.
        manager.tarpit("1.1.1.1");
        std::thread::sleep(std::time::Duration::from_millis(2));
        manager.tarpit("2.2.2.2");
        std::thread::sleep(std::time::Duration::from_millis(2));
        manager.tarpit("3.3.3.3");
        assert_eq!(manager.len(), 3);

        // The table is full, so this hit must evict one entry.
        manager.tarpit("4.4.4.4");
        assert_eq!(manager.len(), 3);

        assert!(
            // The first IP touched is the eviction victim.
            manager.get_state("1.1.1.1").is_none(),
            "1.1.1.1 should have been evicted as oldest"
        );
        assert!(
            manager.get_state("4.4.4.4").is_some(),
            "4.4.4.4 should exist"
        );
    }

    // The derived maximum level is sane and is never exceeded by a state.
    #[test]
    fn test_max_level_calculation() {
        let manager = TarpitManager::default();
        // ln(30000 / 1000) / ln(1.5) is about 8.4, so ceil + 1 gives 10.
        assert!(manager.max_level >= 8 && manager.max_level <= 12);

        // Far more hits than levels exist.
        for _ in 0..50 {
            manager.tarpit("192.168.1.1");
        }
        let state = manager.get_state("192.168.1.1").unwrap();
        assert!(state.delay_level <= manager.max_level);
    }

    // apply_delay actually sleeps for roughly the reported delay.
    #[tokio::test]
    async fn test_apply_delay() {
        let config = TarpitConfig {
            // Short delays keep the test fast.
            base_delay_ms: 10,
            max_delay_ms: 100,
            ..Default::default()
        };
        let manager = TarpitManager::new(config);

        let start = std::time::Instant::now();
        let decision = manager.apply_delay("192.168.1.1").await;
        let elapsed = start.elapsed();

        // Allow 5 ms of timer slack below the reported delay.
        assert!(elapsed.as_millis() >= decision.delay_ms as u128 - 5);
    }
}