1use std::cmp::Ordering as CmpOrdering;
6use std::collections::HashSet;
7use std::net::IpAddr;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use dashmap::DashMap;
13use parking_lot::RwLock as PLRwLock;
14use serde::{Deserialize, Serialize};
15use tokio::sync::Notify;
16
/// Tunable limits and switches for [`ActorManager`].
#[derive(Debug, Clone)]
pub struct ActorConfig {
    /// Actor count at which eviction of the least recently seen actors kicks in.
    pub max_actors: usize,

    /// Seconds between background risk-decay passes.
    pub decay_interval_secs: u64,

    /// Seconds between persistence passes.
    /// NOTE(review): never read in this file — presumably consumed by an
    /// external persistence layer; confirm.
    pub persist_interval_secs: u64,

    /// Correlation threshold.
    /// NOTE(review): never read in this file — confirm where it is applied.
    pub correlation_threshold: f64,

    /// Factor every positive risk score is multiplied by on each decay pass.
    pub risk_decay_factor: f64,

    /// Maximum number of rule matches kept per actor (oldest dropped first).
    pub max_rule_matches: usize,

    /// Maximum number of session ids kept per actor.
    pub max_session_ids: usize,

    /// Maximum number of distinct fingerprints stored on a single actor.
    pub max_fingerprints_per_actor: usize,

    /// Size at which the fingerprint -> actor index starts being trimmed.
    pub max_fingerprint_mappings: usize,

    /// Master switch; when `false` the manager stores and records nothing.
    pub enabled: bool,

    /// Upper bound applied to an actor's risk score when a rule match is recorded.
    pub max_risk: f64,
}

impl Default for ActorConfig {
    /// Returns the stock configuration (100k actors, 15-minute decay, risk capped at 100).
    fn default() -> Self {
        ActorConfig {
            enabled: true,
            max_actors: 100_000,
            max_risk: 100.0,
            risk_decay_factor: 0.9,
            correlation_threshold: 0.7,
            decay_interval_secs: 900,
            persist_interval_secs: 300,
            max_rule_matches: 100,
            max_session_ids: 50,
            max_fingerprints_per_actor: 20,
            max_fingerprint_mappings: 500_000,
        }
    }
}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct RuleMatch {
96 pub rule_id: String,
98
99 pub timestamp: u64,
101
102 pub risk_contribution: f64,
104
105 pub category: String,
107}
108
109impl RuleMatch {
110 pub fn new(rule_id: String, risk_contribution: f64, category: String) -> Self {
112 Self {
113 rule_id,
114 timestamp: now_ms(),
115 risk_contribution,
116 category,
117 }
118 }
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct ActorState {
128 pub actor_id: String,
130
131 pub risk_score: f64,
133
134 pub rule_matches: Vec<RuleMatch>,
136
137 pub anomaly_count: u64,
139
140 pub session_ids: Vec<String>,
142
143 pub first_seen: u64,
145
146 pub last_seen: u64,
148
149 #[serde(with = "ip_set_serde")]
151 pub ips: HashSet<IpAddr>,
152
153 pub fingerprints: HashSet<String>,
155
156 pub is_blocked: bool,
158
159 pub block_reason: Option<String>,
161
162 pub blocked_since: Option<u64>,
164}
165
166impl ActorState {
167 pub fn new(actor_id: String) -> Self {
169 let now = now_ms();
170 Self {
171 actor_id,
172 risk_score: 0.0,
173 rule_matches: Vec::new(),
174 anomaly_count: 0,
175 session_ids: Vec::new(),
176 first_seen: now,
177 last_seen: now,
178 ips: HashSet::new(),
179 fingerprints: HashSet::new(),
180 is_blocked: false,
181 block_reason: None,
182 blocked_since: None,
183 }
184 }
185
186 pub fn touch(&mut self) {
188 self.last_seen = now_ms();
189 }
190
191 pub fn add_ip(&mut self, ip: IpAddr) {
193 self.ips.insert(ip);
194 self.touch();
195 }
196
197 pub fn add_fingerprint(&mut self, fingerprint: String, max_fingerprints: usize) -> bool {
201 if fingerprint.is_empty() {
202 return false;
203 }
204
205 if self.fingerprints.contains(&fingerprint) {
207 self.touch();
208 return true;
209 }
210
211 if self.fingerprints.len() >= max_fingerprints {
213 self.touch();
215 return false;
216 }
217
218 self.fingerprints.insert(fingerprint);
219 self.touch();
220 true
221 }
222
223 pub fn add_rule_match(&mut self, rule_match: RuleMatch, max_matches: usize) {
225 self.rule_matches.push(rule_match);
226 self.touch();
227
228 if self.rule_matches.len() > max_matches {
230 let excess = self.rule_matches.len() - max_matches;
231 self.rule_matches.drain(0..excess);
232 }
233 }
234
235 pub fn get_rule_match_count(&self, rule_id: &str) -> usize {
237 self.rule_matches
238 .iter()
239 .filter(|m| m.rule_id == rule_id)
240 .count()
241 }
242}
243
244mod ip_set_serde {
246 use serde::{Deserialize, Deserializer, Serialize, Serializer};
247 use std::collections::HashSet;
248 use std::net::IpAddr;
249
250 pub fn serialize<S>(set: &HashSet<IpAddr>, serializer: S) -> Result<S::Ok, S::Error>
251 where
252 S: Serializer,
253 {
254 let strings: Vec<String> = set.iter().map(|ip| ip.to_string()).collect();
255 strings.serialize(serializer)
256 }
257
258 pub fn deserialize<'de, D>(deserializer: D) -> Result<HashSet<IpAddr>, D::Error>
259 where
260 D: Deserializer<'de>,
261 {
262 let strings: Vec<String> = Vec::deserialize(deserializer)?;
263 let mut set = HashSet::new();
264 for s in strings {
265 if let Ok(ip) = s.parse() {
266 set.insert(ip);
267 }
268 }
269 Ok(set)
270 }
271}
272
273#[derive(Debug, Default)]
279pub struct ActorStats {
280 pub total_actors: AtomicU64,
282
283 pub blocked_actors: AtomicU64,
285
286 pub correlations_made: AtomicU64,
288
289 pub evictions: AtomicU64,
291
292 pub total_created: AtomicU64,
294
295 pub total_rule_matches: AtomicU64,
297
298 pub fingerprint_evictions: AtomicU64,
300}
301
302impl ActorStats {
303 pub fn new() -> Self {
305 Self::default()
306 }
307
308 pub fn snapshot(&self) -> ActorStatsSnapshot {
310 ActorStatsSnapshot {
311 total_actors: self.total_actors.load(Ordering::Relaxed),
312 blocked_actors: self.blocked_actors.load(Ordering::Relaxed),
313 correlations_made: self.correlations_made.load(Ordering::Relaxed),
314 evictions: self.evictions.load(Ordering::Relaxed),
315 total_created: self.total_created.load(Ordering::Relaxed),
316 total_rule_matches: self.total_rule_matches.load(Ordering::Relaxed),
317 fingerprint_evictions: self.fingerprint_evictions.load(Ordering::Relaxed),
318 }
319 }
320}
321
322#[derive(Debug, Clone, Serialize)]
324pub struct ActorStatsSnapshot {
325 pub total_actors: u64,
326 pub blocked_actors: u64,
327 pub correlations_made: u64,
328 pub evictions: u64,
329 pub total_created: u64,
330 pub total_rule_matches: u64,
331 pub fingerprint_evictions: u64,
332}
333
/// Cached result of `ActorManager::get_fingerprint_groups`.
///
/// `None` until the first computation; afterwards the instant the groups were
/// built together with the full list of
/// `(fingerprint, actor ids, highest risk score)` tuples.
type FingerprintClusterCache = Option<(Instant, Vec<(String, Vec<String>, f64)>)>;
342
343#[derive(Debug)]
345pub struct ActorManager {
346 actors: DashMap<String, ActorState>,
348
349 ip_to_actor: DashMap<IpAddr, String>,
351
352 fingerprint_to_actor: DashMap<String, String>,
354
355 config: ActorConfig,
357
358 stats: Arc<ActorStats>,
360
361 shutdown: Arc<Notify>,
363
364 touch_counter: AtomicU32,
366
367 fingerprint_groups_cache: PLRwLock<FingerprintClusterCache>,
369}
370
371impl ActorManager {
372 pub fn new(config: ActorConfig) -> Self {
374 Self {
375 actors: DashMap::with_capacity(config.max_actors),
376 ip_to_actor: DashMap::with_capacity(config.max_actors),
377 fingerprint_to_actor: DashMap::with_capacity(config.max_actors * 2),
378 config,
379 stats: Arc::new(ActorStats::new()),
380 shutdown: Arc::new(Notify::new()),
381 touch_counter: AtomicU32::new(0),
382 fingerprint_groups_cache: PLRwLock::new(None),
383 }
384 }
385
386 pub fn config(&self) -> &ActorConfig {
388 &self.config
389 }
390
391 pub fn is_enabled(&self) -> bool {
393 self.config.enabled
394 }
395
396 pub fn len(&self) -> usize {
398 self.actors.len()
399 }
400
401 pub fn is_empty(&self) -> bool {
403 self.actors.is_empty()
404 }
405
406 pub fn get_or_create_actor(&self, ip: IpAddr, fingerprint: Option<&str>) -> String {
417 if !self.config.enabled {
418 return generate_actor_id();
419 }
420
421 self.maybe_evict();
423
424 if let Some(actor_id) = self.correlate_actor(ip, fingerprint) {
426 if let Some(mut entry) = self.actors.get_mut(&actor_id) {
428 entry.add_ip(ip);
429 if let Some(fp) = fingerprint {
430 if !fp.is_empty() {
431 if entry
433 .add_fingerprint(fp.to_string(), self.config.max_fingerprints_per_actor)
434 {
435 self.maybe_evict_fingerprint_mappings();
437 self.fingerprint_to_actor
438 .insert(fp.to_string(), actor_id.clone());
439 }
440 }
441 }
442 self.ip_to_actor.insert(ip, actor_id.clone());
444 }
445 return actor_id;
446 }
447
448 let actor_id = generate_actor_id();
450 let mut actor = ActorState::new(actor_id.clone());
451 actor.add_ip(ip);
452
453 if let Some(fp) = fingerprint {
454 if !fp.is_empty() {
455 if actor.add_fingerprint(fp.to_string(), self.config.max_fingerprints_per_actor) {
457 self.maybe_evict_fingerprint_mappings();
459 self.fingerprint_to_actor
460 .insert(fp.to_string(), actor_id.clone());
461 }
462 }
463 }
464
465 self.ip_to_actor.insert(ip, actor_id.clone());
467 self.actors.insert(actor_id.clone(), actor);
468
469 self.stats.total_actors.fetch_add(1, Ordering::Relaxed);
471 self.stats.total_created.fetch_add(1, Ordering::Relaxed);
472
473 actor_id
474 }
475
476 pub fn record_rule_match(
484 &self,
485 actor_id: &str,
486 rule_id: &str,
487 risk_contribution: f64,
488 category: &str,
489 ) {
490 if !self.config.enabled {
491 return;
492 }
493
494 if let Some(mut entry) = self.actors.get_mut(actor_id) {
495 let rule_match =
496 RuleMatch::new(rule_id.to_string(), risk_contribution, category.to_string());
497
498 entry.risk_score = (entry.risk_score + risk_contribution).min(self.config.max_risk);
500
501 entry.add_rule_match(rule_match, self.config.max_rule_matches);
503
504 self.stats
506 .total_rule_matches
507 .fetch_add(1, Ordering::Relaxed);
508 }
509 }
510
511 pub fn touch_actor(&self, actor_id: &str) {
513 if !self.config.enabled {
514 return;
515 }
516
517 if let Some(mut entry) = self.actors.get_mut(actor_id) {
518 entry.touch();
519 }
520 }
521
522 pub fn get_actor(&self, actor_id: &str) -> Option<ActorState> {
524 self.actors.get(actor_id).map(|entry| entry.value().clone())
525 }
526
527 pub fn get_actor_by_ip(&self, ip: IpAddr) -> Option<ActorState> {
529 self.ip_to_actor
530 .get(&ip)
531 .and_then(|actor_id| self.actors.get(actor_id.value()).map(|e| e.value().clone()))
532 }
533
534 pub fn get_actor_by_fingerprint(&self, fingerprint: &str) -> Option<ActorState> {
536 self.fingerprint_to_actor
537 .get(fingerprint)
538 .and_then(|actor_id| self.actors.get(actor_id.value()).map(|e| e.value().clone()))
539 }
540
541 pub fn block_actor(&self, actor_id: &str, reason: &str) -> bool {
546 if let Some(mut entry) = self.actors.get_mut(actor_id) {
547 if !entry.is_blocked {
548 entry.is_blocked = true;
549 entry.block_reason = Some(reason.to_string());
550 entry.blocked_since = Some(now_ms());
551 self.stats.blocked_actors.fetch_add(1, Ordering::Relaxed);
552 }
553 true
554 } else {
555 false
556 }
557 }
558
559 pub fn unblock_actor(&self, actor_id: &str) -> bool {
564 if let Some(mut entry) = self.actors.get_mut(actor_id) {
565 if entry.is_blocked {
566 entry.is_blocked = false;
567 entry.block_reason = None;
568 entry.blocked_since = None;
569 self.stats.blocked_actors.fetch_sub(1, Ordering::Relaxed);
570 }
571 true
572 } else {
573 false
574 }
575 }
576
577 pub fn is_blocked(&self, actor_id: &str) -> bool {
579 self.actors
580 .get(actor_id)
581 .map(|entry| entry.is_blocked)
582 .unwrap_or(false)
583 }
584
585 pub fn bind_session(&self, actor_id: &str, session_id: &str) {
588 if let Some(mut entry) = self.actors.get_mut(actor_id) {
589 if !entry.session_ids.contains(&session_id.to_string()) {
590 if entry.session_ids.len() >= self.config.max_session_ids {
592 entry.session_ids.remove(0);
594 }
595 entry.session_ids.push(session_id.to_string());
596 entry.touch();
597 }
598 }
599 }
600
601 pub fn list_actors(&self, limit: usize, offset: usize) -> Vec<ActorState> {
610 let mut actors: Vec<ActorState> = self
611 .actors
612 .iter()
613 .map(|entry| entry.value().clone())
614 .collect();
615
616 actors.sort_by_key(|a| std::cmp::Reverse(a.last_seen));
618
619 actors.into_iter().skip(offset).take(limit).collect()
621 }
622
623 pub fn list_by_min_risk(&self, min_risk: f64, limit: usize, offset: usize) -> Vec<ActorState> {
627 let mut actors: Vec<ActorState> = self
628 .actors
629 .iter()
630 .filter(|entry| entry.value().risk_score >= min_risk)
631 .map(|entry| entry.value().clone())
632 .collect();
633
634 actors.sort_by(|a, b| {
635 b.risk_score
636 .partial_cmp(&a.risk_score)
637 .unwrap_or(CmpOrdering::Equal)
638 .then_with(|| b.last_seen.cmp(&a.last_seen))
639 });
640
641 actors.into_iter().skip(offset).take(limit).collect()
642 }
643
644 pub fn list_blocked_actors(&self) -> Vec<ActorState> {
646 self.actors
647 .iter()
648 .filter(|entry| entry.is_blocked)
649 .map(|entry| entry.value().clone())
650 .collect()
651 }
652
653 pub fn get_fingerprint_groups(&self, limit: usize) -> Vec<(String, Vec<String>, f64)> {
658 {
660 let cache = self.fingerprint_groups_cache.read();
661 if let Some((timestamp, data)) = &*cache {
662 if timestamp.elapsed() < Duration::from_secs(1) {
663 let mut result = data.clone();
664 result.truncate(limit);
665 return result;
666 }
667 }
668 }
669
670 use std::collections::HashMap;
671 let mut groups: HashMap<String, (Vec<String>, f64)> = HashMap::new();
672
673 for entry in self.actors.iter() {
674 let actor = entry.value();
675 for fp in &actor.fingerprints {
676 let group = groups
677 .entry(fp.clone())
678 .or_insert_with(|| (Vec::new(), 0.0));
679 group.0.push(actor.actor_id.clone());
680 group.1 = group.1.max(actor.risk_score);
681 }
682 }
683
684 let mut sorted_groups: Vec<_> = groups
685 .into_iter()
686 .map(|(fp, (actors, risk))| (fp, actors, risk))
687 .collect();
688
689 sorted_groups.sort_by_key(|a| std::cmp::Reverse(a.1.len()));
691
692 {
694 let mut cache = self.fingerprint_groups_cache.write();
695 *cache = Some((Instant::now(), sorted_groups.clone()));
696 }
697
698 sorted_groups.truncate(limit);
699 sorted_groups
700 }
701
702 pub fn start_background_tasks(self: Arc<Self>) {
708 let manager = self;
709 let decay_interval = Duration::from_secs(manager.config.decay_interval_secs);
710
711 tokio::spawn(async move {
712 let mut interval = tokio::time::interval(decay_interval);
713
714 loop {
715 tokio::select! {
716 _ = interval.tick() => {
717 if Arc::strong_count(&manager.shutdown) == 1 {
719 break;
721 }
722
723 manager.decay_scores();
725
726 manager.evict_if_needed();
728 }
729 _ = manager.shutdown.notified() => {
730 log::info!("Actor manager background tasks shutting down");
731 break;
732 }
733 }
734 }
735 });
736 }
737
738 pub fn shutdown(&self) {
740 self.shutdown.notify_one();
741 }
742
743 pub fn stats(&self) -> &ActorStats {
745 &self.stats
746 }
747
748 pub fn clear(&self) {
750 self.actors.clear();
751 self.ip_to_actor.clear();
752 self.fingerprint_to_actor.clear();
753 self.stats.total_actors.store(0, Ordering::Relaxed);
754 self.stats.blocked_actors.store(0, Ordering::Relaxed);
755 }
756
757 pub fn snapshot(&self) -> Vec<ActorState> {
761 self.actors.iter().map(|e| e.value().clone()).collect()
762 }
763
764 pub fn restore(&self, actors: Vec<ActorState>) {
768 self.clear();
770
771 let mut blocked_count: u64 = 0;
772
773 for actor in actors {
775 let actor_id = actor.actor_id.clone();
776
777 for ip in &actor.ips {
779 self.ip_to_actor.insert(*ip, actor_id.clone());
780 }
781
782 for fp in &actor.fingerprints {
784 self.fingerprint_to_actor
785 .insert(fp.clone(), actor_id.clone());
786 }
787
788 if actor.is_blocked {
790 blocked_count += 1;
791 }
792
793 self.actors.insert(actor_id, actor);
795 }
796
797 let actor_count = self.actors.len() as u64;
799 self.stats
800 .total_actors
801 .store(actor_count, Ordering::Relaxed);
802 self.stats
803 .blocked_actors
804 .store(blocked_count, Ordering::Relaxed);
805 self.stats
806 .total_created
807 .store(actor_count, Ordering::Relaxed);
808 }
809
810 fn decay_scores(&self) {
816 let decay_factor = self.config.risk_decay_factor;
817
818 for mut entry in self.actors.iter_mut() {
819 let actor = entry.value_mut();
820 if actor.risk_score > 0.0 {
821 actor.risk_score *= decay_factor;
822
823 if actor.risk_score < 0.01 {
825 actor.risk_score = 0.0;
826 }
827 }
828 }
829 }
830
831 fn evict_if_needed(&self) {
833 let current_len = self.actors.len();
834 if current_len <= self.config.max_actors {
835 return;
836 }
837
838 let evict_count = (self.config.max_actors / 100).max(1);
840 self.evict_oldest(evict_count);
841 }
842
843 fn maybe_evict(&self) {
847 let count = self.touch_counter.fetch_add(1, Ordering::Relaxed);
848 if !count.is_multiple_of(100) {
849 return;
850 }
851
852 if self.actors.len() < self.config.max_actors {
853 return;
854 }
855
856 let evict_count = (self.config.max_actors / 100).max(1);
858 self.evict_oldest(evict_count);
859 }
860
861 fn maybe_evict_fingerprint_mappings(&self) {
867 let current_len = self.fingerprint_to_actor.len();
868 if current_len < self.config.max_fingerprint_mappings {
869 return;
870 }
871
872 let target_len = (self.config.max_fingerprint_mappings * 9) / 10;
874 let to_evict = current_len.saturating_sub(target_len);
875
876 if to_evict == 0 {
877 return;
878 }
879
880 let keys_to_evict: Vec<String> = self
882 .fingerprint_to_actor
883 .iter()
884 .take(to_evict)
885 .map(|entry| entry.key().clone())
886 .collect();
887
888 for key in keys_to_evict {
890 self.fingerprint_to_actor.remove(&key);
891 }
892
893 self.stats
894 .fingerprint_evictions
895 .fetch_add(to_evict as u64, Ordering::Relaxed);
896 }
897
898 fn evict_oldest(&self, count: usize) {
902 let sample_size = (count * 10).min(1000).min(self.actors.len());
903
904 if sample_size == 0 {
905 return;
906 }
907
908 let mut candidates: Vec<(String, u64)> = Vec::with_capacity(sample_size);
910 for entry in self.actors.iter().take(sample_size) {
911 candidates.push((entry.key().clone(), entry.value().last_seen));
912 }
913
914 candidates.sort_unstable_by_key(|(_, ts)| *ts);
916
917 for (actor_id, _) in candidates.into_iter().take(count) {
919 self.remove_actor(&actor_id);
920 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
921 }
922 }
923
924 fn remove_actor(&self, actor_id: &str) {
926 if let Some((_, actor)) = self.actors.remove(actor_id) {
927 for ip in &actor.ips {
929 self.ip_to_actor.remove(ip);
930 }
931
932 for fp in &actor.fingerprints {
934 self.fingerprint_to_actor.remove(fp);
935 }
936
937 self.stats.total_actors.fetch_sub(1, Ordering::Relaxed);
939 if actor.is_blocked {
940 self.stats.blocked_actors.fetch_sub(1, Ordering::Relaxed);
941 }
942 }
943 }
944
945 fn correlate_actor(&self, ip: IpAddr, fingerprint: Option<&str>) -> Option<String> {
950 let ip_actor = self.ip_to_actor.get(&ip).map(|r| r.value().clone());
951
952 let fp_actor = fingerprint.and_then(|fp| {
953 if fp.is_empty() {
954 None
955 } else {
956 self.fingerprint_to_actor.get(fp).map(|r| r.value().clone())
957 }
958 });
959
960 match (ip_actor, fp_actor) {
961 (Some(ip_id), Some(fp_id)) => {
962 if ip_id == fp_id {
964 Some(ip_id)
965 } else {
966 self.stats.correlations_made.fetch_add(1, Ordering::Relaxed);
968 Some(fp_id)
969 }
970 }
971 (Some(id), None) => {
972 self.stats.correlations_made.fetch_add(1, Ordering::Relaxed);
973 Some(id)
974 }
975 (None, Some(id)) => {
976 self.stats.correlations_made.fetch_add(1, Ordering::Relaxed);
977 Some(id)
978 }
979 (None, None) => None,
980 }
981 }
982}
983
984impl Default for ActorManager {
985 fn default() -> Self {
986 Self::new(ActorConfig::default())
987 }
988}
989
990fn generate_actor_id() -> String {
996 let mut bytes = [0u8; 16];
998 getrandom::getrandom(&mut bytes).expect("Failed to get random bytes");
999
1000 bytes[6] = (bytes[6] & 0x0F) | 0x40; bytes[8] = (bytes[8] & 0x3F) | 0x80; format!(
1005 "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
1006 u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
1007 u16::from_be_bytes([bytes[4], bytes[5]]),
1008 u16::from_be_bytes([bytes[6], bytes[7]]),
1009 u16::from_be_bytes([bytes[8], bytes[9]]),
1010 u64::from_be_bytes([
1011 0, 0, bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15]
1012 ])
1013 )
1014}
1015
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
#[inline]
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1024
1025#[cfg(test)]
1030mod tests {
1031 use super::*;
1032 use std::thread;
1033
1034 fn create_test_manager() -> ActorManager {
1039 ActorManager::new(ActorConfig {
1040 max_actors: 1000,
1041 ..Default::default()
1042 })
1043 }
1044
1045 fn create_test_ip(last_octet: u8) -> IpAddr {
1046 format!("192.168.1.{}", last_octet).parse().unwrap()
1047 }
1048
1049 #[test]
1054 fn test_actor_creation() {
1055 let manager = create_test_manager();
1056 let ip = create_test_ip(1);
1057
1058 let actor_id = manager.get_or_create_actor(ip, None);
1059
1060 assert!(!actor_id.is_empty());
1061 assert_eq!(manager.len(), 1);
1062
1063 let actor = manager.get_actor(&actor_id).unwrap();
1064 assert_eq!(actor.actor_id, actor_id);
1065 assert!(actor.ips.contains(&ip));
1066 assert!(!actor.is_blocked);
1067 }
1068
1069 #[test]
1070 fn test_actor_retrieval_by_ip() {
1071 let manager = create_test_manager();
1072 let ip = create_test_ip(1);
1073
1074 let actor_id = manager.get_or_create_actor(ip, None);
1075 let retrieved = manager.get_actor_by_ip(ip).unwrap();
1076
1077 assert_eq!(retrieved.actor_id, actor_id);
1078 }
1079
1080 #[test]
1081 fn test_actor_retrieval_by_fingerprint() {
1082 let manager = create_test_manager();
1083 let ip = create_test_ip(1);
1084 let fingerprint = "t13d1516h2_abc123";
1085
1086 let actor_id = manager.get_or_create_actor(ip, Some(fingerprint));
1087 let retrieved = manager.get_actor_by_fingerprint(fingerprint).unwrap();
1088
1089 assert_eq!(retrieved.actor_id, actor_id);
1090 }
1091
1092 #[test]
1093 fn test_actor_nonexistent() {
1094 let manager = create_test_manager();
1095
1096 assert!(manager.get_actor("nonexistent").is_none());
1097 assert!(manager.get_actor_by_ip(create_test_ip(99)).is_none());
1098 assert!(manager.get_actor_by_fingerprint("nonexistent").is_none());
1099 }
1100
1101 #[test]
1106 fn test_ip_correlation() {
1107 let manager = create_test_manager();
1108 let ip = create_test_ip(1);
1109
1110 let actor_id1 = manager.get_or_create_actor(ip, None);
1112
1113 let actor_id2 = manager.get_or_create_actor(ip, None);
1115
1116 assert_eq!(actor_id1, actor_id2);
1117 assert_eq!(manager.len(), 1);
1118 }
1119
1120 #[test]
1121 fn test_fingerprint_correlation() {
1122 let manager = create_test_manager();
1123 let ip1 = create_test_ip(1);
1124 let ip2 = create_test_ip(2);
1125 let fingerprint = "t13d1516h2_shared";
1126
1127 let actor_id1 = manager.get_or_create_actor(ip1, Some(fingerprint));
1129
1130 let actor_id2 = manager.get_or_create_actor(ip2, Some(fingerprint));
1132
1133 assert_eq!(actor_id1, actor_id2);
1134 assert_eq!(manager.len(), 1);
1135
1136 let actor = manager.get_actor(&actor_id1).unwrap();
1138 assert!(actor.ips.contains(&ip1));
1139 assert!(actor.ips.contains(&ip2));
1140 }
1141
1142 #[test]
1143 fn test_fingerprint_preferred_over_ip() {
1144 let manager = create_test_manager();
1145 let ip1 = create_test_ip(1);
1146 let ip2 = create_test_ip(2);
1147 let fp1 = "fingerprint_1";
1148 let fp2 = "fingerprint_2";
1149
1150 let actor_id1 = manager.get_or_create_actor(ip1, Some(fp1));
1152
1153 let actor_id2 = manager.get_or_create_actor(ip2, Some(fp2));
1155
1156 assert_ne!(actor_id1, actor_id2);
1157
1158 let actor_id3 = manager.get_or_create_actor(ip1, Some(fp2));
1160
1161 assert_eq!(actor_id3, actor_id2);
1162 }
1163
1164 #[test]
1169 fn test_record_rule_match() {
1170 let manager = create_test_manager();
1171 let ip = create_test_ip(1);
1172
1173 let actor_id = manager.get_or_create_actor(ip, None);
1174 manager.record_rule_match(&actor_id, "sqli-001", 25.0, "sqli");
1175
1176 let actor = manager.get_actor(&actor_id).unwrap();
1177 assert_eq!(actor.rule_matches.len(), 1);
1178 assert_eq!(actor.rule_matches[0].rule_id, "sqli-001");
1179 assert_eq!(actor.rule_matches[0].risk_contribution, 25.0);
1180 assert_eq!(actor.rule_matches[0].category, "sqli");
1181 assert_eq!(actor.risk_score, 25.0);
1182 }
1183
1184 #[test]
1185 fn test_rule_match_risk_accumulation() {
1186 let manager = create_test_manager();
1187 let ip = create_test_ip(1);
1188
1189 let actor_id = manager.get_or_create_actor(ip, None);
1190 manager.record_rule_match(&actor_id, "sqli-001", 25.0, "sqli");
1191 manager.record_rule_match(&actor_id, "xss-001", 20.0, "xss");
1192 manager.record_rule_match(&actor_id, "sqli-002", 30.0, "sqli");
1193
1194 let actor = manager.get_actor(&actor_id).unwrap();
1195 assert_eq!(actor.rule_matches.len(), 3);
1196 assert_eq!(actor.risk_score, 75.0);
1197 }
1198
1199 #[test]
1200 fn test_rule_match_risk_capped() {
1201 let manager = create_test_manager();
1202 let ip = create_test_ip(1);
1203
1204 let actor_id = manager.get_or_create_actor(ip, None);
1205
1206 for _ in 0..15 {
1208 manager.record_rule_match(&actor_id, "sqli-001", 10.0, "sqli");
1209 }
1210
1211 let actor = manager.get_actor(&actor_id).unwrap();
1212 assert!(actor.risk_score <= 100.0);
1213 }
1214
1215 #[test]
1216 fn test_rule_match_history_limit() {
1217 let config = ActorConfig {
1218 max_rule_matches: 5,
1219 ..Default::default()
1220 };
1221 let manager = ActorManager::new(config);
1222 let ip = create_test_ip(1);
1223
1224 let actor_id = manager.get_or_create_actor(ip, None);
1225
1226 for i in 0..10 {
1228 manager.record_rule_match(&actor_id, &format!("rule-{}", i), 5.0, "test");
1229 }
1230
1231 let actor = manager.get_actor(&actor_id).unwrap();
1232 assert_eq!(actor.rule_matches.len(), 5);
1233
1234 assert_eq!(actor.rule_matches[0].rule_id, "rule-5");
1236 assert_eq!(actor.rule_matches[4].rule_id, "rule-9");
1237 }
1238
1239 #[test]
1244 fn test_block_actor() {
1245 let manager = create_test_manager();
1246 let ip = create_test_ip(1);
1247
1248 let actor_id = manager.get_or_create_actor(ip, None);
1249
1250 assert!(!manager.is_blocked(&actor_id));
1251
1252 let result = manager.block_actor(&actor_id, "High risk score");
1253
1254 assert!(result);
1255 assert!(manager.is_blocked(&actor_id));
1256
1257 let actor = manager.get_actor(&actor_id).unwrap();
1258 assert!(actor.is_blocked);
1259 assert_eq!(actor.block_reason, Some("High risk score".to_string()));
1260 assert!(actor.blocked_since.is_some());
1261 }
1262
1263 #[test]
1264 fn test_unblock_actor() {
1265 let manager = create_test_manager();
1266 let ip = create_test_ip(1);
1267
1268 let actor_id = manager.get_or_create_actor(ip, None);
1269 manager.block_actor(&actor_id, "Test");
1270
1271 assert!(manager.is_blocked(&actor_id));
1272
1273 let result = manager.unblock_actor(&actor_id);
1274
1275 assert!(result);
1276 assert!(!manager.is_blocked(&actor_id));
1277
1278 let actor = manager.get_actor(&actor_id).unwrap();
1279 assert!(!actor.is_blocked);
1280 assert!(actor.block_reason.is_none());
1281 assert!(actor.blocked_since.is_none());
1282 }
1283
1284 #[test]
1285 fn test_block_nonexistent() {
1286 let manager = create_test_manager();
1287
1288 assert!(!manager.block_actor("nonexistent", "Test"));
1289 assert!(!manager.unblock_actor("nonexistent"));
1290 assert!(!manager.is_blocked("nonexistent"));
1291 }
1292
1293 #[test]
1298 fn test_lru_eviction() {
1299 let config = ActorConfig {
1300 max_actors: 100,
1301 ..Default::default()
1302 };
1303 let manager = ActorManager::new(config);
1304
1305 for i in 0..150 {
1308 let ip = format!("10.0.{}.{}", i / 256, i % 256).parse().unwrap();
1309 manager.get_or_create_actor(ip, None);
1310 }
1311
1312 assert!(manager.len() <= 150);
1315
1316 for i in 0..200 {
1318 let ip = format!("10.1.{}.{}", i / 256, i % 256).parse().unwrap();
1319 manager.get_or_create_actor(ip, None);
1320 }
1321
1322 let final_len = manager.len();
1325 let evictions = manager.stats().evictions.load(Ordering::Relaxed);
1326
1327 assert!(evictions > 0, "Expected evictions to occur, got 0");
1329
1330 let created = manager.stats().total_created.load(Ordering::Relaxed);
1332 assert!(
1333 created > final_len as u64,
1334 "Expected some actors to be evicted"
1335 );
1336
1337 println!(
1338 "LRU eviction test: created={}, evicted={}, final_len={}",
1339 created, evictions, final_len
1340 );
1341 }
1342
1343 #[test]
1344 fn test_eviction_removes_mappings() {
1345 let config = ActorConfig {
1346 max_actors: 10,
1347 ..Default::default()
1348 };
1349 let manager = ActorManager::new(config);
1350
1351 let first_ip = create_test_ip(1);
1353 let first_fingerprint = "first_fp";
1354 let first_actor_id = manager.get_or_create_actor(first_ip, Some(first_fingerprint));
1355
1356 std::thread::sleep(std::time::Duration::from_millis(10));
1358
1359 for i in 10..200 {
1361 let ip = format!("10.0.{}.{}", i / 256, i % 256).parse().unwrap();
1362 manager.get_or_create_actor(ip, Some(&format!("fp_{}", i)));
1363 }
1364
1365 if manager.get_actor(&first_actor_id).is_none() {
1367 assert!(manager.ip_to_actor.get(&first_ip).is_none());
1368 assert!(manager
1369 .fingerprint_to_actor
1370 .get(first_fingerprint)
1371 .is_none());
1372 }
1373 }
1374
1375 #[test]
1380 fn test_decay_scores() {
1381 let config = ActorConfig {
1382 risk_decay_factor: 0.5,
1383 ..Default::default()
1384 };
1385 let manager = ActorManager::new(config);
1386 let ip = create_test_ip(1);
1387
1388 let actor_id = manager.get_or_create_actor(ip, None);
1389 manager.record_rule_match(&actor_id, "test", 100.0, "test");
1390
1391 let actor = manager.get_actor(&actor_id).unwrap();
1393 assert_eq!(actor.risk_score, 100.0);
1394
1395 manager.decay_scores();
1397
1398 let actor = manager.get_actor(&actor_id).unwrap();
1400 assert_eq!(actor.risk_score, 50.0);
1401
1402 manager.decay_scores();
1404
1405 let actor = manager.get_actor(&actor_id).unwrap();
1406 assert_eq!(actor.risk_score, 25.0);
1407 }
1408
1409 #[test]
1410 fn test_decay_floors_to_zero() {
1411 let config = ActorConfig {
1412 risk_decay_factor: 0.001,
1413 ..Default::default()
1414 };
1415 let manager = ActorManager::new(config);
1416 let ip = create_test_ip(1);
1417
1418 let actor_id = manager.get_or_create_actor(ip, None);
1419 manager.record_rule_match(&actor_id, "test", 1.0, "test");
1420
1421 for _ in 0..5 {
1423 manager.decay_scores();
1424 }
1425
1426 let actor = manager.get_actor(&actor_id).unwrap();
1428 assert_eq!(actor.risk_score, 0.0);
1429 }
1430
1431 #[test]
1436 fn test_bind_session() {
1437 let manager = create_test_manager();
1438 let ip = create_test_ip(1);
1439
1440 let actor_id = manager.get_or_create_actor(ip, None);
1441 manager.bind_session(&actor_id, "session-123");
1442 manager.bind_session(&actor_id, "session-456");
1443 manager.bind_session(&actor_id, "session-123"); let actor = manager.get_actor(&actor_id).unwrap();
1446 assert_eq!(actor.session_ids.len(), 2);
1447 assert!(actor.session_ids.contains(&"session-123".to_string()));
1448 assert!(actor.session_ids.contains(&"session-456".to_string()));
1449 }
1450
1451 #[test]
1456 fn test_list_actors() {
1457 let manager = create_test_manager();
1458
1459 for i in 0..10 {
1461 let ip = create_test_ip(i);
1462 manager.get_or_create_actor(ip, None);
1463 std::thread::sleep(std::time::Duration::from_millis(1));
1464 }
1465
1466 let first_page = manager.list_actors(5, 0);
1468 assert_eq!(first_page.len(), 5);
1469
1470 let second_page = manager.list_actors(5, 5);
1471 assert_eq!(second_page.len(), 5);
1472
1473 for window in first_page.windows(2) {
1475 assert!(window[0].last_seen >= window[1].last_seen);
1476 }
1477 }
1478
1479 #[test]
1480 fn test_list_blocked_actors() {
1481 let manager = create_test_manager();
1482
1483 for i in 0..10 {
1485 let ip = create_test_ip(i);
1486 let actor_id = manager.get_or_create_actor(ip, None);
1487 if i % 2 == 0 {
1488 manager.block_actor(&actor_id, "Test");
1489 }
1490 }
1491
1492 let blocked = manager.list_blocked_actors();
1493 assert_eq!(blocked.len(), 5);
1494
1495 for actor in blocked {
1496 assert!(actor.is_blocked);
1497 }
1498 }
1499
1500 #[test]
1505 fn test_concurrent_access() {
1506 let manager = Arc::new(create_test_manager());
1507 let mut handles = vec![];
1508
1509 for thread_id in 0..10 {
1511 let manager = Arc::clone(&manager);
1512 handles.push(thread::spawn(move || {
1513 for i in 0..100 {
1514 let ip: IpAddr = format!("10.{}.0.{}", thread_id, i % 256).parse().unwrap();
1515 let fingerprint = format!("fp_t{}_{}", thread_id, i % 5);
1516 let actor_id = manager.get_or_create_actor(ip, Some(&fingerprint));
1517 manager.record_rule_match(&actor_id, "test", 1.0, "test");
1518 }
1519 }));
1520 }
1521
1522 for handle in handles {
1523 handle.join().unwrap();
1524 }
1525
1526 assert!(manager.len() > 0);
1528 assert!(manager.stats().total_created.load(Ordering::Relaxed) > 0);
1529 }
1530
/// Stress test: sixteen threads each run 500 mixed operations against one
/// shared manager.
///
/// The manager is configured with `max_actors = 10_000` and
/// `max_fingerprint_mappings = 50_000`. Every iteration resolves an actor and
/// records a rule match; every 5th iteration also touches the actor and every
/// 200th blocks it. The test passes when no thread panics, the manager is
/// non-empty, and both the `total_created` and `total_rule_matches` counters
/// have advanced.
#[test]
fn test_stress_concurrent_updates() {
    let manager = Arc::new(ActorManager::new(ActorConfig {
        max_actors: 10_000,
        max_fingerprint_mappings: 50_000,
        ..Default::default()
    }));

    // Spawn every worker up front (`collect` forces the spawns) and join
    // them afterwards, so the workers actually overlap.
    let handles: Vec<_> = (0..16)
        .map(|thread_id| {
            let manager = Arc::clone(&manager);
            thread::spawn(move || {
                for i in 0..500 {
                    // `i / 256` and `i % 256` keep both trailing octets in range.
                    let ip: IpAddr = format!("10.{}.{}.{}", thread_id, i / 256, i % 256)
                        .parse()
                        .expect("generated address is a valid IPv4 literal");
                    let fingerprint = format!("fp_t{}_{}", thread_id, i % 20);
                    let actor_id = manager.get_or_create_actor(ip, Some(&fingerprint));

                    manager.record_rule_match(&actor_id, "stress", 0.5, "stress");

                    if i % 5 == 0 {
                        manager.touch_actor(&actor_id);
                    }
                    if i % 200 == 0 {
                        manager.block_actor(&actor_id, "stress");
                    }
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    let stats = manager.stats();
    // `is_empty` matches how the other tests in this module query emptiness.
    assert!(!manager.is_empty());
    assert!(stats.total_created.load(Ordering::Relaxed) > 0);
    assert!(stats.total_rule_matches.load(Ordering::Relaxed) > 0);
}
1572
/// Verifies the counters reported by `stats().snapshot()`.
///
/// A fresh manager must report zeros. After creating five actors, recording
/// one rule match on each, and blocking one of them, the snapshot must report
/// 5 actors, 1 blocked, 5 created and 5 rule matches.
#[test]
fn test_stats() {
    let manager = create_test_manager();

    // Baseline: nothing has happened yet.
    let baseline = manager.stats().snapshot();
    assert_eq!(baseline.total_actors, 0);
    assert_eq!(baseline.blocked_actors, 0);
    assert_eq!(baseline.total_created, 0);

    for index in 0..5 {
        let actor_id = manager.get_or_create_actor(create_test_ip(index), None);
        manager.record_rule_match(&actor_id, "test", 10.0, "test");
    }

    // Block whichever actor the listing returns first.
    let listed = manager.list_actors(1, 0);
    let target = listed[0].clone();
    manager.block_actor(&target.actor_id, "Test");

    let updated = manager.stats().snapshot();
    assert_eq!(updated.total_actors, 5);
    assert_eq!(updated.blocked_actors, 1);
    assert_eq!(updated.total_created, 5);
    assert_eq!(updated.total_rule_matches, 5);
}
1604
/// Verifies that `clear` wipes all stored state.
///
/// Ten actors are created, each with its own fingerprint, and all are
/// blocked. After `clear`, the manager must be empty, both lookup maps
/// (`ip_to_actor`, `fingerprint_to_actor`) must be empty, and the
/// `total_actors` / `blocked_actors` counters must read zero.
#[test]
fn test_clear() {
    let manager = create_test_manager();

    for index in 0..10 {
        let fingerprint = format!("fp_{}", index);
        let actor_id = manager.get_or_create_actor(create_test_ip(index), Some(&fingerprint));
        manager.block_actor(&actor_id, "Test");
    }

    assert_eq!(manager.len(), 10);

    manager.clear();

    assert_eq!(manager.len(), 0);
    assert!(manager.ip_to_actor.is_empty());
    assert!(manager.fingerprint_to_actor.is_empty());

    let stats = manager.stats();
    assert_eq!(stats.total_actors.load(Ordering::Relaxed), 0);
    assert_eq!(stats.blocked_actors.load(Ordering::Relaxed), 0);
}
1630
/// Verifies the state of a manager built through `Default`: it is enabled,
/// holds no actors, and its configuration caps actors at 100 000.
#[test]
fn test_default() {
    let manager = ActorManager::default();

    let is_enabled = manager.is_enabled();
    let is_empty = manager.is_empty();
    let actor_cap = manager.config().max_actors;

    assert!(is_enabled);
    assert!(is_empty);
    assert_eq!(actor_cap, 100_000);
}
1643
/// Verifies that 1000 consecutive calls to `generate_actor_id` never return
/// the same id twice.
#[test]
fn test_actor_id_uniqueness() {
    let mut ids = HashSet::with_capacity(1000);
    for _ in 0..1000 {
        // `replace` inserts the id and returns the previously stored equal
        // value, if any: one hash lookup instead of `contains` + `insert`,
        // and the colliding id is available for the failure message.
        if let Some(duplicate) = ids.replace(generate_actor_id()) {
            panic!("Duplicate ID generated: {}", duplicate);
        }
    }
}
1657
/// Verifies the textual layout of a generated actor id.
///
/// The id must be 36 bytes long, with `-` at character positions 8, 13, 18
/// and 23 and the digit `4` at position 14 — the shape of a hyphenated
/// version-4 UUID string.
#[test]
fn test_actor_id_format() {
    let id = generate_actor_id();

    assert_eq!(id.len(), 36);

    // (character position, expected character), checked in ascending order.
    let expected_layout = [(8, '-'), (13, '-'), (14, '4'), (18, '-'), (23, '-')];
    let symbols: Vec<char> = id.chars().collect();
    for (position, expected) in expected_layout {
        assert_eq!(symbols.get(position).copied(), Some(expected));
    }
}
1670
/// Verifies that an empty fingerprint string is not recorded.
///
/// Creating an actor with `Some("")` must leave both the actor's
/// `fingerprints` set and the manager's `fingerprint_to_actor` map empty.
#[test]
fn test_empty_fingerprint() {
    let manager = create_test_manager();

    let actor_id = manager.get_or_create_actor(create_test_ip(1), Some(""));
    let actor = manager.get_actor(&actor_id).unwrap();

    let nothing_on_actor = actor.fingerprints.is_empty();
    assert!(nothing_on_actor);

    let nothing_in_index = manager.fingerprint_to_actor.is_empty();
    assert!(nothing_in_index);
}
1687
/// Verifies that IPv6 addresses are handled like any other address.
///
/// Two different IPv6 addresses presenting the same fingerprint must resolve
/// to the same actor, and that actor must list both addresses in `ips`.
#[test]
fn test_ipv6_addresses() {
    let manager = create_test_manager();

    let first_ip = "2001:db8::1".parse::<IpAddr>().unwrap();
    let second_ip = "2001:db8::2".parse::<IpAddr>().unwrap();

    let first_id = manager.get_or_create_actor(first_ip, Some("ipv6_fp"));
    let second_id = manager.get_or_create_actor(second_ip, Some("ipv6_fp"));

    // Same fingerprint, so both lookups must land on one actor.
    assert_eq!(first_id, second_id);

    let actor = manager.get_actor(&first_id).unwrap();
    for address in [first_ip, second_ip].iter() {
        assert!(actor.ips.contains(address));
    }
}
1704
/// Verifies the behaviour of a manager configured with `enabled: false`.
///
/// `get_or_create_actor` must still hand back a non-empty id, but nothing may
/// be stored: the manager stays empty, and the id remains unknown to
/// `get_actor` even after a rule match is recorded against it.
#[test]
fn test_disabled_manager() {
    let manager = ActorManager::new(ActorConfig {
        enabled: false,
        ..Default::default()
    });
    assert!(!manager.is_enabled());

    let actor_id = manager.get_or_create_actor(create_test_ip(1), None);

    // An id is produced, yet no actor is tracked.
    assert!(!actor_id.is_empty());
    assert!(manager.is_empty());

    manager.record_rule_match(&actor_id, "test", 10.0, "test");

    let lookup = manager.get_actor(&actor_id);
    assert!(lookup.is_none());
}
1726}