Skip to main content

synapse_pingora/actor/
manager.rs

1//! Thread-safe actor manager using DashMap for concurrent access.
2//!
3//! Implements per-actor state tracking with LRU eviction and background cleanup.
4
5use std::cmp::Ordering as CmpOrdering;
6use std::collections::HashSet;
7use std::net::IpAddr;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use dashmap::DashMap;
13use parking_lot::RwLock as PLRwLock;
14use serde::{Deserialize, Serialize};
15use tokio::sync::Notify;
16
17// ============================================================================
18// Configuration
19// ============================================================================
20
21/// Configuration for ActorManager.
22#[derive(Debug, Clone)]
23pub struct ActorConfig {
24    /// Maximum number of actors to track (LRU eviction when exceeded).
25    /// Default: 100,000
26    pub max_actors: usize,
27
28    /// Interval in seconds between decay cycles.
29    /// Default: 900 (15 minutes)
30    pub decay_interval_secs: u64,
31
32    /// Interval in seconds between persistence cycles.
33    /// Default: 300 (5 minutes)
34    pub persist_interval_secs: u64,
35
36    /// Threshold for correlation confidence.
37    /// Default: 0.7
38    pub correlation_threshold: f64,
39
40    /// Factor by which risk scores decay each cycle.
41    /// Default: 0.9
42    pub risk_decay_factor: f64,
43
44    /// Maximum number of rule matches to track per actor.
45    /// Default: 100
46    pub max_rule_matches: usize,
47
48    /// Maximum number of session IDs to track per actor.
49    /// Prevents memory exhaustion from session hijacking attacks.
50    /// Default: 50
51    pub max_session_ids: usize,
52
53    /// Maximum number of fingerprints to track per actor.
54    /// Prevents memory exhaustion from fingerprint flooding attacks.
55    /// Default: 20
56    pub max_fingerprints_per_actor: usize,
57
58    /// Maximum number of global fingerprint-to-actor mappings.
59    /// Prevents memory exhaustion from unique fingerprint attacks.
60    /// Default: 500,000 (5x max_actors)
61    pub max_fingerprint_mappings: usize,
62
63    /// Whether actor tracking is enabled.
64    /// Default: true
65    pub enabled: bool,
66
67    /// Maximum risk score (default: 100.0).
68    pub max_risk: f64,
69}
70
71impl Default for ActorConfig {
72    fn default() -> Self {
73        Self {
74            max_actors: 100_000,
75            decay_interval_secs: 900,
76            persist_interval_secs: 300,
77            correlation_threshold: 0.7,
78            risk_decay_factor: 0.9,
79            max_rule_matches: 100,
80            max_session_ids: 50,               // Prevents memory exhaustion
81            max_fingerprints_per_actor: 20,    // Prevents per-actor memory exhaustion
82            max_fingerprint_mappings: 500_000, // 5x max_actors for global limit
83            enabled: true,
84            max_risk: 100.0,
85        }
86    }
87}
88
89// ============================================================================
90// Rule Match Record
91// ============================================================================
92
93/// Rule match record for actor history.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct RuleMatch {
96    /// Rule identifier (e.g., "sqli-001").
97    pub rule_id: String,
98
99    /// Timestamp when the rule was matched (ms since epoch).
100    pub timestamp: u64,
101
102    /// Risk contribution from this rule match.
103    pub risk_contribution: f64,
104
105    /// Category of the rule (e.g., "sqli", "xss", "path_traversal").
106    pub category: String,
107}
108
109impl RuleMatch {
110    /// Create a new rule match record.
111    pub fn new(rule_id: String, risk_contribution: f64, category: String) -> Self {
112        Self {
113            rule_id,
114            timestamp: now_ms(),
115            risk_contribution,
116            category,
117        }
118    }
119}
120
121// ============================================================================
122// Actor State
123// ============================================================================
124
125/// Per-actor state tracking.
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct ActorState {
128    /// Unique actor identifier (UUID v4).
129    pub actor_id: String,
130
131    /// Accumulated risk score (0.0 - max_risk).
132    pub risk_score: f64,
133
134    /// History of rule matches for this actor.
135    pub rule_matches: Vec<RuleMatch>,
136
137    /// Count of anomalous behaviors detected.
138    pub anomaly_count: u64,
139
140    /// Session IDs associated with this actor.
141    pub session_ids: Vec<String>,
142
143    /// First seen timestamp (ms since epoch).
144    pub first_seen: u64,
145
146    /// Last seen timestamp (ms since epoch).
147    pub last_seen: u64,
148
149    /// IP addresses associated with this actor.
150    #[serde(with = "ip_set_serde")]
151    pub ips: HashSet<IpAddr>,
152
153    /// Fingerprints associated with this actor.
154    pub fingerprints: HashSet<String>,
155
156    /// Whether this actor is currently blocked.
157    pub is_blocked: bool,
158
159    /// Reason for blocking (if blocked).
160    pub block_reason: Option<String>,
161
162    /// Timestamp when blocked (ms since epoch).
163    pub blocked_since: Option<u64>,
164}
165
166impl ActorState {
167    /// Create a new actor state.
168    pub fn new(actor_id: String) -> Self {
169        let now = now_ms();
170        Self {
171            actor_id,
172            risk_score: 0.0,
173            rule_matches: Vec::new(),
174            anomaly_count: 0,
175            session_ids: Vec::new(),
176            first_seen: now,
177            last_seen: now,
178            ips: HashSet::new(),
179            fingerprints: HashSet::new(),
180            is_blocked: false,
181            block_reason: None,
182            blocked_since: None,
183        }
184    }
185
186    /// Update last seen timestamp.
187    pub fn touch(&mut self) {
188        self.last_seen = now_ms();
189    }
190
191    /// Add an IP address to this actor.
192    pub fn add_ip(&mut self, ip: IpAddr) {
193        self.ips.insert(ip);
194        self.touch();
195    }
196
197    /// Add a fingerprint to this actor with a maximum limit.
198    ///
199    /// Returns `true` if the fingerprint was added, `false` if at capacity or empty.
200    pub fn add_fingerprint(&mut self, fingerprint: String, max_fingerprints: usize) -> bool {
201        if fingerprint.is_empty() {
202            return false;
203        }
204
205        // Check if already present (no limit needed)
206        if self.fingerprints.contains(&fingerprint) {
207            self.touch();
208            return true;
209        }
210
211        // Enforce per-actor fingerprint limit to prevent memory exhaustion
212        if self.fingerprints.len() >= max_fingerprints {
213            // At capacity - don't add new fingerprint
214            self.touch();
215            return false;
216        }
217
218        self.fingerprints.insert(fingerprint);
219        self.touch();
220        true
221    }
222
223    /// Add a rule match to this actor's history.
224    pub fn add_rule_match(&mut self, rule_match: RuleMatch, max_matches: usize) {
225        self.rule_matches.push(rule_match);
226        self.touch();
227
228        // Trim to max matches (keep most recent)
229        if self.rule_matches.len() > max_matches {
230            let excess = self.rule_matches.len() - max_matches;
231            self.rule_matches.drain(0..excess);
232        }
233    }
234
235    /// Get the count of matches for a specific rule.
236    pub fn get_rule_match_count(&self, rule_id: &str) -> usize {
237        self.rule_matches
238            .iter()
239            .filter(|m| m.rule_id == rule_id)
240            .count()
241    }
242}
243
244/// Custom serde implementation for HashSet<IpAddr>.
245mod ip_set_serde {
246    use serde::{Deserialize, Deserializer, Serialize, Serializer};
247    use std::collections::HashSet;
248    use std::net::IpAddr;
249
250    pub fn serialize<S>(set: &HashSet<IpAddr>, serializer: S) -> Result<S::Ok, S::Error>
251    where
252        S: Serializer,
253    {
254        let strings: Vec<String> = set.iter().map(|ip| ip.to_string()).collect();
255        strings.serialize(serializer)
256    }
257
258    pub fn deserialize<'de, D>(deserializer: D) -> Result<HashSet<IpAddr>, D::Error>
259    where
260        D: Deserializer<'de>,
261    {
262        let strings: Vec<String> = Vec::deserialize(deserializer)?;
263        let mut set = HashSet::new();
264        for s in strings {
265            if let Ok(ip) = s.parse() {
266                set.insert(ip);
267            }
268        }
269        Ok(set)
270    }
271}
272
273// ============================================================================
274// Statistics
275// ============================================================================
276
277/// Statistics for monitoring the actor manager.
278#[derive(Debug, Default)]
279pub struct ActorStats {
280    /// Total number of actors currently tracked.
281    pub total_actors: AtomicU64,
282
283    /// Number of blocked actors.
284    pub blocked_actors: AtomicU64,
285
286    /// Total correlations made (IP-to-actor or fingerprint-to-actor).
287    pub correlations_made: AtomicU64,
288
289    /// Total actors evicted due to LRU capacity.
290    pub evictions: AtomicU64,
291
292    /// Total actors created.
293    pub total_created: AtomicU64,
294
295    /// Total rule matches recorded.
296    pub total_rule_matches: AtomicU64,
297
298    /// Total fingerprint mappings evicted due to capacity limits.
299    pub fingerprint_evictions: AtomicU64,
300}
301
302impl ActorStats {
303    /// Create a new stats instance.
304    pub fn new() -> Self {
305        Self::default()
306    }
307
308    /// Get a snapshot of the current statistics.
309    pub fn snapshot(&self) -> ActorStatsSnapshot {
310        ActorStatsSnapshot {
311            total_actors: self.total_actors.load(Ordering::Relaxed),
312            blocked_actors: self.blocked_actors.load(Ordering::Relaxed),
313            correlations_made: self.correlations_made.load(Ordering::Relaxed),
314            evictions: self.evictions.load(Ordering::Relaxed),
315            total_created: self.total_created.load(Ordering::Relaxed),
316            total_rule_matches: self.total_rule_matches.load(Ordering::Relaxed),
317            fingerprint_evictions: self.fingerprint_evictions.load(Ordering::Relaxed),
318        }
319    }
320}
321
322/// Snapshot of actor statistics (for serialization).
323#[derive(Debug, Clone, Serialize)]
324pub struct ActorStatsSnapshot {
325    pub total_actors: u64,
326    pub blocked_actors: u64,
327    pub correlations_made: u64,
328    pub evictions: u64,
329    pub total_created: u64,
330    pub total_rule_matches: u64,
331    pub fingerprint_evictions: u64,
332}
333
334// ============================================================================
335// Actor Manager
336// ============================================================================
337
338/// Manages actor state with LRU eviction.
339///
340/// Cached JA4 cluster data: (timestamp, [(fingerprint, actor_ids, max_risk)])
341type FingerprintClusterCache = Option<(Instant, Vec<(String, Vec<String>, f64)>)>;
342
343/// Thread-safe implementation using DashMap for lock-free concurrent access.
344#[derive(Debug)]
345pub struct ActorManager {
346    /// Actors by actor_id (primary storage).
347    actors: DashMap<String, ActorState>,
348
349    /// IP address to actor_id mapping.
350    ip_to_actor: DashMap<IpAddr, String>,
351
352    /// Fingerprint to actor_id mapping.
353    fingerprint_to_actor: DashMap<String, String>,
354
355    /// Configuration.
356    config: ActorConfig,
357
358    /// Statistics.
359    stats: Arc<ActorStats>,
360
361    /// Shutdown signal.
362    shutdown: Arc<Notify>,
363
364    /// Touch counter for lazy eviction.
365    touch_counter: AtomicU32,
366
367    /// JA4 cluster cache for TUI performance (labs-tui optimization).
368    fingerprint_groups_cache: PLRwLock<FingerprintClusterCache>,
369}
370
371impl ActorManager {
372    /// Create a new actor manager with the given configuration.
373    pub fn new(config: ActorConfig) -> Self {
374        Self {
375            actors: DashMap::with_capacity(config.max_actors),
376            ip_to_actor: DashMap::with_capacity(config.max_actors),
377            fingerprint_to_actor: DashMap::with_capacity(config.max_actors * 2),
378            config,
379            stats: Arc::new(ActorStats::new()),
380            shutdown: Arc::new(Notify::new()),
381            touch_counter: AtomicU32::new(0),
382            fingerprint_groups_cache: PLRwLock::new(None),
383        }
384    }
385
386    /// Get the configuration.
387    pub fn config(&self) -> &ActorConfig {
388        &self.config
389    }
390
391    /// Check if actor tracking is enabled.
392    pub fn is_enabled(&self) -> bool {
393        self.config.enabled
394    }
395
396    /// Get the number of tracked actors.
397    pub fn len(&self) -> usize {
398        self.actors.len()
399    }
400
401    /// Check if the store is empty.
402    pub fn is_empty(&self) -> bool {
403        self.actors.is_empty()
404    }
405
406    /// Get or create an actor for the given IP and optional fingerprint.
407    ///
408    /// # Correlation Logic
409    /// 1. Check if IP is already mapped to an actor
410    /// 2. Check if fingerprint is already mapped to an actor
411    /// 3. If both match different actors, prefer fingerprint (more stable)
412    /// 4. If no match, create a new actor
413    ///
414    /// # Returns
415    /// The actor_id for the correlated or newly created actor.
416    pub fn get_or_create_actor(&self, ip: IpAddr, fingerprint: Option<&str>) -> String {
417        if !self.config.enabled {
418            return generate_actor_id();
419        }
420
421        // Check capacity and evict if needed
422        self.maybe_evict();
423
424        // Try to correlate to existing actor
425        if let Some(actor_id) = self.correlate_actor(ip, fingerprint) {
426            // Update the existing actor
427            if let Some(mut entry) = self.actors.get_mut(&actor_id) {
428                entry.add_ip(ip);
429                if let Some(fp) = fingerprint {
430                    if !fp.is_empty() {
431                        // Only update fingerprint mapping if the actor accepted it
432                        if entry
433                            .add_fingerprint(fp.to_string(), self.config.max_fingerprints_per_actor)
434                        {
435                            // Check global fingerprint mapping capacity before inserting
436                            self.maybe_evict_fingerprint_mappings();
437                            self.fingerprint_to_actor
438                                .insert(fp.to_string(), actor_id.clone());
439                        }
440                    }
441                }
442                // Ensure IP mapping is current
443                self.ip_to_actor.insert(ip, actor_id.clone());
444            }
445            return actor_id;
446        }
447
448        // Create new actor
449        let actor_id = generate_actor_id();
450        let mut actor = ActorState::new(actor_id.clone());
451        actor.add_ip(ip);
452
453        if let Some(fp) = fingerprint {
454            if !fp.is_empty() {
455                // Only update fingerprint mapping if the actor accepted it
456                if actor.add_fingerprint(fp.to_string(), self.config.max_fingerprints_per_actor) {
457                    // Check global fingerprint mapping capacity before inserting
458                    self.maybe_evict_fingerprint_mappings();
459                    self.fingerprint_to_actor
460                        .insert(fp.to_string(), actor_id.clone());
461                }
462            }
463        }
464
465        // Insert mappings
466        self.ip_to_actor.insert(ip, actor_id.clone());
467        self.actors.insert(actor_id.clone(), actor);
468
469        // Update stats
470        self.stats.total_actors.fetch_add(1, Ordering::Relaxed);
471        self.stats.total_created.fetch_add(1, Ordering::Relaxed);
472
473        actor_id
474    }
475
476    /// Record a rule match for an actor.
477    ///
478    /// # Arguments
479    /// * `actor_id` - The actor ID to record the match for
480    /// * `rule_id` - The rule that matched
481    /// * `risk_contribution` - Risk points to add
482    /// * `category` - Category of the rule (e.g., "sqli", "xss")
483    pub fn record_rule_match(
484        &self,
485        actor_id: &str,
486        rule_id: &str,
487        risk_contribution: f64,
488        category: &str,
489    ) {
490        if !self.config.enabled {
491            return;
492        }
493
494        if let Some(mut entry) = self.actors.get_mut(actor_id) {
495            let rule_match =
496                RuleMatch::new(rule_id.to_string(), risk_contribution, category.to_string());
497
498            // Add risk (capped at max)
499            entry.risk_score = (entry.risk_score + risk_contribution).min(self.config.max_risk);
500
501            // Add rule match to history
502            entry.add_rule_match(rule_match, self.config.max_rule_matches);
503
504            // Update stats
505            self.stats
506                .total_rule_matches
507                .fetch_add(1, Ordering::Relaxed);
508        }
509    }
510
511    /// Touch an actor to update last seen timestamp.
512    pub fn touch_actor(&self, actor_id: &str) {
513        if !self.config.enabled {
514            return;
515        }
516
517        if let Some(mut entry) = self.actors.get_mut(actor_id) {
518            entry.touch();
519        }
520    }
521
522    /// Get actor state by ID.
523    pub fn get_actor(&self, actor_id: &str) -> Option<ActorState> {
524        self.actors.get(actor_id).map(|entry| entry.value().clone())
525    }
526
527    /// Get actor by IP address.
528    pub fn get_actor_by_ip(&self, ip: IpAddr) -> Option<ActorState> {
529        self.ip_to_actor
530            .get(&ip)
531            .and_then(|actor_id| self.actors.get(actor_id.value()).map(|e| e.value().clone()))
532    }
533
534    /// Get actor by fingerprint.
535    pub fn get_actor_by_fingerprint(&self, fingerprint: &str) -> Option<ActorState> {
536        self.fingerprint_to_actor
537            .get(fingerprint)
538            .and_then(|actor_id| self.actors.get(actor_id.value()).map(|e| e.value().clone()))
539    }
540
541    /// Block an actor.
542    ///
543    /// # Returns
544    /// `true` if the actor was blocked, `false` if not found.
545    pub fn block_actor(&self, actor_id: &str, reason: &str) -> bool {
546        if let Some(mut entry) = self.actors.get_mut(actor_id) {
547            if !entry.is_blocked {
548                entry.is_blocked = true;
549                entry.block_reason = Some(reason.to_string());
550                entry.blocked_since = Some(now_ms());
551                self.stats.blocked_actors.fetch_add(1, Ordering::Relaxed);
552            }
553            true
554        } else {
555            false
556        }
557    }
558
559    /// Unblock an actor.
560    ///
561    /// # Returns
562    /// `true` if the actor was unblocked, `false` if not found.
563    pub fn unblock_actor(&self, actor_id: &str) -> bool {
564        if let Some(mut entry) = self.actors.get_mut(actor_id) {
565            if entry.is_blocked {
566                entry.is_blocked = false;
567                entry.block_reason = None;
568                entry.blocked_since = None;
569                self.stats.blocked_actors.fetch_sub(1, Ordering::Relaxed);
570            }
571            true
572        } else {
573            false
574        }
575    }
576
577    /// Check if an actor is blocked.
578    pub fn is_blocked(&self, actor_id: &str) -> bool {
579        self.actors
580            .get(actor_id)
581            .map(|entry| entry.is_blocked)
582            .unwrap_or(false)
583    }
584
585    /// Associate a session with an actor.
586    /// Session IDs are bounded by max_session_ids to prevent memory exhaustion.
587    pub fn bind_session(&self, actor_id: &str, session_id: &str) {
588        if let Some(mut entry) = self.actors.get_mut(actor_id) {
589            if !entry.session_ids.contains(&session_id.to_string()) {
590                // SECURITY: Enforce max session_ids to prevent memory exhaustion
591                if entry.session_ids.len() >= self.config.max_session_ids {
592                    // Remove oldest session (FIFO)
593                    entry.session_ids.remove(0);
594                }
595                entry.session_ids.push(session_id.to_string());
596                entry.touch();
597            }
598        }
599    }
600
601    /// List actors with pagination.
602    ///
603    /// # Arguments
604    /// * `limit` - Maximum number of actors to return
605    /// * `offset` - Number of actors to skip
606    ///
607    /// # Returns
608    /// Vector of actor states sorted by last_seen (most recent first).
609    pub fn list_actors(&self, limit: usize, offset: usize) -> Vec<ActorState> {
610        let mut actors: Vec<ActorState> = self
611            .actors
612            .iter()
613            .map(|entry| entry.value().clone())
614            .collect();
615
616        // Sort by last_seen (most recent first)
617        actors.sort_by_key(|a| std::cmp::Reverse(a.last_seen));
618
619        // Apply pagination
620        actors.into_iter().skip(offset).take(limit).collect()
621    }
622
623    /// List actors above a minimum risk score.
624    ///
625    /// Results are sorted by risk score (desc), then last_seen (desc).
626    pub fn list_by_min_risk(&self, min_risk: f64, limit: usize, offset: usize) -> Vec<ActorState> {
627        let mut actors: Vec<ActorState> = self
628            .actors
629            .iter()
630            .filter(|entry| entry.value().risk_score >= min_risk)
631            .map(|entry| entry.value().clone())
632            .collect();
633
634        actors.sort_by(|a, b| {
635            b.risk_score
636                .partial_cmp(&a.risk_score)
637                .unwrap_or(CmpOrdering::Equal)
638                .then_with(|| b.last_seen.cmp(&a.last_seen))
639        });
640
641        actors.into_iter().skip(offset).take(limit).collect()
642    }
643
644    /// List blocked actors.
645    pub fn list_blocked_actors(&self) -> Vec<ActorState> {
646        self.actors
647            .iter()
648            .filter(|entry| entry.is_blocked)
649            .map(|entry| entry.value().clone())
650            .collect()
651    }
652
653    /// Returns groups of actors sharing the same fingerprints.
654    /// Used for identifying botnet clusters in the TUI.
655    ///
656    /// Optimized with a 1-second cache to avoid full table scans on every TUI tick.
657    pub fn get_fingerprint_groups(&self, limit: usize) -> Vec<(String, Vec<String>, f64)> {
658        // Check cache first
659        {
660            let cache = self.fingerprint_groups_cache.read();
661            if let Some((timestamp, data)) = &*cache {
662                if timestamp.elapsed() < Duration::from_secs(1) {
663                    let mut result = data.clone();
664                    result.truncate(limit);
665                    return result;
666                }
667            }
668        }
669
670        use std::collections::HashMap;
671        let mut groups: HashMap<String, (Vec<String>, f64)> = HashMap::new();
672
673        for entry in self.actors.iter() {
674            let actor = entry.value();
675            for fp in &actor.fingerprints {
676                let group = groups
677                    .entry(fp.clone())
678                    .or_insert_with(|| (Vec::new(), 0.0));
679                group.0.push(actor.actor_id.clone());
680                group.1 = group.1.max(actor.risk_score);
681            }
682        }
683
684        let mut sorted_groups: Vec<_> = groups
685            .into_iter()
686            .map(|(fp, (actors, risk))| (fp, actors, risk))
687            .collect();
688
689        // Sort by number of actors in group (descending)
690        sorted_groups.sort_by_key(|a| std::cmp::Reverse(a.1.len()));
691
692        // Update cache
693        {
694            let mut cache = self.fingerprint_groups_cache.write();
695            *cache = Some((Instant::now(), sorted_groups.clone()));
696        }
697
698        sorted_groups.truncate(limit);
699        sorted_groups
700    }
701
702    /// Start background tasks (decay, cleanup).
703    ///
704    /// Spawns a background task that periodically:
705    /// 1. Decays risk scores by the decay factor
706    /// 2. Evicts stale actors if over capacity
707    pub fn start_background_tasks(self: Arc<Self>) {
708        let manager = self;
709        let decay_interval = Duration::from_secs(manager.config.decay_interval_secs);
710
711        tokio::spawn(async move {
712            let mut interval = tokio::time::interval(decay_interval);
713
714            loop {
715                tokio::select! {
716                    _ = interval.tick() => {
717                        // Check shutdown
718                        if Arc::strong_count(&manager.shutdown) == 1 {
719                            // Only this task holds a reference, shutting down
720                            break;
721                        }
722
723                        // Decay risk scores
724                        manager.decay_scores();
725
726                        // Evict stale actors
727                        manager.evict_if_needed();
728                    }
729                    _ = manager.shutdown.notified() => {
730                        log::info!("Actor manager background tasks shutting down");
731                        break;
732                    }
733                }
734            }
735        });
736    }
737
738    /// Signal shutdown for background tasks.
739    pub fn shutdown(&self) {
740        self.shutdown.notify_one();
741    }
742
743    /// Get statistics.
744    pub fn stats(&self) -> &ActorStats {
745        &self.stats
746    }
747
748    /// Clear all actors (primarily for testing).
749    pub fn clear(&self) {
750        self.actors.clear();
751        self.ip_to_actor.clear();
752        self.fingerprint_to_actor.clear();
753        self.stats.total_actors.store(0, Ordering::Relaxed);
754        self.stats.blocked_actors.store(0, Ordering::Relaxed);
755    }
756
757    /// Create a snapshot of all actors for persistence.
758    ///
759    /// Returns all actors regardless of status.
760    pub fn snapshot(&self) -> Vec<ActorState> {
761        self.actors.iter().map(|e| e.value().clone()).collect()
762    }
763
764    /// Restore actors from a snapshot.
765    ///
766    /// Clears existing state and loads the provided actors.
767    pub fn restore(&self, actors: Vec<ActorState>) {
768        // Clear existing state
769        self.clear();
770
771        let mut blocked_count: u64 = 0;
772
773        // Restore actors and mappings
774        for actor in actors {
775            let actor_id = actor.actor_id.clone();
776
777            // Restore IP mappings
778            for ip in &actor.ips {
779                self.ip_to_actor.insert(*ip, actor_id.clone());
780            }
781
782            // Restore fingerprint mappings
783            for fp in &actor.fingerprints {
784                self.fingerprint_to_actor
785                    .insert(fp.clone(), actor_id.clone());
786            }
787
788            // Track blocked count
789            if actor.is_blocked {
790                blocked_count += 1;
791            }
792
793            // Insert actor
794            self.actors.insert(actor_id, actor);
795        }
796
797        // Update stats
798        let actor_count = self.actors.len() as u64;
799        self.stats
800            .total_actors
801            .store(actor_count, Ordering::Relaxed);
802        self.stats
803            .blocked_actors
804            .store(blocked_count, Ordering::Relaxed);
805        self.stats
806            .total_created
807            .store(actor_count, Ordering::Relaxed);
808    }
809
810    // ========================================================================
811    // Private Methods
812    // ========================================================================
813
814    /// Decay risk scores for all actors.
815    fn decay_scores(&self) {
816        let decay_factor = self.config.risk_decay_factor;
817
818        for mut entry in self.actors.iter_mut() {
819            let actor = entry.value_mut();
820            if actor.risk_score > 0.0 {
821                actor.risk_score *= decay_factor;
822
823                // Floor very small values to zero
824                if actor.risk_score < 0.01 {
825                    actor.risk_score = 0.0;
826                }
827            }
828        }
829    }
830
831    /// Evict actors if over capacity.
832    fn evict_if_needed(&self) {
833        let current_len = self.actors.len();
834        if current_len <= self.config.max_actors {
835            return;
836        }
837
838        // Evict oldest 1% of actors
839        let evict_count = (self.config.max_actors / 100).max(1);
840        self.evict_oldest(evict_count);
841    }
842
843    /// Maybe evict oldest actors if at capacity.
844    ///
845    /// Uses lazy eviction: only check every 100th operation.
846    fn maybe_evict(&self) {
847        let count = self.touch_counter.fetch_add(1, Ordering::Relaxed);
848        if !count.is_multiple_of(100) {
849            return;
850        }
851
852        if self.actors.len() < self.config.max_actors {
853            return;
854        }
855
856        // Evict oldest 1% of actors
857        let evict_count = (self.config.max_actors / 100).max(1);
858        self.evict_oldest(evict_count);
859    }
860
861    /// Maybe evict fingerprint mappings if at capacity.
862    ///
863    /// Prevents unbounded memory growth from fingerprint flooding attacks.
864    /// Evicts random entries when capacity is exceeded (fingerprints don't
865    /// have timestamps, so we use random eviction).
866    fn maybe_evict_fingerprint_mappings(&self) {
867        let current_len = self.fingerprint_to_actor.len();
868        if current_len < self.config.max_fingerprint_mappings {
869            return;
870        }
871
872        // Evict 10% of entries to avoid repeated evictions
873        let target_len = (self.config.max_fingerprint_mappings * 9) / 10;
874        let to_evict = current_len.saturating_sub(target_len);
875
876        if to_evict == 0 {
877            return;
878        }
879
880        // Collect keys to evict (sample from iteration order)
881        let keys_to_evict: Vec<String> = self
882            .fingerprint_to_actor
883            .iter()
884            .take(to_evict)
885            .map(|entry| entry.key().clone())
886            .collect();
887
888        // Remove collected entries
889        for key in keys_to_evict {
890            self.fingerprint_to_actor.remove(&key);
891        }
892
893        self.stats
894            .fingerprint_evictions
895            .fetch_add(to_evict as u64, Ordering::Relaxed);
896    }
897
898    /// Evict the N oldest actors by last_seen timestamp.
899    ///
900    /// Uses sampling to avoid O(n) collection of all actors.
901    fn evict_oldest(&self, count: usize) {
902        let sample_size = (count * 10).min(1000).min(self.actors.len());
903
904        if sample_size == 0 {
905            return;
906        }
907
908        // Sample actors
909        let mut candidates: Vec<(String, u64)> = Vec::with_capacity(sample_size);
910        for entry in self.actors.iter().take(sample_size) {
911            candidates.push((entry.key().clone(), entry.value().last_seen));
912        }
913
914        // Sort by last_seen (oldest first)
915        candidates.sort_unstable_by_key(|(_, ts)| *ts);
916
917        // Evict oldest N from sample
918        for (actor_id, _) in candidates.into_iter().take(count) {
919            self.remove_actor(&actor_id);
920            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
921        }
922    }
923
924    /// Remove an actor and clean up all mappings.
925    fn remove_actor(&self, actor_id: &str) {
926        if let Some((_, actor)) = self.actors.remove(actor_id) {
927            // Remove IP mappings
928            for ip in &actor.ips {
929                self.ip_to_actor.remove(ip);
930            }
931
932            // Remove fingerprint mappings
933            for fp in &actor.fingerprints {
934                self.fingerprint_to_actor.remove(fp);
935            }
936
937            // Update stats
938            self.stats.total_actors.fetch_sub(1, Ordering::Relaxed);
939            if actor.is_blocked {
940                self.stats.blocked_actors.fetch_sub(1, Ordering::Relaxed);
941            }
942        }
943    }
944
945    /// Correlate an IP and/or fingerprint to an existing actor.
946    ///
947    /// # Returns
948    /// The actor_id if correlation found, None otherwise.
949    fn correlate_actor(&self, ip: IpAddr, fingerprint: Option<&str>) -> Option<String> {
950        let ip_actor = self.ip_to_actor.get(&ip).map(|r| r.value().clone());
951
952        let fp_actor = fingerprint.and_then(|fp| {
953            if fp.is_empty() {
954                None
955            } else {
956                self.fingerprint_to_actor.get(fp).map(|r| r.value().clone())
957            }
958        });
959
960        match (ip_actor, fp_actor) {
961            (Some(ip_id), Some(fp_id)) => {
962                // Both match - prefer fingerprint (more stable)
963                if ip_id == fp_id {
964                    Some(ip_id)
965                } else {
966                    // Different actors - merge them by preferring fingerprint
967                    self.stats.correlations_made.fetch_add(1, Ordering::Relaxed);
968                    Some(fp_id)
969                }
970            }
971            (Some(id), None) => {
972                self.stats.correlations_made.fetch_add(1, Ordering::Relaxed);
973                Some(id)
974            }
975            (None, Some(id)) => {
976                self.stats.correlations_made.fetch_add(1, Ordering::Relaxed);
977                Some(id)
978            }
979            (None, None) => None,
980        }
981    }
982}
983
984impl Default for ActorManager {
985    fn default() -> Self {
986        Self::new(ActorConfig::default())
987    }
988}
989
990// ============================================================================
991// Helper Functions
992// ============================================================================
993
994/// Generate a unique actor ID using cryptographically secure random bytes.
995fn generate_actor_id() -> String {
996    // Use getrandom for cryptographically secure random bytes
997    let mut bytes = [0u8; 16];
998    getrandom::getrandom(&mut bytes).expect("Failed to get random bytes");
999
1000    // Format as UUID v4 with proper version and variant bits
1001    bytes[6] = (bytes[6] & 0x0F) | 0x40; // Version 4
1002    bytes[8] = (bytes[8] & 0x3F) | 0x80; // Variant 1
1003
1004    format!(
1005        "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
1006        u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
1007        u16::from_be_bytes([bytes[4], bytes[5]]),
1008        u16::from_be_bytes([bytes[6], bytes[7]]),
1009        u16::from_be_bytes([bytes[8], bytes[9]]),
1010        u64::from_be_bytes([
1011            0, 0, bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15]
1012        ])
1013    )
1014}
1015
1016/// Get current time in milliseconds since Unix epoch.
1017#[inline]
1018fn now_ms() -> u64 {
1019    SystemTime::now()
1020        .duration_since(UNIX_EPOCH)
1021        .map(|d| d.as_millis() as u64)
1022        .unwrap_or(0)
1023}
1024
1025// ============================================================================
1026// Tests
1027// ============================================================================
1028
1029#[cfg(test)]
1030mod tests {
1031    use super::*;
1032    use std::thread;
1033
1034    // ========================================================================
1035    // Helper Functions
1036    // ========================================================================
1037
1038    fn create_test_manager() -> ActorManager {
1039        ActorManager::new(ActorConfig {
1040            max_actors: 1000,
1041            ..Default::default()
1042        })
1043    }
1044
1045    fn create_test_ip(last_octet: u8) -> IpAddr {
1046        format!("192.168.1.{}", last_octet).parse().unwrap()
1047    }
1048
1049    // ========================================================================
1050    // Actor Creation and Retrieval Tests
1051    // ========================================================================
1052
1053    #[test]
1054    fn test_actor_creation() {
1055        let manager = create_test_manager();
1056        let ip = create_test_ip(1);
1057
1058        let actor_id = manager.get_or_create_actor(ip, None);
1059
1060        assert!(!actor_id.is_empty());
1061        assert_eq!(manager.len(), 1);
1062
1063        let actor = manager.get_actor(&actor_id).unwrap();
1064        assert_eq!(actor.actor_id, actor_id);
1065        assert!(actor.ips.contains(&ip));
1066        assert!(!actor.is_blocked);
1067    }
1068
1069    #[test]
1070    fn test_actor_retrieval_by_ip() {
1071        let manager = create_test_manager();
1072        let ip = create_test_ip(1);
1073
1074        let actor_id = manager.get_or_create_actor(ip, None);
1075        let retrieved = manager.get_actor_by_ip(ip).unwrap();
1076
1077        assert_eq!(retrieved.actor_id, actor_id);
1078    }
1079
1080    #[test]
1081    fn test_actor_retrieval_by_fingerprint() {
1082        let manager = create_test_manager();
1083        let ip = create_test_ip(1);
1084        let fingerprint = "t13d1516h2_abc123";
1085
1086        let actor_id = manager.get_or_create_actor(ip, Some(fingerprint));
1087        let retrieved = manager.get_actor_by_fingerprint(fingerprint).unwrap();
1088
1089        assert_eq!(retrieved.actor_id, actor_id);
1090    }
1091
1092    #[test]
1093    fn test_actor_nonexistent() {
1094        let manager = create_test_manager();
1095
1096        assert!(manager.get_actor("nonexistent").is_none());
1097        assert!(manager.get_actor_by_ip(create_test_ip(99)).is_none());
1098        assert!(manager.get_actor_by_fingerprint("nonexistent").is_none());
1099    }
1100
1101    // ========================================================================
1102    // IP and Fingerprint Correlation Tests
1103    // ========================================================================
1104
1105    #[test]
1106    fn test_ip_correlation() {
1107        let manager = create_test_manager();
1108        let ip = create_test_ip(1);
1109
1110        // First request
1111        let actor_id1 = manager.get_or_create_actor(ip, None);
1112
1113        // Second request from same IP
1114        let actor_id2 = manager.get_or_create_actor(ip, None);
1115
1116        assert_eq!(actor_id1, actor_id2);
1117        assert_eq!(manager.len(), 1);
1118    }
1119
1120    #[test]
1121    fn test_fingerprint_correlation() {
1122        let manager = create_test_manager();
1123        let ip1 = create_test_ip(1);
1124        let ip2 = create_test_ip(2);
1125        let fingerprint = "t13d1516h2_shared";
1126
1127        // First request
1128        let actor_id1 = manager.get_or_create_actor(ip1, Some(fingerprint));
1129
1130        // Second request from different IP but same fingerprint
1131        let actor_id2 = manager.get_or_create_actor(ip2, Some(fingerprint));
1132
1133        assert_eq!(actor_id1, actor_id2);
1134        assert_eq!(manager.len(), 1);
1135
1136        // Verify both IPs are associated
1137        let actor = manager.get_actor(&actor_id1).unwrap();
1138        assert!(actor.ips.contains(&ip1));
1139        assert!(actor.ips.contains(&ip2));
1140    }
1141
1142    #[test]
1143    fn test_fingerprint_preferred_over_ip() {
1144        let manager = create_test_manager();
1145        let ip1 = create_test_ip(1);
1146        let ip2 = create_test_ip(2);
1147        let fp1 = "fingerprint_1";
1148        let fp2 = "fingerprint_2";
1149
1150        // Create actor with IP1 and FP1
1151        let actor_id1 = manager.get_or_create_actor(ip1, Some(fp1));
1152
1153        // Create actor with IP2 and FP2
1154        let actor_id2 = manager.get_or_create_actor(ip2, Some(fp2));
1155
1156        assert_ne!(actor_id1, actor_id2);
1157
1158        // Now request with IP1 but FP2 - should correlate to FP2's actor
1159        let actor_id3 = manager.get_or_create_actor(ip1, Some(fp2));
1160
1161        assert_eq!(actor_id3, actor_id2);
1162    }
1163
1164    // ========================================================================
1165    // Rule Match Recording Tests
1166    // ========================================================================
1167
1168    #[test]
1169    fn test_record_rule_match() {
1170        let manager = create_test_manager();
1171        let ip = create_test_ip(1);
1172
1173        let actor_id = manager.get_or_create_actor(ip, None);
1174        manager.record_rule_match(&actor_id, "sqli-001", 25.0, "sqli");
1175
1176        let actor = manager.get_actor(&actor_id).unwrap();
1177        assert_eq!(actor.rule_matches.len(), 1);
1178        assert_eq!(actor.rule_matches[0].rule_id, "sqli-001");
1179        assert_eq!(actor.rule_matches[0].risk_contribution, 25.0);
1180        assert_eq!(actor.rule_matches[0].category, "sqli");
1181        assert_eq!(actor.risk_score, 25.0);
1182    }
1183
1184    #[test]
1185    fn test_rule_match_risk_accumulation() {
1186        let manager = create_test_manager();
1187        let ip = create_test_ip(1);
1188
1189        let actor_id = manager.get_or_create_actor(ip, None);
1190        manager.record_rule_match(&actor_id, "sqli-001", 25.0, "sqli");
1191        manager.record_rule_match(&actor_id, "xss-001", 20.0, "xss");
1192        manager.record_rule_match(&actor_id, "sqli-002", 30.0, "sqli");
1193
1194        let actor = manager.get_actor(&actor_id).unwrap();
1195        assert_eq!(actor.rule_matches.len(), 3);
1196        assert_eq!(actor.risk_score, 75.0);
1197    }
1198
1199    #[test]
1200    fn test_rule_match_risk_capped() {
1201        let manager = create_test_manager();
1202        let ip = create_test_ip(1);
1203
1204        let actor_id = manager.get_or_create_actor(ip, None);
1205
1206        // Add more than max_risk
1207        for _ in 0..15 {
1208            manager.record_rule_match(&actor_id, "sqli-001", 10.0, "sqli");
1209        }
1210
1211        let actor = manager.get_actor(&actor_id).unwrap();
1212        assert!(actor.risk_score <= 100.0);
1213    }
1214
1215    #[test]
1216    fn test_rule_match_history_limit() {
1217        let config = ActorConfig {
1218            max_rule_matches: 5,
1219            ..Default::default()
1220        };
1221        let manager = ActorManager::new(config);
1222        let ip = create_test_ip(1);
1223
1224        let actor_id = manager.get_or_create_actor(ip, None);
1225
1226        // Add more than max_rule_matches
1227        for i in 0..10 {
1228            manager.record_rule_match(&actor_id, &format!("rule-{}", i), 5.0, "test");
1229        }
1230
1231        let actor = manager.get_actor(&actor_id).unwrap();
1232        assert_eq!(actor.rule_matches.len(), 5);
1233
1234        // Should keep most recent
1235        assert_eq!(actor.rule_matches[0].rule_id, "rule-5");
1236        assert_eq!(actor.rule_matches[4].rule_id, "rule-9");
1237    }
1238
1239    // ========================================================================
1240    // Blocking/Unblocking Tests
1241    // ========================================================================
1242
1243    #[test]
1244    fn test_block_actor() {
1245        let manager = create_test_manager();
1246        let ip = create_test_ip(1);
1247
1248        let actor_id = manager.get_or_create_actor(ip, None);
1249
1250        assert!(!manager.is_blocked(&actor_id));
1251
1252        let result = manager.block_actor(&actor_id, "High risk score");
1253
1254        assert!(result);
1255        assert!(manager.is_blocked(&actor_id));
1256
1257        let actor = manager.get_actor(&actor_id).unwrap();
1258        assert!(actor.is_blocked);
1259        assert_eq!(actor.block_reason, Some("High risk score".to_string()));
1260        assert!(actor.blocked_since.is_some());
1261    }
1262
1263    #[test]
1264    fn test_unblock_actor() {
1265        let manager = create_test_manager();
1266        let ip = create_test_ip(1);
1267
1268        let actor_id = manager.get_or_create_actor(ip, None);
1269        manager.block_actor(&actor_id, "Test");
1270
1271        assert!(manager.is_blocked(&actor_id));
1272
1273        let result = manager.unblock_actor(&actor_id);
1274
1275        assert!(result);
1276        assert!(!manager.is_blocked(&actor_id));
1277
1278        let actor = manager.get_actor(&actor_id).unwrap();
1279        assert!(!actor.is_blocked);
1280        assert!(actor.block_reason.is_none());
1281        assert!(actor.blocked_since.is_none());
1282    }
1283
1284    #[test]
1285    fn test_block_nonexistent() {
1286        let manager = create_test_manager();
1287
1288        assert!(!manager.block_actor("nonexistent", "Test"));
1289        assert!(!manager.unblock_actor("nonexistent"));
1290        assert!(!manager.is_blocked("nonexistent"));
1291    }
1292
1293    // ========================================================================
1294    // LRU Eviction Tests
1295    // ========================================================================
1296
1297    #[test]
1298    fn test_lru_eviction() {
1299        let config = ActorConfig {
1300            max_actors: 100,
1301            ..Default::default()
1302        };
1303        let manager = ActorManager::new(config);
1304
1305        // Add 150 actors (over capacity)
1306        // Lazy eviction triggers every 100 operations, evicting 1% (1 actor) each time
1307        for i in 0..150 {
1308            let ip = format!("10.0.{}.{}", i / 256, i % 256).parse().unwrap();
1309            manager.get_or_create_actor(ip, None);
1310        }
1311
1312        // Lazy eviction doesn't aggressively enforce the limit
1313        // At most we create 150 - evictions triggered at operations 100 and 150 (with some actors evicted)
1314        assert!(manager.len() <= 150);
1315
1316        // Force more evictions by touching the manager more times
1317        for i in 0..200 {
1318            let ip = format!("10.1.{}.{}", i / 256, i % 256).parse().unwrap();
1319            manager.get_or_create_actor(ip, None);
1320        }
1321
1322        // After 350 total operations (multiple eviction cycles), should be closer to limit
1323        // but still may exceed due to lazy eviction nature
1324        let final_len = manager.len();
1325        let evictions = manager.stats().evictions.load(Ordering::Relaxed);
1326
1327        // Verify evictions did occur
1328        assert!(evictions > 0, "Expected evictions to occur, got 0");
1329
1330        // The key invariant: we should have created many actors but evicted some
1331        let created = manager.stats().total_created.load(Ordering::Relaxed);
1332        assert!(
1333            created > final_len as u64,
1334            "Expected some actors to be evicted"
1335        );
1336
1337        println!(
1338            "LRU eviction test: created={}, evicted={}, final_len={}",
1339            created, evictions, final_len
1340        );
1341    }
1342
1343    #[test]
1344    fn test_eviction_removes_mappings() {
1345        let config = ActorConfig {
1346            max_actors: 10,
1347            ..Default::default()
1348        };
1349        let manager = ActorManager::new(config);
1350
1351        // Create first actor and get its ID
1352        let first_ip = create_test_ip(1);
1353        let first_fingerprint = "first_fp";
1354        let first_actor_id = manager.get_or_create_actor(first_ip, Some(first_fingerprint));
1355
1356        // Sleep to ensure different timestamps
1357        std::thread::sleep(std::time::Duration::from_millis(10));
1358
1359        // Add many more actors to trigger eviction of the first
1360        for i in 10..200 {
1361            let ip = format!("10.0.{}.{}", i / 256, i % 256).parse().unwrap();
1362            manager.get_or_create_actor(ip, Some(&format!("fp_{}", i)));
1363        }
1364
1365        // If first actor was evicted, its mappings should be gone
1366        if manager.get_actor(&first_actor_id).is_none() {
1367            assert!(manager.ip_to_actor.get(&first_ip).is_none());
1368            assert!(manager
1369                .fingerprint_to_actor
1370                .get(first_fingerprint)
1371                .is_none());
1372        }
1373    }
1374
1375    // ========================================================================
1376    // Score Decay Tests
1377    // ========================================================================
1378
1379    #[test]
1380    fn test_decay_scores() {
1381        let config = ActorConfig {
1382            risk_decay_factor: 0.5,
1383            ..Default::default()
1384        };
1385        let manager = ActorManager::new(config);
1386        let ip = create_test_ip(1);
1387
1388        let actor_id = manager.get_or_create_actor(ip, None);
1389        manager.record_rule_match(&actor_id, "test", 100.0, "test");
1390
1391        // Verify initial score
1392        let actor = manager.get_actor(&actor_id).unwrap();
1393        assert_eq!(actor.risk_score, 100.0);
1394
1395        // Apply decay
1396        manager.decay_scores();
1397
1398        // Verify decayed score
1399        let actor = manager.get_actor(&actor_id).unwrap();
1400        assert_eq!(actor.risk_score, 50.0);
1401
1402        // Apply decay again
1403        manager.decay_scores();
1404
1405        let actor = manager.get_actor(&actor_id).unwrap();
1406        assert_eq!(actor.risk_score, 25.0);
1407    }
1408
1409    #[test]
1410    fn test_decay_floors_to_zero() {
1411        let config = ActorConfig {
1412            risk_decay_factor: 0.001,
1413            ..Default::default()
1414        };
1415        let manager = ActorManager::new(config);
1416        let ip = create_test_ip(1);
1417
1418        let actor_id = manager.get_or_create_actor(ip, None);
1419        manager.record_rule_match(&actor_id, "test", 1.0, "test");
1420
1421        // Apply decay multiple times
1422        for _ in 0..5 {
1423            manager.decay_scores();
1424        }
1425
1426        // Very small values should floor to zero
1427        let actor = manager.get_actor(&actor_id).unwrap();
1428        assert_eq!(actor.risk_score, 0.0);
1429    }
1430
1431    // ========================================================================
1432    // Session Binding Tests
1433    // ========================================================================
1434
1435    #[test]
1436    fn test_bind_session() {
1437        let manager = create_test_manager();
1438        let ip = create_test_ip(1);
1439
1440        let actor_id = manager.get_or_create_actor(ip, None);
1441        manager.bind_session(&actor_id, "session-123");
1442        manager.bind_session(&actor_id, "session-456");
1443        manager.bind_session(&actor_id, "session-123"); // Duplicate
1444
1445        let actor = manager.get_actor(&actor_id).unwrap();
1446        assert_eq!(actor.session_ids.len(), 2);
1447        assert!(actor.session_ids.contains(&"session-123".to_string()));
1448        assert!(actor.session_ids.contains(&"session-456".to_string()));
1449    }
1450
1451    // ========================================================================
1452    // List Tests
1453    // ========================================================================
1454
1455    #[test]
1456    fn test_list_actors() {
1457        let manager = create_test_manager();
1458
1459        // Create some actors
1460        for i in 0..10 {
1461            let ip = create_test_ip(i);
1462            manager.get_or_create_actor(ip, None);
1463            std::thread::sleep(std::time::Duration::from_millis(1));
1464        }
1465
1466        // List with pagination
1467        let first_page = manager.list_actors(5, 0);
1468        assert_eq!(first_page.len(), 5);
1469
1470        let second_page = manager.list_actors(5, 5);
1471        assert_eq!(second_page.len(), 5);
1472
1473        // Should be sorted by last_seen (most recent first)
1474        for window in first_page.windows(2) {
1475            assert!(window[0].last_seen >= window[1].last_seen);
1476        }
1477    }
1478
1479    #[test]
1480    fn test_list_blocked_actors() {
1481        let manager = create_test_manager();
1482
1483        // Create actors and block some
1484        for i in 0..10 {
1485            let ip = create_test_ip(i);
1486            let actor_id = manager.get_or_create_actor(ip, None);
1487            if i % 2 == 0 {
1488                manager.block_actor(&actor_id, "Test");
1489            }
1490        }
1491
1492        let blocked = manager.list_blocked_actors();
1493        assert_eq!(blocked.len(), 5);
1494
1495        for actor in blocked {
1496            assert!(actor.is_blocked);
1497        }
1498    }
1499
1500    // ========================================================================
1501    // Concurrent Access Tests
1502    // ========================================================================
1503
1504    #[test]
1505    fn test_concurrent_access() {
1506        let manager = Arc::new(create_test_manager());
1507        let mut handles = vec![];
1508
1509        // Spawn 10 threads, each creating and updating actors
1510        for thread_id in 0..10 {
1511            let manager = Arc::clone(&manager);
1512            handles.push(thread::spawn(move || {
1513                for i in 0..100 {
1514                    let ip: IpAddr = format!("10.{}.0.{}", thread_id, i % 256).parse().unwrap();
1515                    let fingerprint = format!("fp_t{}_{}", thread_id, i % 5);
1516                    let actor_id = manager.get_or_create_actor(ip, Some(&fingerprint));
1517                    manager.record_rule_match(&actor_id, "test", 1.0, "test");
1518                }
1519            }));
1520        }
1521
1522        for handle in handles {
1523            handle.join().unwrap();
1524        }
1525
1526        // Verify no panics and reasonable state
1527        assert!(manager.len() > 0);
1528        assert!(manager.stats().total_created.load(Ordering::Relaxed) > 0);
1529    }
1530
1531    #[test]
1532    fn test_stress_concurrent_updates() {
1533        let manager = Arc::new(ActorManager::new(ActorConfig {
1534            max_actors: 10_000,
1535            max_fingerprint_mappings: 50_000,
1536            ..Default::default()
1537        }));
1538        let mut handles = vec![];
1539
1540        // Higher concurrency and mixed operations to exercise thread safety.
1541        for thread_id in 0..16 {
1542            let manager = Arc::clone(&manager);
1543            handles.push(thread::spawn(move || {
1544                for i in 0..500 {
1545                    let ip: IpAddr = format!("10.{}.{}.{}", thread_id, i / 256, i % 256)
1546                        .parse()
1547                        .unwrap();
1548                    let fingerprint = format!("fp_t{}_{}", thread_id, i % 20);
1549                    let actor_id = manager.get_or_create_actor(ip, Some(&fingerprint));
1550
1551                    manager.record_rule_match(&actor_id, "stress", 0.5, "stress");
1552
1553                    if i % 5 == 0 {
1554                        manager.touch_actor(&actor_id);
1555                    }
1556                    if i % 200 == 0 {
1557                        manager.block_actor(&actor_id, "stress");
1558                    }
1559                }
1560            }));
1561        }
1562
1563        for handle in handles {
1564            handle.join().unwrap();
1565        }
1566
1567        let stats = manager.stats();
1568        assert!(manager.len() > 0);
1569        assert!(stats.total_created.load(Ordering::Relaxed) > 0);
1570        assert!(stats.total_rule_matches.load(Ordering::Relaxed) > 0);
1571    }
1572
1573    // ========================================================================
1574    // Statistics Tests
1575    // ========================================================================
1576
1577    #[test]
1578    fn test_stats() {
1579        let manager = create_test_manager();
1580
1581        // Initial stats
1582        let stats = manager.stats().snapshot();
1583        assert_eq!(stats.total_actors, 0);
1584        assert_eq!(stats.blocked_actors, 0);
1585        assert_eq!(stats.total_created, 0);
1586
1587        // Create actors
1588        for i in 0..5 {
1589            let ip = create_test_ip(i);
1590            let actor_id = manager.get_or_create_actor(ip, None);
1591            manager.record_rule_match(&actor_id, "test", 10.0, "test");
1592        }
1593
1594        // Block one
1595        let actor = manager.list_actors(1, 0)[0].clone();
1596        manager.block_actor(&actor.actor_id, "Test");
1597
1598        let stats = manager.stats().snapshot();
1599        assert_eq!(stats.total_actors, 5);
1600        assert_eq!(stats.blocked_actors, 1);
1601        assert_eq!(stats.total_created, 5);
1602        assert_eq!(stats.total_rule_matches, 5);
1603    }
1604
1605    // ========================================================================
1606    // Clear Tests
1607    // ========================================================================
1608
1609    #[test]
1610    fn test_clear() {
1611        let manager = create_test_manager();
1612
1613        // Add some actors
1614        for i in 0..10 {
1615            let ip = create_test_ip(i);
1616            let actor_id = manager.get_or_create_actor(ip, Some(&format!("fp_{}", i)));
1617            manager.block_actor(&actor_id, "Test");
1618        }
1619
1620        assert_eq!(manager.len(), 10);
1621
1622        manager.clear();
1623
1624        assert_eq!(manager.len(), 0);
1625        assert!(manager.ip_to_actor.is_empty());
1626        assert!(manager.fingerprint_to_actor.is_empty());
1627        assert_eq!(manager.stats().total_actors.load(Ordering::Relaxed), 0);
1628        assert_eq!(manager.stats().blocked_actors.load(Ordering::Relaxed), 0);
1629    }
1630
1631    // ========================================================================
1632    // Default Implementation Tests
1633    // ========================================================================
1634
1635    #[test]
1636    fn test_default() {
1637        let manager = ActorManager::default();
1638
1639        assert!(manager.is_enabled());
1640        assert!(manager.is_empty());
1641        assert_eq!(manager.config().max_actors, 100_000);
1642    }
1643
1644    // ========================================================================
1645    // Actor ID Generation Tests
1646    // ========================================================================
1647
1648    #[test]
1649    fn test_actor_id_uniqueness() {
1650        let mut ids = HashSet::new();
1651        for _ in 0..1000 {
1652            let id = generate_actor_id();
1653            assert!(!ids.contains(&id), "Duplicate ID generated: {}", id);
1654            ids.insert(id);
1655        }
1656    }
1657
1658    #[test]
1659    fn test_actor_id_format() {
1660        let id = generate_actor_id();
1661
1662        // Should be UUID-like format: xxxxxxxx-xxxx-4xxx-xxxx-xxxxxxxxxxxx
1663        assert_eq!(id.len(), 36);
1664        assert_eq!(id.chars().nth(8), Some('-'));
1665        assert_eq!(id.chars().nth(13), Some('-'));
1666        assert_eq!(id.chars().nth(14), Some('4')); // Version 4
1667        assert_eq!(id.chars().nth(18), Some('-'));
1668        assert_eq!(id.chars().nth(23), Some('-'));
1669    }
1670
1671    // ========================================================================
1672    // Edge Case Tests
1673    // ========================================================================
1674
1675    #[test]
1676    fn test_empty_fingerprint() {
1677        let manager = create_test_manager();
1678        let ip = create_test_ip(1);
1679
1680        // Empty fingerprint should be ignored
1681        let actor_id = manager.get_or_create_actor(ip, Some(""));
1682
1683        let actor = manager.get_actor(&actor_id).unwrap();
1684        assert!(actor.fingerprints.is_empty());
1685        assert!(manager.fingerprint_to_actor.is_empty());
1686    }
1687
1688    #[test]
1689    fn test_ipv6_addresses() {
1690        let manager = create_test_manager();
1691
1692        let ipv6_1: IpAddr = "2001:db8::1".parse().unwrap();
1693        let ipv6_2: IpAddr = "2001:db8::2".parse().unwrap();
1694
1695        let actor_id1 = manager.get_or_create_actor(ipv6_1, Some("ipv6_fp"));
1696        let actor_id2 = manager.get_or_create_actor(ipv6_2, Some("ipv6_fp"));
1697
1698        assert_eq!(actor_id1, actor_id2);
1699
1700        let actor = manager.get_actor(&actor_id1).unwrap();
1701        assert!(actor.ips.contains(&ipv6_1));
1702        assert!(actor.ips.contains(&ipv6_2));
1703    }
1704
1705    #[test]
1706    fn test_disabled_manager() {
1707        let config = ActorConfig {
1708            enabled: false,
1709            ..Default::default()
1710        };
1711        let manager = ActorManager::new(config);
1712
1713        assert!(!manager.is_enabled());
1714
1715        let ip = create_test_ip(1);
1716        let actor_id = manager.get_or_create_actor(ip, None);
1717
1718        // Should still generate an ID but not track
1719        assert!(!actor_id.is_empty());
1720        assert!(manager.is_empty());
1721
1722        // Record rule match should be no-op
1723        manager.record_rule_match(&actor_id, "test", 10.0, "test");
1724        assert!(manager.get_actor(&actor_id).is_none());
1725    }
1726}