Skip to main content

oxirs_core/distributed/bft/
detection.rs

1//! Byzantine behavior detection systems and security components
2
3#![allow(dead_code)]
4
5use super::types::*;
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::time::{Duration, Instant};
8
9/// Byzantine behavior detection system with advanced threat detection
10pub struct ByzantineDetector {
11    /// Suspected Byzantine nodes
12    suspected_nodes: HashSet<NodeId>,
13
14    /// Message timing anomalies with detailed analysis
15    timing_anomalies: HashMap<NodeId, TimingAnalysis>,
16
17    /// Signature verification failures
18    signature_failures: HashMap<NodeId, usize>,
19
20    /// Inconsistent message patterns
21    inconsistent_patterns: HashMap<NodeId, usize>,
22
23    /// Detection threshold
24    detection_threshold: usize,
25
26    /// Network partition detection
27    partition_detector: PartitionDetector,
28
29    /// Message replay attack detection
30    replay_detector: ReplayDetector,
31
32    /// Equivocation detection (sending different messages for same view/sequence)
33    equivocation_detector: EquivocationDetector,
34
35    /// Resource exhaustion attack detection
36    resource_monitor: ResourceMonitor,
37
38    /// Collusion detection between nodes
39    collusion_detector: CollusionDetector,
40}
41
/// Advanced timing analysis for Byzantine detection
///
/// Per-node response-time state kept by `ByzantineDetector`. A fresh entry
/// starts from a 100 ms average and a 50 ms spread.
#[derive(Debug, Clone)]
pub struct TimingAnalysis {
    /// Recent message timestamps: when each sample was reported, capped at the
    /// latest 100. Also used to spot suspiciously regular reporting cadence.
    message_times: VecDeque<Instant>,
    /// Exponentially weighted average of reported response times
    avg_response_time: Duration,
    /// Standard deviation of response times; the slow-response check accepts
    /// samples up to `avg_response_time + 3 * response_time_stddev`
    response_time_stddev: Duration,
    /// Number of samples flagged as suspicious; compared against the
    /// detector's `detection_threshold`
    suspicious_patterns: usize,
}
54
/// Network partition detection system
///
/// A node counts as partitioned when it has not been seen by
/// `ByzantineDetector::check_network_partition` for longer than
/// `partition_timeout`.
#[derive(Debug, Clone)]
pub struct PartitionDetector {
    /// Last time each node was seen by a partition check
    last_communication: HashMap<NodeId, Instant>,
    /// Nodes whose last communication exceeded the timeout at the latest check
    partitioned_nodes: HashSet<NodeId>,
    /// Partition timeout threshold (30 s by default)
    partition_timeout: Duration,
}
65
/// Replay attack detection system
#[derive(Debug, Clone)]
pub struct ReplayDetector {
    /// Message hash -> time it was recorded. Entries older than
    /// `replay_window` are dropped whenever a new message is checked.
    seen_messages: HashMap<Vec<u8>, Instant>,
    /// How long a recorded hash counts as "recent" (60 s by default)
    replay_window: Duration,
    /// Per-node count of replayed messages, charged to the node that
    /// presented the duplicate
    replay_attempts: HashMap<NodeId, usize>,
}
76
/// Type alias for complex message storage type:
/// node -> (view, sequence) -> message hashes recorded for that slot.
///
/// NOTE(review): nothing in this file ever removes entries, so the store grows
/// with every (view, sequence) a node participates in. Consider pruning old
/// views.
type NodeMessageStore = HashMap<NodeId, HashMap<(ViewNumber, SequenceNumber), Vec<Vec<u8>>>>;

/// Equivocation detection system
#[derive(Debug, Clone)]
pub struct EquivocationDetector {
    /// Messages per view/sequence from each node
    node_messages: NodeMessageStore,
    /// Per-node count of detected equivocations (a different message hash for
    /// an already-used view/sequence slot)
    equivocations: HashMap<NodeId, usize>,
}
88
/// Resource exhaustion monitoring
// Redundant with the file-level `#![allow(dead_code)]`; presumably kept
// because `memory_usage` is not read or written anywhere in this file.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ResourceMonitor {
    /// Timestamps of each node's messages within the trailing one-second window
    message_rates: HashMap<NodeId, VecDeque<Instant>>,
    /// Rate limit threshold (messages per second, 100 by default)
    rate_limit: f64,
    /// Memory usage tracking (currently unused in this file)
    memory_usage: HashMap<NodeId, usize>,
    /// Per-node count of messages that arrived while over the rate limit
    resource_attacks: HashMap<NodeId, usize>,
}
102
/// Collusion detection between Byzantine nodes
#[derive(Debug, Clone)]
pub struct CollusionDetector {
    /// Coordinated-action counts keyed by the exact node list reported.
    /// Keys are order-sensitive: `[a, b]` and `[b, a]` are distinct.
    coordination_patterns: HashMap<Vec<NodeId>, usize>,
    /// Timestamped record of each coordinated action; entries older than one
    /// hour are dropped
    simultaneous_actions: VecDeque<(Instant, Vec<NodeId>)>,
    /// Number of coordinated actions by the same node list that marks every
    /// member as suspected (5 by default)
    collusion_threshold: usize,
}
113
114impl Default for PartitionDetector {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120impl PartitionDetector {
121    pub fn new() -> Self {
122        Self {
123            last_communication: HashMap::new(),
124            partitioned_nodes: HashSet::new(),
125            partition_timeout: Duration::from_secs(30),
126        }
127    }
128}
129
130impl Default for ReplayDetector {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136impl ReplayDetector {
137    pub fn new() -> Self {
138        Self {
139            seen_messages: HashMap::new(),
140            replay_window: Duration::from_secs(60), // 1 minute window
141            replay_attempts: HashMap::new(),
142        }
143    }
144}
145
146impl Default for EquivocationDetector {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl EquivocationDetector {
153    pub fn new() -> Self {
154        Self {
155            node_messages: HashMap::new(),
156            equivocations: HashMap::new(),
157        }
158    }
159}
160
161impl Default for ResourceMonitor {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167impl ResourceMonitor {
168    pub fn new() -> Self {
169        Self {
170            message_rates: HashMap::new(),
171            rate_limit: 100.0, // 100 messages per second default limit
172            memory_usage: HashMap::new(),
173            resource_attacks: HashMap::new(),
174        }
175    }
176}
177
178impl Default for CollusionDetector {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
184impl CollusionDetector {
185    pub fn new() -> Self {
186        Self {
187            coordination_patterns: HashMap::new(),
188            simultaneous_actions: VecDeque::new(),
189            collusion_threshold: 5, // 5 coordinated actions trigger suspicion
190        }
191    }
192}
193
194impl ByzantineDetector {
195    pub fn new(detection_threshold: usize) -> Self {
196        Self {
197            suspected_nodes: HashSet::new(),
198            timing_anomalies: HashMap::new(),
199            signature_failures: HashMap::new(),
200            inconsistent_patterns: HashMap::new(),
201            detection_threshold,
202            partition_detector: PartitionDetector::new(),
203            replay_detector: ReplayDetector::new(),
204            equivocation_detector: EquivocationDetector::new(),
205            resource_monitor: ResourceMonitor::new(),
206            collusion_detector: CollusionDetector::new(),
207        }
208    }
209
210    /// Report advanced timing anomaly with detailed analysis
211    pub fn report_timing_anomaly(&mut self, node_id: NodeId, response_time: Duration) {
212        let now = Instant::now();
213
214        // First, update/create the timing analysis
215        {
216            let analysis = self
217                .timing_anomalies
218                .entry(node_id)
219                .or_insert_with(|| TimingAnalysis {
220                    message_times: VecDeque::new(),
221                    avg_response_time: Duration::from_millis(100), // Default
222                    response_time_stddev: Duration::from_millis(50),
223                    suspicious_patterns: 0,
224                });
225
226            analysis.message_times.push_back(now);
227
228            // Keep only recent timing data (last 100 messages)
229            while analysis.message_times.len() > 100 {
230                analysis.message_times.pop_front();
231            }
232        }
233
234        // Update statistics (separate borrow)
235        self.update_timing_statistics(node_id, response_time);
236
237        // Detect suspicious patterns and update if needed
238        let is_suspicious = self.detect_timing_attack(node_id, response_time);
239        if is_suspicious {
240            if let Some(analysis) = self.timing_anomalies.get_mut(&node_id) {
241                analysis.suspicious_patterns += 1;
242                if analysis.suspicious_patterns >= self.detection_threshold {
243                    self.suspected_nodes.insert(node_id);
244                    tracing::warn!("Node {} suspected of timing attacks", node_id);
245                }
246            }
247        }
248    }
249
250    /// Detect potential timing-based attacks
251    fn detect_timing_attack(&self, node_id: NodeId, response_time: Duration) -> bool {
252        if let Some(analysis) = self.timing_anomalies.get(&node_id) {
253            // Check for extremely fast responses (potential pre-computation)
254            if response_time < Duration::from_millis(1) {
255                return true;
256            }
257
258            // Check for extremely slow responses (potential DoS)
259            if response_time > analysis.avg_response_time + 3 * analysis.response_time_stddev {
260                return true;
261            }
262
263            // Check for suspiciously regular timing (potential automation)
264            if analysis.message_times.len() >= 10 {
265                let intervals: Vec<_> = analysis
266                    .message_times
267                    .iter()
268                    .zip(analysis.message_times.iter().skip(1))
269                    .map(|(a, b)| b.duration_since(*a))
270                    .collect();
271
272                // If all intervals are too similar, it's suspicious
273                if let (Some(&min), Some(&max)) = (intervals.iter().min(), intervals.iter().max()) {
274                    if max - min < Duration::from_millis(10) && intervals.len() >= 5 {
275                        return true;
276                    }
277                }
278            }
279        }
280        false
281    }
282
283    /// Update timing statistics for a node
284    fn update_timing_statistics(&mut self, node_id: NodeId, response_time: Duration) {
285        if let Some(analysis) = self.timing_anomalies.get_mut(&node_id) {
286            // Simple exponential moving average
287            let alpha = 0.1;
288            let new_time = response_time.as_millis() as f64;
289            let old_avg = analysis.avg_response_time.as_millis() as f64;
290            let new_avg = alpha * new_time + (1.0 - alpha) * old_avg;
291            analysis.avg_response_time = Duration::from_millis(new_avg as u64);
292        }
293    }
294
295    pub fn report_signature_failure(&mut self, node_id: NodeId) {
296        *self.signature_failures.entry(node_id).or_default() += 1;
297        if self.signature_failures[&node_id] >= self.detection_threshold {
298            self.suspected_nodes.insert(node_id);
299            tracing::warn!("Node {} suspected due to signature failures", node_id);
300        }
301    }
302
303    pub fn report_inconsistent_pattern(&mut self, node_id: NodeId) {
304        *self.inconsistent_patterns.entry(node_id).or_default() += 1;
305        if self.inconsistent_patterns[&node_id] >= self.detection_threshold {
306            self.suspected_nodes.insert(node_id);
307            tracing::warn!("Node {} suspected due to inconsistent patterns", node_id);
308        }
309    }
310
311    /// Check for message replay attacks
312    pub fn check_replay_attack(&mut self, node_id: NodeId, message_hash: Vec<u8>) -> bool {
313        let now = Instant::now();
314
315        // Clean old entries
316        self.replay_detector
317            .seen_messages
318            .retain(|_, &mut timestamp| {
319                now.duration_since(timestamp) <= self.replay_detector.replay_window
320            });
321
322        // Check if message was seen recently
323        if let Some(&timestamp) = self.replay_detector.seen_messages.get(&message_hash) {
324            if now.duration_since(timestamp) <= self.replay_detector.replay_window {
325                *self
326                    .replay_detector
327                    .replay_attempts
328                    .entry(node_id)
329                    .or_default() += 1;
330                if self.replay_detector.replay_attempts[&node_id] >= self.detection_threshold {
331                    self.suspected_nodes.insert(node_id);
332                    tracing::warn!("Node {} suspected of replay attacks", node_id);
333                }
334                return true;
335            }
336        }
337
338        self.replay_detector.seen_messages.insert(message_hash, now);
339        false
340    }
341
342    /// Detect equivocation (sending different messages for same view/sequence)
343    pub fn check_equivocation(
344        &mut self,
345        node_id: NodeId,
346        view: ViewNumber,
347        sequence: SequenceNumber,
348        message_hash: Vec<u8>,
349    ) -> bool {
350        let messages = self
351            .equivocation_detector
352            .node_messages
353            .entry(node_id)
354            .or_default()
355            .entry((view, sequence))
356            .or_default();
357
358        // Check if we've seen a different message for this view/sequence
359        if !messages.is_empty() && !messages.contains(&message_hash) {
360            *self
361                .equivocation_detector
362                .equivocations
363                .entry(node_id)
364                .or_default() += 1;
365            if self.equivocation_detector.equivocations[&node_id] >= self.detection_threshold {
366                self.suspected_nodes.insert(node_id);
367                tracing::warn!("Node {} suspected of equivocation", node_id);
368            }
369            return true;
370        }
371
372        messages.push(message_hash);
373        false
374    }
375
376    /// Monitor resource usage for DoS attacks
377    pub fn monitor_resource_usage(&mut self, node_id: NodeId) -> bool {
378        let now = Instant::now();
379        let rates = self
380            .resource_monitor
381            .message_rates
382            .entry(node_id)
383            .or_default();
384
385        rates.push_back(now);
386
387        // Keep only messages from the last second
388        while let Some(&front_time) = rates.front() {
389            if now.duration_since(front_time) > Duration::from_secs(1) {
390                rates.pop_front();
391            } else {
392                break;
393            }
394        }
395
396        // Check if rate exceeds threshold
397        let current_rate = rates.len() as f64;
398        if current_rate > self.resource_monitor.rate_limit {
399            *self
400                .resource_monitor
401                .resource_attacks
402                .entry(node_id)
403                .or_default() += 1;
404            if self.resource_monitor.resource_attacks[&node_id] >= self.detection_threshold {
405                self.suspected_nodes.insert(node_id);
406                tracing::warn!("Node {} suspected of resource exhaustion attack", node_id);
407            }
408            return true;
409        }
410
411        false
412    }
413
414    /// Detect potential collusion between nodes
415    pub fn check_collusion(&mut self, coordinating_nodes: Vec<NodeId>) {
416        if coordinating_nodes.len() >= 2 {
417            let now = Instant::now();
418
419            // Record simultaneous action
420            self.collusion_detector
421                .simultaneous_actions
422                .push_back((now, coordinating_nodes.clone()));
423
424            // Clean old entries (keep last hour)
425            while let Some((timestamp, _)) = self.collusion_detector.simultaneous_actions.front() {
426                if now.duration_since(*timestamp) > Duration::from_secs(3600) {
427                    self.collusion_detector.simultaneous_actions.pop_front();
428                } else {
429                    break;
430                }
431            }
432
433            // Check for repeated coordination
434            *self
435                .collusion_detector
436                .coordination_patterns
437                .entry(coordinating_nodes.clone())
438                .or_default() += 1;
439
440            if self.collusion_detector.coordination_patterns[&coordinating_nodes]
441                >= self.collusion_detector.collusion_threshold
442            {
443                for &node_id in &coordinating_nodes {
444                    self.suspected_nodes.insert(node_id);
445                }
446                tracing::warn!(
447                    "Suspected collusion detected between nodes: {:?}",
448                    coordinating_nodes
449                );
450            }
451        }
452    }
453
454    /// Check network partition status
455    pub fn check_network_partition(&mut self, node_id: NodeId) {
456        let now = Instant::now();
457        self.partition_detector
458            .last_communication
459            .insert(node_id, now);
460
461        // Check for partitioned nodes
462        for (&id, &last_time) in &self.partition_detector.last_communication {
463            if now.duration_since(last_time) > self.partition_detector.partition_timeout {
464                self.partition_detector.partitioned_nodes.insert(id);
465            } else {
466                self.partition_detector.partitioned_nodes.remove(&id);
467            }
468        }
469    }
470
471    /// Get comprehensive threat assessment
472    pub fn get_threat_assessment(&self, node_id: NodeId) -> ThreatLevel {
473        let mut score = 0;
474
475        if self.suspected_nodes.contains(&node_id) {
476            score += 10;
477        }
478
479        if let Some(failures) = self.signature_failures.get(&node_id) {
480            score += failures * 2;
481        }
482
483        if let Some(patterns) = self.inconsistent_patterns.get(&node_id) {
484            score += patterns;
485        }
486
487        if let Some(replays) = self.replay_detector.replay_attempts.get(&node_id) {
488            score += replays * 3;
489        }
490
491        if let Some(equivocations) = self.equivocation_detector.equivocations.get(&node_id) {
492            score += equivocations * 5;
493        }
494
495        if let Some(attacks) = self.resource_monitor.resource_attacks.get(&node_id) {
496            score += attacks;
497        }
498
499        match score {
500            0..=2 => ThreatLevel::Low,
501            3..=7 => ThreatLevel::Medium,
502            8..=15 => ThreatLevel::High,
503            _ => ThreatLevel::Critical,
504        }
505    }
506
507    pub fn is_suspected(&self, node_id: NodeId) -> bool {
508        self.suspected_nodes.contains(&node_id)
509    }
510
511    pub fn get_suspected_nodes(&self) -> &HashSet<NodeId> {
512        &self.suspected_nodes
513    }
514
515    pub fn is_partitioned(&self, node_id: NodeId) -> bool {
516        self.partition_detector.partitioned_nodes.contains(&node_id)
517    }
518}