saorsa_core/adaptive/
security.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Security hardening for the adaptive P2P network
15//!
16//! This module implements comprehensive security measures including:
17//! - Rate limiting to prevent DoS attacks
18//! - Blacklist management for malicious nodes
19//! - Eclipse attack detection
20//! - Data integrity verification
21//! - Security audit tools
22
23use super::*;
24use anyhow::Result;
25use serde::{Deserialize, Serialize};
26use sha2::{Digest, Sha256};
27use std::{
28    collections::{HashMap, HashSet, VecDeque},
29    net::IpAddr,
30    sync::Arc,
31    time::{Duration, Instant, SystemTime},
32};
33use tokio::sync::{Mutex, RwLock};
34
/// Security configuration
///
/// Aggregates the per-subsystem settings consumed by `SecurityManager::new`,
/// which hands each field to the corresponding component.
#[derive(Debug, Clone, Default)]
pub struct SecurityConfig {
    /// Rate limiting configuration (consumed by [`RateLimiter`])
    pub rate_limit: RateLimitConfig,

    /// Blacklist configuration (consumed by [`BlacklistManager`])
    pub blacklist: BlacklistConfig,

    /// Eclipse detection configuration (consumed by [`EclipseDetector`])
    pub eclipse_detection: EclipseDetectionConfig,

    /// Data integrity configuration (consumed by [`IntegrityVerifier`])
    pub integrity: IntegrityConfig,

    /// Audit configuration (consumed by [`SecurityAuditor`])
    pub audit: AuditConfig,
}
53
/// Rate limiting configuration
#[derive(Debug, Clone)]
pub struct RateLimitConfig {
    /// Max requests per node per window
    pub node_requests_per_window: u32,

    /// Max requests per IP per window (higher than the per-node limit,
    /// since several nodes may share one IP behind a NAT)
    pub ip_requests_per_window: u32,

    /// Time window for rate limiting (applies to both node and IP windows)
    pub window_duration: Duration,

    /// Max concurrent connections per node
    // NOTE(review): not enforced anywhere in this module yet — confirm intended use.
    pub max_connections_per_node: u32,

    /// Max join requests per hour (a single global cap, not per node)
    pub max_joins_per_hour: u32,

    /// Max tracked nodes (memory bound; the entry with the oldest window
    /// is evicted when the bound is reached)
    pub max_tracked_nodes: usize,

    /// Max tracked IPs (memory bound; oldest-window entry evicted first)
    pub max_tracked_ips: usize,
}

impl Default for RateLimitConfig {
    fn default() -> Self {
        Self {
            node_requests_per_window: 100,
            ip_requests_per_window: 500,
            window_duration: Duration::from_secs(60), // 1-minute window
            max_connections_per_node: 10,
            max_joins_per_hour: 20,
            max_tracked_nodes: 10000,
            max_tracked_ips: 10000,
        }
    }
}
92
/// Blacklist configuration
#[derive(Debug, Clone)]
pub struct BlacklistConfig {
    /// Blacklist entry expiration; entries older than this are treated as
    /// inactive by `is_blacklisted` and skipped by `export_blacklist`
    pub entry_ttl: Duration,

    /// Max blacklist size; the oldest entry is evicted once this is reached
    pub max_entries: usize,

    /// Auto-blacklist threshold: number of recorded violations after which
    /// a node is blacklisted automatically
    pub violation_threshold: u32,
}

impl Default for BlacklistConfig {
    fn default() -> Self {
        Self {
            entry_ttl: Duration::from_secs(86400), // 24 hours
            max_entries: 10000,
            violation_threshold: 3,
        }
    }
}
115
/// Eclipse detection configuration
#[derive(Debug, Clone)]
pub struct EclipseDetectionConfig {
    /// Minimum routing table diversity score, in 0.0..=1.0 (ratio of unique
    /// 4-byte node-ID prefixes to table size); lower values trigger detection
    pub min_diversity_score: f64,

    /// Maximum allowed nodes from same subnet, as a fraction of the
    /// routing table size
    pub max_subnet_ratio: f64,

    /// Suspicious pattern threshold
    // NOTE(review): not read anywhere in this module yet — confirm intended use.
    pub pattern_threshold: f64,
}

impl Default for EclipseDetectionConfig {
    fn default() -> Self {
        Self {
            min_diversity_score: 0.5,
            max_subnet_ratio: 0.2,
            pattern_threshold: 0.7,
        }
    }
}
138
/// Data integrity configuration
#[derive(Debug, Clone)]
pub struct IntegrityConfig {
    /// Enable content hash verification (SHA-256 digest of the message body)
    pub verify_content_hash: bool,

    /// Enable message signatures; when set, unsigned messages are rejected
    pub require_signatures: bool,

    /// Maximum message size in bytes; larger messages are rejected outright
    pub max_message_size: usize,
}

impl Default for IntegrityConfig {
    fn default() -> Self {
        Self {
            verify_content_hash: true,
            require_signatures: true,
            max_message_size: 10 * 1024 * 1024, // 10MB
        }
    }
}
161
/// Audit configuration
#[derive(Debug, Clone)]
pub struct AuditConfig {
    /// Enable audit logging; when false, events are dropped entirely
    pub enabled: bool,

    /// Log security events
    // NOTE(review): not read by the visible auditor code — confirm intended use.
    pub log_security_events: bool,

    /// Log rate limit violations
    // NOTE(review): not read by the visible auditor code — confirm intended use.
    pub log_rate_limits: bool,

    /// Audit log retention in days; entries older than this are pruned
    /// when a new event is logged
    pub retention_days: u32,
}

impl Default for AuditConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            log_security_events: true,
            log_rate_limits: true,
            retention_days: 30,
        }
    }
}
188
/// Security manager
///
/// Facade that owns and coordinates every security subsystem. Each component
/// sits behind an `Arc` so it can be shared cheaply.
pub struct SecurityManager {
    /// Configuration (retained for threshold checks, e.g. diversity score
    /// and max message size)
    config: SecurityConfig,

    /// Rate limiter (per-node, per-IP, and global join limits)
    rate_limiter: Arc<RateLimiter>,

    /// Blacklist manager (TTL'd entries plus violation-based auto-blacklist)
    blacklist: Arc<BlacklistManager>,

    /// Eclipse detector (routing-table diversity and pattern analysis)
    eclipse_detector: Arc<EclipseDetector>,

    /// Integrity verifier (content hash and signature checks)
    integrity_verifier: Arc<IntegrityVerifier>,

    /// Security auditor (event log with retention)
    auditor: Arc<SecurityAuditor>,

    /// Node identity for signing (underscore-prefixed: not yet read by any
    /// method in this module)
    _identity: crate::peer_record::UserId,
}
212
/// Rate limiter
///
/// Sliding-window counters keyed by node and by IP, plus a single global
/// queue of join timestamps enforcing the hourly join cap.
pub struct RateLimiter {
    /// Configuration
    config: RateLimitConfig,

    /// Node request counts, bounded by `config.max_tracked_nodes`
    node_requests: Arc<RwLock<HashMap<NodeId, RequestWindow>>>,

    /// IP request counts, bounded by `config.max_tracked_ips`
    ip_requests: Arc<RwLock<HashMap<IpAddr, RequestWindow>>>,

    /// Connection counts (underscore-prefixed: not yet used)
    _connections: Arc<RwLock<HashMap<NodeId, u32>>>,

    /// Join request timestamps within the last hour (front = oldest)
    join_requests: Arc<RwLock<VecDeque<Instant>>>,
}

/// Request tracking window
#[derive(Debug, Clone)]
struct RequestWindow {
    /// Requests observed since `window_start`
    count: u32,

    /// Window start time; the counter resets once this is older than the
    /// configured `window_duration`
    window_start: Instant,
}
240
/// Blacklist manager
pub struct BlacklistManager {
    /// Configuration
    config: BlacklistConfig,

    /// Blacklisted nodes keyed by ID
    blacklist: Arc<RwLock<HashMap<NodeId, BlacklistEntry>>>,

    /// Violation counts used to auto-blacklist a node once the configured
    /// threshold is reached
    violations: Arc<RwLock<HashMap<NodeId, u32>>>,
}

/// Blacklist entry
///
/// Serializable so entries can be exported to and imported from peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlacklistEntry {
    /// Node ID
    pub node_id: NodeId,

    /// Reason for blacklisting
    pub reason: BlacklistReason,

    /// Timestamp when blacklisted; used both for TTL expiry and for
    /// newest-wins conflict resolution on import
    pub timestamp: SystemTime,

    /// Reporter node (`None` for locally generated entries)
    pub reporter: Option<NodeId>,
}

/// Blacklist reason
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BlacklistReason {
    /// Exceeded rate limits
    RateLimitViolation,

    /// Malicious behavior detected (free-form description)
    MaliciousBehavior(String),

    /// Eclipse attack attempt
    EclipseAttack,

    /// Data corruption
    DataCorruption,

    /// Invalid cryptographic proofs
    InvalidCrypto,

    /// Manual blacklist (operator-supplied reason)
    Manual(String),
}
290
/// Eclipse attack detector
pub struct EclipseDetector {
    /// Configuration
    config: EclipseDetectionConfig,

    /// Anomaly patterns accumulated across checks
    patterns: Arc<RwLock<AnomalyPatterns>>,
}

/// Anomaly patterns for detection
#[derive(Debug, Default)]
struct AnomalyPatterns {
    /// Rapid connection attempts (underscore-prefixed: not yet populated)
    _rapid_connections: HashMap<NodeId, Vec<Instant>>,

    /// Subnet distribution, keyed by the first two bytes of each node ID
    /// (used as a subnet proxy); rebuilt on every pattern scan
    subnet_distribution: HashMap<String, u32>,

    /// Suspicious routing updates observed within the last hour
    routing_anomalies: Vec<RoutingAnomaly>,
}

/// Routing anomaly
#[derive(Debug, Clone)]
struct RoutingAnomaly {
    /// Node exhibiting anomaly
    _node_id: NodeId,

    /// Type of anomaly
    _anomaly_type: AnomalyType,

    /// Detection time; anomalies older than one hour are pruned on insert
    timestamp: Instant,
}

/// Anomaly types
#[derive(Debug, Clone)]
pub enum AnomalyType {
    /// Too many nodes from same subnet
    SubnetConcentration,

    /// Rapid routing table changes
    RapidChurn,

    /// Suspicious connection patterns
    ConnectionPattern,

    /// Coordinated behavior
    CoordinatedActivity,
}
341
/// Data integrity verifier
pub struct IntegrityVerifier {
    /// Configuration (underscore-prefixed: size/signature policy is
    /// enforced by the caller in `SecurityManager`, not read here)
    _config: IntegrityConfig,

    /// Message verification stats
    stats: Arc<RwLock<VerificationStats>>,
}

/// Verification statistics
#[derive(Debug, Default)]
struct VerificationStats {
    /// Total verification attempts (successful or not)
    total_verified: u64,

    /// Failed verifications of any kind
    failed_verifications: u64,

    /// Invalid hashes (a subset of `failed_verifications`)
    invalid_hashes: u64,

    /// Invalid signatures (underscore-prefixed: not yet tracked)
    _invalid_signatures: u64,
}
366
/// Security auditor
pub struct SecurityAuditor {
    /// Configuration
    config: AuditConfig,

    /// Audit log, ordered oldest-first; pruned to the retention window
    /// each time a new event is appended
    audit_log: Arc<Mutex<VecDeque<AuditEntry>>>,

    /// Event counts keyed by the `Debug` rendering of the event type
    event_counts: Arc<RwLock<HashMap<String, u64>>>,
}

/// Audit log entry
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEntry {
    /// Entry timestamp
    pub timestamp: SystemTime,

    /// Event type
    pub event_type: SecurityEvent,

    /// Associated node, when the event concerns a specific peer
    pub node_id: Option<NodeId>,

    /// Event details (human-readable free text)
    pub details: String,

    /// Severity level
    pub severity: Severity,
}

/// Security event types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SecurityEvent {
    /// Rate limit exceeded
    RateLimitExceeded,

    /// Node blacklisted
    NodeBlacklisted,

    /// Eclipse attack detected
    EclipseAttackDetected,

    /// Data integrity failure
    IntegrityFailure,

    /// Authentication failure
    AuthenticationFailure,

    /// Suspicious activity
    SuspiciousActivity,

    /// Security configuration change
    ConfigurationChange,
}

/// Event severity, listed from least to most severe
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Severity {
    /// Diagnostic detail, normally ignored in production
    Debug,
    /// Routine informational event
    Info,
    /// Unusual but non-fatal condition
    Warning,
    /// Operation failed
    Error,
    /// Active attack or severe failure requiring attention
    Critical,
}
432
/// Security errors
#[derive(Debug, thiserror::Error)]
pub enum SecurityError {
    /// A node or IP exceeded a configured request or join rate
    #[error("Rate limit exceeded")]
    RateLimitExceeded,

    /// The node has an active (non-expired) blacklist entry
    #[error("Node is blacklisted")]
    Blacklisted,

    /// The node ID is not bound to its advertised public key
    #[error("Invalid cryptographic identity")]
    InvalidIdentity,

    /// Content hash mismatch or failed anti-replay timestamp check
    #[error("Data integrity check failed")]
    IntegrityCheckFailed,

    /// Routing-table diversity or pattern analysis flagged an eclipse attack
    #[error("Eclipse attack detected")]
    EclipseAttackDetected,

    /// Message exceeds the configured maximum size
    #[error("Message too large")]
    MessageTooLarge,

    /// Signature missing or failed verification
    #[error("Invalid signature")]
    InvalidSignature,
}
457
458impl SecurityManager {
459    /// Create new security manager
460    pub fn new(config: SecurityConfig, identity: &NodeIdentity) -> Self {
461        let rate_limiter = Arc::new(RateLimiter::new(config.rate_limit.clone()));
462        let blacklist = Arc::new(BlacklistManager::new(config.blacklist.clone()));
463        let eclipse_detector = Arc::new(EclipseDetector::new(config.eclipse_detection.clone()));
464        let integrity_verifier = Arc::new(IntegrityVerifier::new(config.integrity.clone()));
465        let auditor = Arc::new(SecurityAuditor::new(config.audit.clone()));
466
467        Self {
468            config,
469            rate_limiter,
470            blacklist,
471            eclipse_detector,
472            integrity_verifier,
473            auditor,
474            _identity: identity.to_user_id(),
475        }
476    }
477
478    /// Validate node join request
479    pub async fn validate_node_join(&self, node: &NodeDescriptor) -> Result<(), SecurityError> {
480        // Check blacklist
481        if self.blacklist.is_blacklisted(&node.id).await {
482            self.auditor
483                .log_event(
484                    SecurityEvent::NodeBlacklisted,
485                    Some(node.id.clone()),
486                    "Node attempted to join while blacklisted".to_string(),
487                    Severity::Warning,
488                )
489                .await;
490            return Err(SecurityError::Blacklisted);
491        }
492
493        // Check rate limits for joins
494        if !self.rate_limiter.check_join_rate().await {
495            self.auditor
496                .log_event(
497                    SecurityEvent::RateLimitExceeded,
498                    Some(node.id.clone()),
499                    "Join rate limit exceeded".to_string(),
500                    Severity::Warning,
501                )
502                .await;
503            return Err(SecurityError::RateLimitExceeded);
504        }
505
506        // Verify cryptographic identity
507        if !self.verify_identity(node).await {
508            return Err(SecurityError::InvalidIdentity);
509        }
510
511        Ok(())
512    }
513
514    /// Check if request should be rate limited
515    pub async fn check_rate_limit(
516        &self,
517        node_id: &NodeId,
518        ip: Option<IpAddr>,
519    ) -> Result<(), SecurityError> {
520        // Check node rate limit
521        if !self.rate_limiter.check_node_rate(node_id).await {
522            self.blacklist
523                .record_violation(node_id, BlacklistReason::RateLimitViolation)
524                .await;
525            self.auditor
526                .log_event(
527                    SecurityEvent::RateLimitExceeded,
528                    Some(node_id.clone()),
529                    "Node request rate limit exceeded".to_string(),
530                    Severity::Warning,
531                )
532                .await;
533            return Err(SecurityError::RateLimitExceeded);
534        }
535
536        // Check IP rate limit if provided
537        if let Some(ip_addr) = ip
538            && !self.rate_limiter.check_ip_rate(&ip_addr).await
539        {
540            self.auditor
541                .log_event(
542                    SecurityEvent::RateLimitExceeded,
543                    None,
544                    format!("IP {ip_addr} rate limit exceeded"),
545                    Severity::Warning,
546                )
547                .await;
548            return Err(SecurityError::RateLimitExceeded);
549        }
550
551        Ok(())
552    }
553
554    /// Detect eclipse attack
555    pub async fn detect_eclipse_attack(
556        &self,
557        routing_table: &[NodeId],
558    ) -> Result<(), SecurityError> {
559        let diversity_score = self
560            .eclipse_detector
561            .calculate_diversity_score(routing_table)
562            .await;
563
564        if diversity_score < self.config.eclipse_detection.min_diversity_score {
565            self.auditor
566                .log_event(
567                    SecurityEvent::EclipseAttackDetected,
568                    None,
569                    format!("Low routing table diversity: {diversity_score:.2}"),
570                    Severity::Critical,
571                )
572                .await;
573            return Err(SecurityError::EclipseAttackDetected);
574        }
575
576        if self
577            .eclipse_detector
578            .detect_suspicious_patterns(routing_table)
579            .await
580        {
581            self.auditor
582                .log_event(
583                    SecurityEvent::EclipseAttackDetected,
584                    None,
585                    "Suspicious routing patterns detected".to_string(),
586                    Severity::Critical,
587                )
588                .await;
589            return Err(SecurityError::EclipseAttackDetected);
590        }
591
592        Ok(())
593    }
594
595    /// Verify message integrity
596    pub async fn verify_message_integrity(
597        &self,
598        message: &[u8],
599        hash: &[u8],
600        signature: Option<&[u8]>,
601    ) -> Result<(), SecurityError> {
602        // Basic anti-replay check using timestamp prefix if present
603        if message.len() >= 8 {
604            let mut ts_bytes = [0u8; 8];
605            ts_bytes.copy_from_slice(&message[..8]);
606            let msg_ts = u64::from_be_bytes(ts_bytes);
607            let now = SystemTime::now()
608                .duration_since(SystemTime::UNIX_EPOCH)
609                .unwrap_or_default()
610                .as_secs();
611            // 5-minute future tolerance window
612            if msg_ts > now + 300 {
613                return Err(SecurityError::IntegrityCheckFailed);
614            }
615        }
616        // Check message size
617        if message.len() > self.config.integrity.max_message_size {
618            return Err(SecurityError::MessageTooLarge);
619        }
620
621        // Verify content hash
622        if self.config.integrity.verify_content_hash
623            && !self.integrity_verifier.verify_hash(message, hash).await
624        {
625            self.auditor
626                .log_event(
627                    SecurityEvent::IntegrityFailure,
628                    None,
629                    "Content hash verification failed".to_string(),
630                    Severity::Error,
631                )
632                .await;
633            return Err(SecurityError::IntegrityCheckFailed);
634        }
635
636        // Verify signature if required
637        if self.config.integrity.require_signatures {
638            if let Some(sig) = signature {
639                if !self.integrity_verifier.verify_signature(message, sig).await {
640                    self.auditor
641                        .log_event(
642                            SecurityEvent::IntegrityFailure,
643                            None,
644                            "Message signature verification failed".to_string(),
645                            Severity::Error,
646                        )
647                        .await;
648                    return Err(SecurityError::InvalidSignature);
649                }
650            } else {
651                return Err(SecurityError::InvalidSignature);
652            }
653        }
654
655        Ok(())
656    }
657
658    /// Blacklist a node
659    pub async fn blacklist_node(&self, node_id: NodeId, reason: BlacklistReason) {
660        self.blacklist
661            .add_entry(node_id.clone(), reason.clone())
662            .await;
663
664        self.auditor
665            .log_event(
666                SecurityEvent::NodeBlacklisted,
667                Some(node_id),
668                format!("Node blacklisted: {reason:?}"),
669                Severity::Warning,
670            )
671            .await;
672    }
673
674    /// Get security metrics
675    pub async fn get_metrics(&self) -> SecurityMetrics {
676        SecurityMetrics {
677            rate_limit_violations: self.rate_limiter.get_violation_count().await,
678            blacklisted_nodes: self.blacklist.get_blacklist_size().await,
679            verification_failures: self.integrity_verifier.get_failure_count().await,
680            eclipse_detections: self.eclipse_detector.get_detection_count().await,
681            audit_entries: self.auditor.get_entry_count().await,
682        }
683    }
684
685    /// Verify node identity by binding NodeId to the advertised ML-DSA public key
686    async fn verify_identity(&self, node: &NodeDescriptor) -> bool {
687        // Derive BLAKE3 hash of ML-DSA public key bytes to match UserId
688        let bytes = node.public_key.as_bytes();
689        let hash = blake3::hash(bytes);
690        node.id.as_bytes() == hash.as_bytes()
691    }
692}
693
694impl RateLimiter {
695    /// Create new rate limiter
696    pub fn new(config: RateLimitConfig) -> Self {
697        Self {
698            config,
699            node_requests: Arc::new(RwLock::new(HashMap::new())),
700            ip_requests: Arc::new(RwLock::new(HashMap::new())),
701            _connections: Arc::new(RwLock::new(HashMap::new())),
702            join_requests: Arc::new(RwLock::new(VecDeque::new())),
703        }
704    }
705
706    /// Check node rate limit
707    pub async fn check_node_rate(&self, node_id: &NodeId) -> bool {
708        let mut requests = self.node_requests.write().await;
709        let now = Instant::now();
710
711        // Evict oldest entries if at capacity (before inserting new)
712        if requests.len() >= self.config.max_tracked_nodes && !requests.contains_key(node_id) {
713            // Find and remove the oldest entry
714            if let Some(oldest_key) = requests
715                .iter()
716                .min_by_key(|(_, window)| window.window_start)
717                .map(|(k, _)| k.clone())
718            {
719                requests.remove(&oldest_key);
720            }
721        }
722
723        let window = requests.entry(node_id.clone()).or_insert(RequestWindow {
724            count: 0,
725            window_start: now,
726        });
727
728        // Reset window if expired
729        if now.duration_since(window.window_start) > self.config.window_duration {
730            window.count = 0;
731            window.window_start = now;
732        }
733
734        // Check if under limit
735        if window.count < self.config.node_requests_per_window {
736            window.count += 1;
737            true
738        } else {
739            false
740        }
741    }
742
743    /// Get count of tracked nodes (for testing/monitoring)
744    pub async fn get_tracked_node_count(&self) -> usize {
745        self.node_requests.read().await.len()
746    }
747
748    /// Check IP rate limit
749    pub async fn check_ip_rate(&self, ip: &IpAddr) -> bool {
750        let mut requests = self.ip_requests.write().await;
751        let now = Instant::now();
752
753        // Evict oldest entries if at capacity (before inserting new)
754        if requests.len() >= self.config.max_tracked_ips && !requests.contains_key(ip) {
755            // Find and remove the oldest entry
756            if let Some(oldest_key) = requests
757                .iter()
758                .min_by_key(|(_, window)| window.window_start)
759                .map(|(k, _)| *k)
760            {
761                requests.remove(&oldest_key);
762            }
763        }
764
765        let window = requests.entry(*ip).or_insert(RequestWindow {
766            count: 0,
767            window_start: now,
768        });
769
770        // Reset window if expired
771        if now.duration_since(window.window_start) > self.config.window_duration {
772            window.count = 0;
773            window.window_start = now;
774        }
775
776        // Check if under limit
777        if window.count < self.config.ip_requests_per_window {
778            window.count += 1;
779            true
780        } else {
781            false
782        }
783    }
784
785    /// Get count of tracked IPs (for testing/monitoring)
786    pub async fn get_tracked_ip_count(&self) -> usize {
787        self.ip_requests.read().await.len()
788    }
789
790    /// Check join rate limit
791    pub async fn check_join_rate(&self) -> bool {
792        let mut join_requests = self.join_requests.write().await;
793        let now = Instant::now();
794
795        // Remove old entries (use checked_sub for Windows compatibility)
796        if let Some(hour_ago) = now.checked_sub(Duration::from_secs(3600)) {
797            while let Some(front) = join_requests.front() {
798                if *front < hour_ago {
799                    join_requests.pop_front();
800                } else {
801                    break;
802                }
803            }
804        }
805
806        // Check if under limit
807        if join_requests.len() < self.config.max_joins_per_hour as usize {
808            join_requests.push_back(now);
809            true
810        } else {
811            false
812        }
813    }
814
815    /// Get violation count
816    pub async fn get_violation_count(&self) -> u64 {
817        // In a real implementation, would track violations
818        0
819    }
820}
821
822impl BlacklistManager {
823    /// Create new blacklist manager
824    pub fn new(config: BlacklistConfig) -> Self {
825        Self {
826            config,
827            blacklist: Arc::new(RwLock::new(HashMap::new())),
828            violations: Arc::new(RwLock::new(HashMap::new())),
829        }
830    }
831
832    /// Check if node is blacklisted
833    pub async fn is_blacklisted(&self, node_id: &NodeId) -> bool {
834        let blacklist = self.blacklist.read().await;
835
836        if let Some(entry) = blacklist.get(node_id) {
837            // Check if entry has expired
838            let now = SystemTime::now();
839            let elapsed = now
840                .duration_since(entry.timestamp)
841                .unwrap_or(Duration::ZERO);
842
843            elapsed < self.config.entry_ttl
844        } else {
845            false
846        }
847    }
848
849    /// Add blacklist entry
850    pub async fn add_entry(&self, node_id: NodeId, reason: BlacklistReason) {
851        let mut blacklist = self.blacklist.write().await;
852
853        // Enforce max size
854        if blacklist.len() >= self.config.max_entries {
855            // Remove oldest entry
856            if let Some(oldest) = blacklist
857                .iter()
858                .min_by_key(|(_, entry)| entry.timestamp)
859                .map(|(id, _)| id.clone())
860            {
861                blacklist.remove(&oldest);
862            }
863        }
864
865        blacklist.insert(
866            node_id.clone(),
867            BlacklistEntry {
868                node_id,
869                reason,
870                timestamp: SystemTime::now(),
871                reporter: None,
872            },
873        );
874    }
875
876    /// Record violation
877    pub async fn record_violation(&self, node_id: &NodeId, reason: BlacklistReason) {
878        let mut violations = self.violations.write().await;
879        let count = violations.entry(node_id.clone()).or_insert(0);
880        *count += 1;
881
882        // Auto-blacklist if threshold exceeded
883        if *count >= self.config.violation_threshold {
884            drop(violations);
885            self.add_entry(node_id.clone(), reason).await;
886        }
887    }
888
889    /// Get blacklist size
890    pub async fn get_blacklist_size(&self) -> usize {
891        self.blacklist.read().await.len()
892    }
893
894    /// Export blacklist for sharing
895    pub async fn export_blacklist(&self) -> Vec<BlacklistEntry> {
896        let blacklist = self.blacklist.read().await;
897        let now = SystemTime::now();
898
899        blacklist
900            .values()
901            .filter(|entry| {
902                let elapsed = now
903                    .duration_since(entry.timestamp)
904                    .unwrap_or(Duration::ZERO);
905                elapsed < self.config.entry_ttl
906            })
907            .cloned()
908            .collect()
909    }
910
911    /// Import blacklist entries
912    pub async fn import_blacklist(&self, entries: Vec<BlacklistEntry>) {
913        let mut blacklist = self.blacklist.write().await;
914
915        for entry in entries {
916            // Only import if not already present or newer
917            match blacklist.get(&entry.node_id) {
918                Some(existing) if existing.timestamp >= entry.timestamp => continue,
919                _ => {
920                    blacklist.insert(entry.node_id.clone(), entry);
921                }
922            }
923        }
924    }
925}
926
927impl EclipseDetector {
928    /// Create new eclipse detector
929    pub fn new(config: EclipseDetectionConfig) -> Self {
930        Self {
931            config,
932            patterns: Arc::new(RwLock::new(AnomalyPatterns::default())),
933        }
934    }
935
936    /// Calculate diversity score of routing table
937    pub async fn calculate_diversity_score(&self, routing_table: &[NodeId]) -> f64 {
938        if routing_table.is_empty() {
939            return 0.0;
940        }
941
942        // Calculate based on unique hash prefixes
943        let mut prefixes = HashSet::new();
944        for node_id in routing_table {
945            // Use first 4 bytes as prefix
946            let prefix = &node_id.hash[..4];
947            prefixes.insert(prefix.to_vec());
948        }
949
950        // Diversity score is ratio of unique prefixes to total nodes
951        prefixes.len() as f64 / routing_table.len() as f64
952    }
953
954    /// Detect suspicious patterns
955    pub async fn detect_suspicious_patterns(&self, routing_table: &[NodeId]) -> bool {
956        let mut patterns = self.patterns.write().await;
957
958        // Check for subnet concentration
959        // In a real implementation, would extract IPs from node descriptors
960        // For now, use hash prefix as proxy
961        patterns.subnet_distribution.clear();
962
963        for node_id in routing_table {
964            let subnet = format!("{:02x}{:02x}", node_id.hash[0], node_id.hash[1]);
965            *patterns.subnet_distribution.entry(subnet).or_insert(0) += 1;
966        }
967
968        // Check if any subnet has too many nodes
969        let max_allowed = (routing_table.len() as f64 * self.config.max_subnet_ratio) as u32;
970        for count in patterns.subnet_distribution.values() {
971            if *count > max_allowed {
972                return true;
973            }
974        }
975
976        false
977    }
978
979    /// Record routing anomaly
980    pub async fn record_anomaly(&self, node_id: NodeId, anomaly_type: AnomalyType) {
981        let mut patterns = self.patterns.write().await;
982
983        patterns.routing_anomalies.push(RoutingAnomaly {
984            _node_id: node_id,
985            _anomaly_type: anomaly_type,
986            timestamp: Instant::now(),
987        });
988
989        // Keep only recent anomalies (last hour)
990        // Use checked_sub to avoid panic on Windows when program uptime < 1 hour
991        if let Some(cutoff) = Instant::now().checked_sub(Duration::from_secs(3600)) {
992            patterns.routing_anomalies.retain(|a| a.timestamp > cutoff);
993        }
994    }
995
996    /// Get detection count
997    pub async fn get_detection_count(&self) -> u64 {
998        self.patterns.read().await.routing_anomalies.len() as u64
999    }
1000}
1001
1002impl IntegrityVerifier {
1003    /// Create new integrity verifier
1004    pub fn new(config: IntegrityConfig) -> Self {
1005        Self {
1006            _config: config,
1007            stats: Arc::new(RwLock::new(VerificationStats::default())),
1008        }
1009    }
1010
1011    /// Verify content hash
1012    pub async fn verify_hash(&self, content: &[u8], expected_hash: &[u8]) -> bool {
1013        let mut stats = self.stats.write().await;
1014        stats.total_verified += 1;
1015
1016        let mut hasher = Sha256::new();
1017        hasher.update(content);
1018        let computed_hash = hasher.finalize();
1019
1020        if computed_hash.as_slice() == expected_hash {
1021            true
1022        } else {
1023            stats.failed_verifications += 1;
1024            stats.invalid_hashes += 1;
1025            false
1026        }
1027    }
1028
1029    /// Verify signature (placeholder)
1030    pub async fn verify_signature(&self, _message: &[u8], _signature: &[u8]) -> bool {
1031        // In a real implementation, would verify ML-DSA signature
1032        // For now, always return true
1033        true
1034    }
1035
1036    /// Get failure count
1037    pub async fn get_failure_count(&self) -> u64 {
1038        self.stats.read().await.failed_verifications
1039    }
1040}
1041
1042impl SecurityAuditor {
1043    /// Create new security auditor
1044    pub fn new(config: AuditConfig) -> Self {
1045        Self {
1046            config,
1047            audit_log: Arc::new(Mutex::new(VecDeque::new())),
1048            event_counts: Arc::new(RwLock::new(HashMap::new())),
1049        }
1050    }
1051
1052    /// Log security event
1053    pub async fn log_event(
1054        &self,
1055        event_type: SecurityEvent,
1056        node_id: Option<NodeId>,
1057        details: String,
1058        severity: Severity,
1059    ) {
1060        if !self.config.enabled {
1061            return;
1062        }
1063
1064        let entry = AuditEntry {
1065            timestamp: SystemTime::now(),
1066            event_type: event_type.clone(),
1067            node_id,
1068            details,
1069            severity,
1070        };
1071
1072        // Add to log
1073        let mut log = self.audit_log.lock().await;
1074        log.push_back(entry);
1075
1076        // Enforce retention
1077        let retention_duration = Duration::from_secs(self.config.retention_days as u64 * 86400);
1078        let cutoff = SystemTime::now() - retention_duration;
1079
1080        while let Some(front) = log.front() {
1081            if front.timestamp < cutoff {
1082                log.pop_front();
1083            } else {
1084                break;
1085            }
1086        }
1087
1088        // Update event counts
1089        let event_name = format!("{event_type:?}");
1090        let mut counts = self.event_counts.write().await;
1091        *counts.entry(event_name).or_insert(0) += 1;
1092    }
1093
1094    /// Get audit entries
1095    pub async fn get_entries(
1096        &self,
1097        since: Option<SystemTime>,
1098        severity_filter: Option<Severity>,
1099    ) -> Vec<AuditEntry> {
1100        let log = self.audit_log.lock().await;
1101
1102        log.iter()
1103            .filter(|entry| {
1104                if let Some(min_time) = since
1105                    && entry.timestamp < min_time
1106                {
1107                    return false;
1108                }
1109                if let Some(min_severity) = severity_filter
1110                    && (entry.severity as u8) < (min_severity as u8)
1111                {
1112                    return false;
1113                }
1114                true
1115            })
1116            .cloned()
1117            .collect()
1118    }
1119
1120    /// Get entry count
1121    pub async fn get_entry_count(&self) -> u64 {
1122        self.audit_log.lock().await.len() as u64
1123    }
1124
1125    /// Export audit report
1126    pub async fn export_report(&self) -> AuditReport {
1127        let entries = self.get_entries(None, None).await;
1128        let event_counts = self.event_counts.read().await.clone();
1129
1130        AuditReport {
1131            generated_at: SystemTime::now(),
1132            total_entries: entries.len(),
1133            event_counts,
1134            severity_breakdown: self.calculate_severity_breakdown(&entries),
1135            recent_critical_events: entries
1136                .iter()
1137                .filter(|e| e.severity == Severity::Critical)
1138                .take(10)
1139                .cloned()
1140                .collect(),
1141        }
1142    }
1143
1144    /// Calculate severity breakdown
1145    fn calculate_severity_breakdown(&self, entries: &[AuditEntry]) -> HashMap<Severity, u64> {
1146        let mut breakdown = HashMap::new();
1147
1148        for entry in entries {
1149            *breakdown.entry(entry.severity).or_insert(0) += 1;
1150        }
1151
1152        breakdown
1153    }
1154}
1155
/// Security metrics
///
/// Aggregated counters exposed for monitoring the security subsystem.
#[derive(Debug, Clone, Default)]
pub struct SecurityMetrics {
    /// Number of requests rejected by the rate limiter
    pub rate_limit_violations: u64,
    /// Current number of blacklisted nodes
    pub blacklisted_nodes: usize,
    /// Number of failed integrity verifications
    pub verification_failures: u64,
    /// Number of eclipse-attack detections recorded
    pub eclipse_detections: u64,
    /// Number of entries currently held in the audit log
    pub audit_entries: u64,
}
1165
/// Audit report
///
/// Point-in-time snapshot of the audit log produced by
/// `SecurityAuditor::export_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditReport {
    /// When this report was generated
    pub generated_at: SystemTime,
    /// Total number of retained audit entries at generation time
    pub total_entries: usize,
    /// Event occurrence counts, keyed by the event type's Debug name
    pub event_counts: HashMap<String, u64>,
    /// Number of entries per severity level
    pub severity_breakdown: HashMap<Severity, u64>,
    /// Up to ten critical-severity entries from the log
    pub recent_critical_events: Vec<AuditEntry>,
}
1175
#[cfg(test)]
mod tests {
    // Unit tests for the security hardening components: rate limiting,
    // blacklist management, eclipse detection, integrity verification,
    // auditing, and the integrated SecurityManager.
    use super::*;

    // Verifies the per-node request budget and that the budget refills
    // once the rate-limit window elapses.
    #[tokio::test]
    async fn test_rate_limiter_node_limits() {
        let config = RateLimitConfig {
            node_requests_per_window: 5,
            window_duration: Duration::from_secs(1),
            ..Default::default()
        };

        let limiter = RateLimiter::new(config);
        let node_id = NodeId { hash: [1u8; 32] };

        // Should allow first 5 requests
        for _ in 0..5 {
            assert!(limiter.check_node_rate(&node_id).await);
        }

        // 6th request should fail
        assert!(!limiter.check_node_rate(&node_id).await);

        // Wait for window to reset
        // NOTE(review): real 2s sleep — presumably the limiter uses
        // wall-clock Instants, so tokio's paused test time would not
        // advance it; confirm before converting to start_paused.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Should allow again
        assert!(limiter.check_node_rate(&node_id).await);
    }

    // The limiter must evict old node entries rather than grow without
    // bound when more than max_tracked_nodes distinct nodes are seen.
    #[tokio::test]
    async fn test_rate_limiter_memory_bounds() {
        let config = RateLimitConfig {
            node_requests_per_window: 100,
            window_duration: Duration::from_secs(60),
            max_tracked_nodes: 10, // Only track 10 nodes
            max_tracked_ips: 10,
            ..Default::default()
        };

        let limiter = RateLimiter::new(config);

        // Add requests from 20 different nodes
        for i in 0..20u8 {
            let mut hash = [0u8; 32];
            hash[0] = i;
            let node_id = NodeId { hash };
            limiter.check_node_rate(&node_id).await;
        }

        // Should have evicted old entries, keeping only max_tracked_nodes
        let node_count = limiter.get_tracked_node_count().await;
        assert!(
            node_count <= 10,
            "Expected <= 10 tracked nodes, got {}",
            node_count
        );
    }

    // Same memory-bound guarantee as above, but for the per-IP table.
    #[tokio::test]
    async fn test_rate_limiter_ip_memory_bounds() {
        let config = RateLimitConfig {
            ip_requests_per_window: 100,
            window_duration: Duration::from_secs(60),
            max_tracked_ips: 10, // Only track 10 IPs
            ..Default::default()
        };

        let limiter = RateLimiter::new(config);

        // Add requests from 20 different IPs
        for i in 0..20u8 {
            let ip: IpAddr = format!("192.168.1.{}", i).parse().unwrap();
            limiter.check_ip_rate(&ip).await;
        }

        // Should have evicted old entries, keeping only max_tracked_ips
        let ip_count = limiter.get_tracked_ip_count().await;
        assert!(
            ip_count <= 10,
            "Expected <= 10 tracked IPs, got {}",
            ip_count
        );
    }

    // Basic add/lookup round-trip through the blacklist manager.
    #[tokio::test]
    async fn test_blacklist_management() {
        let config = BlacklistConfig::default();
        let blacklist = BlacklistManager::new(config);

        let node_id = NodeId { hash: [2u8; 32] };

        // Should not be blacklisted initially
        assert!(!blacklist.is_blacklisted(&node_id).await);

        // Add to blacklist
        blacklist
            .add_entry(
                node_id.clone(),
                BlacklistReason::MaliciousBehavior("Test".to_string()),
            )
            .await;

        // Should now be blacklisted
        assert!(blacklist.is_blacklisted(&node_id).await);

        // Check blacklist size
        assert_eq!(blacklist.get_blacklist_size().await, 1);
    }

    // Diversity scoring: a routing table whose node IDs share a prefix
    // must score low, while spread-out prefixes must score high.
    #[tokio::test]
    async fn test_eclipse_detection() {
        let config = EclipseDetectionConfig {
            min_diversity_score: 0.5,
            max_subnet_ratio: 0.3,
            pattern_threshold: 0.7,
        };

        let detector = EclipseDetector::new(config);

        // Create routing table with low diversity
        let mut routing_table = vec![];
        for i in 0..10 {
            let mut hash = [0u8; 32];
            hash[0] = 1; // Same prefix
            hash[31] = i;
            routing_table.push(NodeId { hash });
        }

        // Should have low diversity score
        let score = detector.calculate_diversity_score(&routing_table).await;
        assert!(score < 0.5);

        // Create diverse routing table
        let mut diverse_table = vec![];
        for i in 0..10 {
            let mut hash = [0u8; 32];
            hash[0] = i * 25; // Different prefixes
            diverse_table.push(NodeId { hash });
        }

        // Should have high diversity score
        let diverse_score = detector.calculate_diversity_score(&diverse_table).await;
        assert!(diverse_score > 0.8);
    }

    // Hash verification accepts a matching SHA-256 digest, rejects a
    // wrong one, and records the failure in the stats.
    #[tokio::test]
    async fn test_integrity_verification() {
        let config = IntegrityConfig::default();
        let verifier = IntegrityVerifier::new(config);

        let content = b"Test content";
        let mut hasher = Sha256::new();
        hasher.update(content);
        let correct_hash = hasher.finalize();

        // Should verify correct hash
        assert!(verifier.verify_hash(content, &correct_hash).await);

        // Should fail with incorrect hash
        let wrong_hash = [0u8; 32];
        assert!(!verifier.verify_hash(content, &wrong_hash).await);

        // Check failure count
        assert_eq!(verifier.get_failure_count().await, 1);
    }

    // Logging, severity filtering, and report export for the auditor.
    #[tokio::test]
    async fn test_security_auditor() {
        let config = AuditConfig::default();
        let auditor = SecurityAuditor::new(config);

        // Log some events
        auditor
            .log_event(
                SecurityEvent::RateLimitExceeded,
                None,
                "Test rate limit".to_string(),
                Severity::Warning,
            )
            .await;

        auditor
            .log_event(
                SecurityEvent::EclipseAttackDetected,
                None,
                "Test eclipse attack".to_string(),
                Severity::Critical,
            )
            .await;

        // Check entry count
        assert_eq!(auditor.get_entry_count().await, 2);

        // Get critical events
        let entries = auditor.get_entries(None, Some(Severity::Critical)).await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].severity, Severity::Critical);

        // Export report
        let report = auditor.export_report().await;
        assert_eq!(report.total_entries, 2);
        assert_eq!(report.recent_critical_events.len(), 1);
    }

    // End-to-end: a node with a properly derived identity passes join
    // validation, is rejected after blacklisting, and the change shows
    // up in the aggregated metrics.
    #[tokio::test]
    async fn test_security_manager_integration() {
        let config = SecurityConfig::default();
        let identity = NodeIdentity::generate().unwrap();
        let manager = SecurityManager::new(config, &identity);

        // Test node join validation
        // Generate a valid ML-DSA key and derive matching UserId via blake3(pubkey)
        let (ml_pub, _ml_sec) = crate::quantum_crypto::generate_ml_dsa_keypair().unwrap();
        let derived_hash = blake3::hash(ml_pub.as_bytes());
        let derived_id = crate::peer_record::UserId::from_bytes(*derived_hash.as_bytes());
        let node = NodeDescriptor {
            id: derived_id,
            public_key: ml_pub,
            addresses: vec![],
            hyperbolic: None,
            som_position: None,
            trust: 0.5,
            capabilities: NodeCapabilities {
                storage: 100,
                compute: 50,
                bandwidth: 10,
            },
        };

        // Should pass validation
        assert!(manager.validate_node_join(&node).await.is_ok());

        // Blacklist the node
        manager
            .blacklist_node(node.id.clone(), BlacklistReason::Manual("Test".to_string()))
            .await;

        // Should now fail validation
        assert!(matches!(
            manager.validate_node_join(&node).await,
            Err(SecurityError::Blacklisted)
        ));

        // Check metrics
        let metrics = manager.get_metrics().await;
        assert_eq!(metrics.blacklisted_nodes, 1);
        assert!(metrics.audit_entries > 0);
    }
}