oxirs_core/distributed/bft/
detection.rs1#![allow(dead_code)]
4
5use super::types::*;
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::time::{Duration, Instant};
8
9pub struct ByzantineDetector {
11 suspected_nodes: HashSet<NodeId>,
13
14 timing_anomalies: HashMap<NodeId, TimingAnalysis>,
16
17 signature_failures: HashMap<NodeId, usize>,
19
20 inconsistent_patterns: HashMap<NodeId, usize>,
22
23 detection_threshold: usize,
25
26 partition_detector: PartitionDetector,
28
29 replay_detector: ReplayDetector,
31
32 equivocation_detector: EquivocationDetector,
34
35 resource_monitor: ResourceMonitor,
37
38 collusion_detector: CollusionDetector,
40}
41
42#[derive(Debug, Clone)]
44pub struct TimingAnalysis {
45 message_times: VecDeque<Instant>,
47 avg_response_time: Duration,
49 response_time_stddev: Duration,
51 suspicious_patterns: usize,
53}
54
55#[derive(Debug, Clone)]
57pub struct PartitionDetector {
58 last_communication: HashMap<NodeId, Instant>,
60 partitioned_nodes: HashSet<NodeId>,
62 partition_timeout: Duration,
64}
65
66#[derive(Debug, Clone)]
68pub struct ReplayDetector {
69 seen_messages: HashMap<Vec<u8>, Instant>,
71 replay_window: Duration,
73 replay_attempts: HashMap<NodeId, usize>,
75}
76
77type NodeMessageStore = HashMap<NodeId, HashMap<(ViewNumber, SequenceNumber), Vec<Vec<u8>>>>;
79
80#[derive(Debug, Clone)]
82pub struct EquivocationDetector {
83 node_messages: NodeMessageStore,
85 equivocations: HashMap<NodeId, usize>,
87}
88
89#[derive(Debug, Clone)]
91#[allow(dead_code)]
92pub struct ResourceMonitor {
93 message_rates: HashMap<NodeId, VecDeque<Instant>>,
95 rate_limit: f64,
97 memory_usage: HashMap<NodeId, usize>,
99 resource_attacks: HashMap<NodeId, usize>,
101}
102
103#[derive(Debug, Clone)]
105pub struct CollusionDetector {
106 coordination_patterns: HashMap<Vec<NodeId>, usize>,
108 simultaneous_actions: VecDeque<(Instant, Vec<NodeId>)>,
110 collusion_threshold: usize,
112}
113
114impl Default for PartitionDetector {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl PartitionDetector {
121 pub fn new() -> Self {
122 Self {
123 last_communication: HashMap::new(),
124 partitioned_nodes: HashSet::new(),
125 partition_timeout: Duration::from_secs(30),
126 }
127 }
128}
129
130impl Default for ReplayDetector {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136impl ReplayDetector {
137 pub fn new() -> Self {
138 Self {
139 seen_messages: HashMap::new(),
140 replay_window: Duration::from_secs(60), replay_attempts: HashMap::new(),
142 }
143 }
144}
145
146impl Default for EquivocationDetector {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl EquivocationDetector {
153 pub fn new() -> Self {
154 Self {
155 node_messages: HashMap::new(),
156 equivocations: HashMap::new(),
157 }
158 }
159}
160
161impl Default for ResourceMonitor {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167impl ResourceMonitor {
168 pub fn new() -> Self {
169 Self {
170 message_rates: HashMap::new(),
171 rate_limit: 100.0, memory_usage: HashMap::new(),
173 resource_attacks: HashMap::new(),
174 }
175 }
176}
177
178impl Default for CollusionDetector {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184impl CollusionDetector {
185 pub fn new() -> Self {
186 Self {
187 coordination_patterns: HashMap::new(),
188 simultaneous_actions: VecDeque::new(),
189 collusion_threshold: 5, }
191 }
192}
193
194impl ByzantineDetector {
195 pub fn new(detection_threshold: usize) -> Self {
196 Self {
197 suspected_nodes: HashSet::new(),
198 timing_anomalies: HashMap::new(),
199 signature_failures: HashMap::new(),
200 inconsistent_patterns: HashMap::new(),
201 detection_threshold,
202 partition_detector: PartitionDetector::new(),
203 replay_detector: ReplayDetector::new(),
204 equivocation_detector: EquivocationDetector::new(),
205 resource_monitor: ResourceMonitor::new(),
206 collusion_detector: CollusionDetector::new(),
207 }
208 }
209
210 pub fn report_timing_anomaly(&mut self, node_id: NodeId, response_time: Duration) {
212 let now = Instant::now();
213
214 {
216 let analysis = self
217 .timing_anomalies
218 .entry(node_id)
219 .or_insert_with(|| TimingAnalysis {
220 message_times: VecDeque::new(),
221 avg_response_time: Duration::from_millis(100), response_time_stddev: Duration::from_millis(50),
223 suspicious_patterns: 0,
224 });
225
226 analysis.message_times.push_back(now);
227
228 while analysis.message_times.len() > 100 {
230 analysis.message_times.pop_front();
231 }
232 }
233
234 self.update_timing_statistics(node_id, response_time);
236
237 let is_suspicious = self.detect_timing_attack(node_id, response_time);
239 if is_suspicious {
240 if let Some(analysis) = self.timing_anomalies.get_mut(&node_id) {
241 analysis.suspicious_patterns += 1;
242 if analysis.suspicious_patterns >= self.detection_threshold {
243 self.suspected_nodes.insert(node_id);
244 tracing::warn!("Node {} suspected of timing attacks", node_id);
245 }
246 }
247 }
248 }
249
250 fn detect_timing_attack(&self, node_id: NodeId, response_time: Duration) -> bool {
252 if let Some(analysis) = self.timing_anomalies.get(&node_id) {
253 if response_time < Duration::from_millis(1) {
255 return true;
256 }
257
258 if response_time > analysis.avg_response_time + 3 * analysis.response_time_stddev {
260 return true;
261 }
262
263 if analysis.message_times.len() >= 10 {
265 let intervals: Vec<_> = analysis
266 .message_times
267 .iter()
268 .zip(analysis.message_times.iter().skip(1))
269 .map(|(a, b)| b.duration_since(*a))
270 .collect();
271
272 if let (Some(&min), Some(&max)) = (intervals.iter().min(), intervals.iter().max()) {
274 if max - min < Duration::from_millis(10) && intervals.len() >= 5 {
275 return true;
276 }
277 }
278 }
279 }
280 false
281 }
282
283 fn update_timing_statistics(&mut self, node_id: NodeId, response_time: Duration) {
285 if let Some(analysis) = self.timing_anomalies.get_mut(&node_id) {
286 let alpha = 0.1;
288 let new_time = response_time.as_millis() as f64;
289 let old_avg = analysis.avg_response_time.as_millis() as f64;
290 let new_avg = alpha * new_time + (1.0 - alpha) * old_avg;
291 analysis.avg_response_time = Duration::from_millis(new_avg as u64);
292 }
293 }
294
295 pub fn report_signature_failure(&mut self, node_id: NodeId) {
296 *self.signature_failures.entry(node_id).or_default() += 1;
297 if self.signature_failures[&node_id] >= self.detection_threshold {
298 self.suspected_nodes.insert(node_id);
299 tracing::warn!("Node {} suspected due to signature failures", node_id);
300 }
301 }
302
303 pub fn report_inconsistent_pattern(&mut self, node_id: NodeId) {
304 *self.inconsistent_patterns.entry(node_id).or_default() += 1;
305 if self.inconsistent_patterns[&node_id] >= self.detection_threshold {
306 self.suspected_nodes.insert(node_id);
307 tracing::warn!("Node {} suspected due to inconsistent patterns", node_id);
308 }
309 }
310
311 pub fn check_replay_attack(&mut self, node_id: NodeId, message_hash: Vec<u8>) -> bool {
313 let now = Instant::now();
314
315 self.replay_detector
317 .seen_messages
318 .retain(|_, &mut timestamp| {
319 now.duration_since(timestamp) <= self.replay_detector.replay_window
320 });
321
322 if let Some(×tamp) = self.replay_detector.seen_messages.get(&message_hash) {
324 if now.duration_since(timestamp) <= self.replay_detector.replay_window {
325 *self
326 .replay_detector
327 .replay_attempts
328 .entry(node_id)
329 .or_default() += 1;
330 if self.replay_detector.replay_attempts[&node_id] >= self.detection_threshold {
331 self.suspected_nodes.insert(node_id);
332 tracing::warn!("Node {} suspected of replay attacks", node_id);
333 }
334 return true;
335 }
336 }
337
338 self.replay_detector.seen_messages.insert(message_hash, now);
339 false
340 }
341
342 pub fn check_equivocation(
344 &mut self,
345 node_id: NodeId,
346 view: ViewNumber,
347 sequence: SequenceNumber,
348 message_hash: Vec<u8>,
349 ) -> bool {
350 let messages = self
351 .equivocation_detector
352 .node_messages
353 .entry(node_id)
354 .or_default()
355 .entry((view, sequence))
356 .or_default();
357
358 if !messages.is_empty() && !messages.contains(&message_hash) {
360 *self
361 .equivocation_detector
362 .equivocations
363 .entry(node_id)
364 .or_default() += 1;
365 if self.equivocation_detector.equivocations[&node_id] >= self.detection_threshold {
366 self.suspected_nodes.insert(node_id);
367 tracing::warn!("Node {} suspected of equivocation", node_id);
368 }
369 return true;
370 }
371
372 messages.push(message_hash);
373 false
374 }
375
376 pub fn monitor_resource_usage(&mut self, node_id: NodeId) -> bool {
378 let now = Instant::now();
379 let rates = self
380 .resource_monitor
381 .message_rates
382 .entry(node_id)
383 .or_default();
384
385 rates.push_back(now);
386
387 while let Some(&front_time) = rates.front() {
389 if now.duration_since(front_time) > Duration::from_secs(1) {
390 rates.pop_front();
391 } else {
392 break;
393 }
394 }
395
396 let current_rate = rates.len() as f64;
398 if current_rate > self.resource_monitor.rate_limit {
399 *self
400 .resource_monitor
401 .resource_attacks
402 .entry(node_id)
403 .or_default() += 1;
404 if self.resource_monitor.resource_attacks[&node_id] >= self.detection_threshold {
405 self.suspected_nodes.insert(node_id);
406 tracing::warn!("Node {} suspected of resource exhaustion attack", node_id);
407 }
408 return true;
409 }
410
411 false
412 }
413
414 pub fn check_collusion(&mut self, coordinating_nodes: Vec<NodeId>) {
416 if coordinating_nodes.len() >= 2 {
417 let now = Instant::now();
418
419 self.collusion_detector
421 .simultaneous_actions
422 .push_back((now, coordinating_nodes.clone()));
423
424 while let Some((timestamp, _)) = self.collusion_detector.simultaneous_actions.front() {
426 if now.duration_since(*timestamp) > Duration::from_secs(3600) {
427 self.collusion_detector.simultaneous_actions.pop_front();
428 } else {
429 break;
430 }
431 }
432
433 *self
435 .collusion_detector
436 .coordination_patterns
437 .entry(coordinating_nodes.clone())
438 .or_default() += 1;
439
440 if self.collusion_detector.coordination_patterns[&coordinating_nodes]
441 >= self.collusion_detector.collusion_threshold
442 {
443 for &node_id in &coordinating_nodes {
444 self.suspected_nodes.insert(node_id);
445 }
446 tracing::warn!(
447 "Suspected collusion detected between nodes: {:?}",
448 coordinating_nodes
449 );
450 }
451 }
452 }
453
454 pub fn check_network_partition(&mut self, node_id: NodeId) {
456 let now = Instant::now();
457 self.partition_detector
458 .last_communication
459 .insert(node_id, now);
460
461 for (&id, &last_time) in &self.partition_detector.last_communication {
463 if now.duration_since(last_time) > self.partition_detector.partition_timeout {
464 self.partition_detector.partitioned_nodes.insert(id);
465 } else {
466 self.partition_detector.partitioned_nodes.remove(&id);
467 }
468 }
469 }
470
471 pub fn get_threat_assessment(&self, node_id: NodeId) -> ThreatLevel {
473 let mut score = 0;
474
475 if self.suspected_nodes.contains(&node_id) {
476 score += 10;
477 }
478
479 if let Some(failures) = self.signature_failures.get(&node_id) {
480 score += failures * 2;
481 }
482
483 if let Some(patterns) = self.inconsistent_patterns.get(&node_id) {
484 score += patterns;
485 }
486
487 if let Some(replays) = self.replay_detector.replay_attempts.get(&node_id) {
488 score += replays * 3;
489 }
490
491 if let Some(equivocations) = self.equivocation_detector.equivocations.get(&node_id) {
492 score += equivocations * 5;
493 }
494
495 if let Some(attacks) = self.resource_monitor.resource_attacks.get(&node_id) {
496 score += attacks;
497 }
498
499 match score {
500 0..=2 => ThreatLevel::Low,
501 3..=7 => ThreatLevel::Medium,
502 8..=15 => ThreatLevel::High,
503 _ => ThreatLevel::Critical,
504 }
505 }
506
507 pub fn is_suspected(&self, node_id: NodeId) -> bool {
508 self.suspected_nodes.contains(&node_id)
509 }
510
511 pub fn get_suspected_nodes(&self) -> &HashSet<NodeId> {
512 &self.suspected_nodes
513 }
514
515 pub fn is_partitioned(&self, node_id: NodeId) -> bool {
516 self.partition_detector.partitioned_nodes.contains(&node_id)
517 }
518}