oxirs-core 0.2.2

Core RDF and SPARQL functionality for OxiRS - native Rust implementation with zero dependencies
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
//! Byzantine behavior detection systems and security components

#![allow(dead_code)]

use super::types::*;
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};

/// Byzantine behavior detection system with advanced threat detection.
///
/// Aggregates several per-node counters and specialised sub-detectors. A node
/// is added to `suspected_nodes` once one of its counters reaches
/// `detection_threshold`; collusion detection uses the collusion detector's own
/// `collusion_threshold` instead.
pub struct ByzantineDetector {
    /// Nodes currently flagged as suspected of Byzantine behavior
    suspected_nodes: HashSet<NodeId>,

    /// Per-node timing history and statistics, created lazily on the first
    /// timing report for a node
    timing_anomalies: HashMap<NodeId, TimingAnalysis>,

    /// Number of signature verification failures reported per node
    signature_failures: HashMap<NodeId, usize>,

    /// Number of inconsistent message patterns reported per node
    inconsistent_patterns: HashMap<NodeId, usize>,

    /// Count at which a per-node counter (timing, signature, pattern, replay,
    /// equivocation, resource) causes the node to be marked as suspected
    detection_threshold: usize,

    /// Network partition detection (tracks last contact time per node)
    partition_detector: PartitionDetector,

    /// Message replay attack detection (tracks recently seen message hashes)
    replay_detector: ReplayDetector,

    /// Equivocation detection (sending different messages for same view/sequence)
    equivocation_detector: EquivocationDetector,

    /// Resource exhaustion attack detection (per-node message rate)
    resource_monitor: ResourceMonitor,

    /// Collusion detection between nodes (repeated coordinated actions)
    collusion_detector: CollusionDetector,
}

/// Advanced timing analysis for Byzantine detection.
///
/// One instance is kept per node; it is created with default statistics the
/// first time a timing report arrives for that node.
#[derive(Debug, Clone)]
pub struct TimingAnalysis {
    /// Instants at which timing reports were recorded for the node; the
    /// reporting code keeps only the most recent 100 entries
    message_times: VecDeque<Instant>,
    /// Exponential moving average of reported response times (starts at 100 ms)
    avg_response_time: Duration,
    /// Standard deviation of response times (initialised to 50 ms).
    /// NOTE(review): no code visible in this file updates this value after
    /// initialisation — confirm whether that is intended.
    response_time_stddev: Duration,
    /// Number of timing reports that were classified as suspicious
    suspicious_patterns: usize,
}

/// Network partition detection system.
///
/// A node is considered partitioned when the time since its last recorded
/// communication exceeds `partition_timeout`.
#[derive(Debug, Clone)]
pub struct PartitionDetector {
    /// Last communication time with each node
    last_communication: HashMap<NodeId, Instant>,
    /// Nodes whose last communication is older than `partition_timeout`
    partitioned_nodes: HashSet<NodeId>,
    /// Silence duration after which a node counts as partitioned
    /// (30 seconds when built via `new`)
    partition_timeout: Duration,
}

/// Replay attack detection system.
///
/// Remembers message hashes for `replay_window`; a hash seen again inside that
/// window is treated as a replay attempt by the reporting node.
#[derive(Debug, Clone)]
pub struct ReplayDetector {
    /// Message hashes mapped to the instant they were first recorded; entries
    /// older than `replay_window` are purged on every replay check
    seen_messages: HashMap<Vec<u8>, Instant>,
    /// How long a message hash is remembered (60 seconds when built via `new`)
    replay_window: Duration,
    /// Number of detected replay attempts per node
    replay_attempts: HashMap<NodeId, usize>,
}

/// Type alias for complex message storage type: for each node, the message
/// hashes recorded per `(view, sequence)` slot.
type NodeMessageStore = HashMap<NodeId, HashMap<(ViewNumber, SequenceNumber), Vec<Vec<u8>>>>;

/// Equivocation detection system.
///
/// Equivocation means a node sent different message hashes for the same
/// `(view, sequence)` slot.
#[derive(Debug, Clone)]
pub struct EquivocationDetector {
    /// Message hashes recorded per view/sequence slot for each node
    node_messages: NodeMessageStore,
    /// Number of detected equivocations per node
    equivocations: HashMap<NodeId, usize>,
}

/// Resource exhaustion monitoring.
///
/// Tracks how many messages each node sent during the last second and counts
/// how often that number exceeded `rate_limit`.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ResourceMonitor {
    /// Timestamps of each node's messages from the last second
    message_rates: HashMap<NodeId, VecDeque<Instant>>,
    /// Rate limit threshold in messages per second (100.0 when built via `new`)
    rate_limit: f64,
    /// Memory usage tracking.
    /// NOTE(review): not read or written by any method visible in this file.
    memory_usage: HashMap<NodeId, usize>,
    /// Number of times each node exceeded the rate limit
    resource_attacks: HashMap<NodeId, usize>,
}

/// Collusion detection between Byzantine nodes.
///
/// Counts how often the same group of nodes is reported as acting together.
#[derive(Debug, Clone)]
pub struct CollusionDetector {
    /// Occurrence count per reported node group. The key is the node list
    /// exactly as reported, so the same nodes in a different order form a
    /// different key.
    coordination_patterns: HashMap<Vec<NodeId>, usize>,
    /// Reported group actions with their timestamps; entries older than one
    /// hour are dropped when a new action is recorded
    simultaneous_actions: VecDeque<(Instant, Vec<NodeId>)>,
    /// Occurrence count at which every node of a group is marked as suspected
    /// (5 when built via `new`)
    collusion_threshold: usize,
}

impl Default for PartitionDetector {
    /// Builds an empty detector: no recorded communication, no partitioned
    /// nodes, and a 30-second partition timeout.
    fn default() -> Self {
        Self {
            partition_timeout: Duration::from_secs(30),
            last_communication: HashMap::default(),
            partitioned_nodes: HashSet::default(),
        }
    }
}

impl PartitionDetector {
    /// Creates a detector with the default configuration; equivalent to
    /// `PartitionDetector::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for ReplayDetector {
    /// Builds an empty detector: no remembered messages, no recorded replay
    /// attempts, and a one-minute replay window.
    fn default() -> Self {
        Self {
            replay_window: Duration::from_secs(60),
            seen_messages: HashMap::default(),
            replay_attempts: HashMap::default(),
        }
    }
}

impl ReplayDetector {
    /// Creates a detector with the default configuration; equivalent to
    /// `ReplayDetector::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for EquivocationDetector {
    /// Builds an empty detector with no recorded messages or equivocations.
    fn default() -> Self {
        Self {
            node_messages: HashMap::default(),
            equivocations: HashMap::default(),
        }
    }
}

impl EquivocationDetector {
    /// Creates an empty detector; equivalent to
    /// `EquivocationDetector::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for ResourceMonitor {
    /// Builds an empty monitor with a default limit of 100 messages per second.
    fn default() -> Self {
        Self {
            rate_limit: 100.0,
            message_rates: HashMap::default(),
            memory_usage: HashMap::default(),
            resource_attacks: HashMap::default(),
        }
    }
}

impl ResourceMonitor {
    /// Creates a monitor with the default configuration; equivalent to
    /// `ResourceMonitor::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for CollusionDetector {
    /// Builds an empty detector in which five coordinated actions by the same
    /// group trigger suspicion.
    fn default() -> Self {
        Self {
            collusion_threshold: 5,
            coordination_patterns: HashMap::default(),
            simultaneous_actions: VecDeque::default(),
        }
    }
}

impl CollusionDetector {
    /// Creates a detector with the default configuration; equivalent to
    /// `CollusionDetector::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl ByzantineDetector {
    /// Creates a detector with no suspects, empty counters and default
    /// sub-detectors.
    ///
    /// `detection_threshold` is the per-node count at which timing, signature,
    /// pattern, replay, equivocation or resource reports mark a node as
    /// suspected.
    pub fn new(detection_threshold: usize) -> Self {
        Self {
            detection_threshold,
            suspected_nodes: HashSet::default(),
            timing_anomalies: HashMap::default(),
            signature_failures: HashMap::default(),
            inconsistent_patterns: HashMap::default(),
            partition_detector: PartitionDetector::default(),
            replay_detector: ReplayDetector::default(),
            equivocation_detector: EquivocationDetector::default(),
            resource_monitor: ResourceMonitor::default(),
            collusion_detector: CollusionDetector::default(),
        }
    }

    /// Records a timing observation for `node_id` and flags the node when its
    /// timing repeatedly looks hostile.
    ///
    /// The current instant is appended to the node's history (capped at the
    /// 100 most recent entries), the running statistics are refreshed with
    /// `response_time`, and the observation is then classified. Each suspicious
    /// observation increments the node's counter; once the counter reaches the
    /// detection threshold the node is added to the suspected set.
    pub fn report_timing_anomaly(&mut self, node_id: NodeId, response_time: Duration) {
        let observed_at = Instant::now();

        // Fetch (or lazily create) the node's record and log this observation.
        let history = &mut self
            .timing_anomalies
            .entry(node_id)
            .or_insert_with(|| TimingAnalysis {
                message_times: VecDeque::new(),
                avg_response_time: Duration::from_millis(100), // Default
                response_time_stddev: Duration::from_millis(50),
                suspicious_patterns: 0,
            })
            .message_times;
        history.push_back(observed_at);
        while history.len() > 100 {
            history.pop_front();
        }

        // The helpers look the record up again themselves, so the mutable
        // borrow above must have ended by now.
        self.update_timing_statistics(node_id, response_time);

        if !self.detect_timing_attack(node_id, response_time) {
            return;
        }

        let threshold = self.detection_threshold;
        let threshold_reached = match self.timing_anomalies.get_mut(&node_id) {
            Some(record) => {
                record.suspicious_patterns += 1;
                record.suspicious_patterns >= threshold
            }
            None => false,
        };

        if threshold_reached {
            self.suspected_nodes.insert(node_id);
            tracing::warn!("Node {} suspected of timing attacks", node_id);
        }
    }

    /// Classifies a single response time for `node_id` as suspicious or not.
    ///
    /// Returns `true` when any of the following holds:
    /// * the response took less than one millisecond,
    /// * the response exceeded the node's average by more than three standard
    ///   deviations,
    /// * at least ten timestamps are recorded and the gaps between consecutive
    ///   ones all lie within 10 ms of each other.
    ///
    /// Returns `false` for nodes without a timing record.
    fn detect_timing_attack(&self, node_id: NodeId, response_time: Duration) -> bool {
        let analysis = match self.timing_anomalies.get(&node_id) {
            Some(analysis) => analysis,
            None => return false,
        };

        // Implausibly fast answer (potential pre-computation).
        if response_time < Duration::from_millis(1) {
            return true;
        }

        // Far slower than this node's norm (potential DoS).
        let slow_cutoff = analysis.avg_response_time + 3 * analysis.response_time_stddev;
        if response_time > slow_cutoff {
            return true;
        }

        // Suspiciously regular spacing (potential automation). Ten timestamps
        // always yield nine gaps, so no further count check is needed.
        let times = &analysis.message_times;
        if times.len() < 10 {
            return false;
        }

        let mut extremes: Option<(Duration, Duration)> = None;
        for (earlier, later) in times.iter().zip(times.iter().skip(1)) {
            let gap = later.duration_since(*earlier);
            extremes = Some(match extremes {
                None => (gap, gap),
                Some((shortest, longest)) => (shortest.min(gap), longest.max(gap)),
            });
        }

        match extremes {
            Some((shortest, longest)) => longest - shortest < Duration::from_millis(10),
            None => false,
        }
    }

    /// Folds `response_time` into the node's exponential moving average.
    ///
    /// Does nothing when no timing record exists for `node_id`. Only the
    /// average is refreshed here; the stored standard deviation is left
    /// untouched.
    ///
    /// The average is computed with nanosecond resolution. Working in whole
    /// milliseconds and truncating the result would pin the average in place
    /// whenever a sample differs from it by less than 10 ms (for example
    /// `0.1 * 105 + 0.9 * 100 = 100.5`, truncated back to `100`), and would
    /// treat every sub-millisecond sample as zero.
    fn update_timing_statistics(&mut self, node_id: NodeId, response_time: Duration) {
        /// Weight given to the newest sample.
        const SMOOTHING: f64 = 0.1;

        if let Some(analysis) = self.timing_anomalies.get_mut(&node_id) {
            let sample_ns = response_time.as_nanos() as f64;
            let previous_ns = analysis.avg_response_time.as_nanos() as f64;
            let blended_ns = SMOOTHING * sample_ns + (1.0 - SMOOTHING) * previous_ns;

            // Float-to-integer `as` casts saturate, so an absurdly large
            // average clamps to `u64::MAX` nanoseconds instead of panicking.
            analysis.avg_response_time = Duration::from_nanos(blended_ns.round() as u64);
        }
    }

    /// Records one signature verification failure for `node_id`.
    ///
    /// Once the node's failure count reaches the detection threshold it is
    /// added to the suspected set and a warning is logged.
    pub fn report_signature_failure(&mut self, node_id: NodeId) {
        let failures = self.signature_failures.entry(node_id).or_default();
        *failures += 1;

        if *failures >= self.detection_threshold {
            self.suspected_nodes.insert(node_id);
            tracing::warn!("Node {} suspected due to signature failures", node_id);
        }
    }

    /// Records one inconsistent message pattern for `node_id`.
    ///
    /// Once the node's count reaches the detection threshold it is added to
    /// the suspected set and a warning is logged.
    pub fn report_inconsistent_pattern(&mut self, node_id: NodeId) {
        let occurrences = self.inconsistent_patterns.entry(node_id).or_default();
        *occurrences += 1;

        if *occurrences >= self.detection_threshold {
            self.suspected_nodes.insert(node_id);
            tracing::warn!("Node {} suspected due to inconsistent patterns", node_id);
        }
    }

    /// Checks whether `message_hash` was already seen within the replay window.
    ///
    /// Expired hashes are purged first. A hash that is still remembered counts
    /// as a replay attempt by `node_id`: the node's attempt counter is
    /// incremented (marking it suspected once the detection threshold is
    /// reached) and `true` is returned. Otherwise the hash is remembered with
    /// the current time and `false` is returned.
    pub fn check_replay_attack(&mut self, node_id: NodeId, message_hash: Vec<u8>) -> bool {
        let now = Instant::now();
        let window = self.replay_detector.replay_window;

        // Forget everything older than the replay window.
        self.replay_detector
            .seen_messages
            .retain(|_, first_seen| now.duration_since(*first_seen) <= window);

        // Whatever survived the purge is inside the window by construction, so
        // mere presence of the hash means it is a replay.
        match self.replay_detector.seen_messages.entry(message_hash) {
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(now);
                return false;
            }
            std::collections::hash_map::Entry::Occupied(_) => {}
        }

        let attempts = self
            .replay_detector
            .replay_attempts
            .entry(node_id)
            .or_default();
        *attempts += 1;

        if *attempts >= self.detection_threshold {
            self.suspected_nodes.insert(node_id);
            tracing::warn!("Node {} suspected of replay attacks", node_id);
        }
        true
    }

    /// Detects equivocation: a node sending different messages for the same
    /// view/sequence slot.
    ///
    /// The first hash seen for `(view, sequence)` from `node_id` is recorded
    /// and `false` is returned. A later call with the same hash is a harmless
    /// retransmission and also returns `false`. A call with a different hash
    /// increments the node's equivocation counter (marking it suspected once
    /// the detection threshold is reached) and returns `true`; the conflicting
    /// hash itself is not stored.
    ///
    /// Retransmitted hashes are deliberately not stored again: appending one
    /// copy per retransmission would let the slot's list grow without bound
    /// and slow down every later `contains` scan.
    pub fn check_equivocation(
        &mut self,
        node_id: NodeId,
        view: ViewNumber,
        sequence: SequenceNumber,
        message_hash: Vec<u8>,
    ) -> bool {
        let messages = self
            .equivocation_detector
            .node_messages
            .entry(node_id)
            .or_default()
            .entry((view, sequence))
            .or_default();

        // First message for this slot: remember it as the reference.
        if messages.is_empty() {
            messages.push(message_hash);
            return false;
        }

        // Same message again: already recorded, nothing to add.
        if messages.contains(&message_hash) {
            return false;
        }

        // A different message for an already-used slot is an equivocation.
        let equivocations = self
            .equivocation_detector
            .equivocations
            .entry(node_id)
            .or_default();
        *equivocations += 1;

        if *equivocations >= self.detection_threshold {
            self.suspected_nodes.insert(node_id);
            tracing::warn!("Node {} suspected of equivocation", node_id);
        }
        true
    }

    /// Registers one message from `node_id` and checks its message rate.
    ///
    /// Only timestamps from the last second are kept per node. When the number
    /// of kept timestamps exceeds the configured rate limit, the node's
    /// resource-attack counter is incremented (marking it suspected once the
    /// detection threshold is reached) and `true` is returned; otherwise
    /// `false`.
    pub fn monitor_resource_usage(&mut self, node_id: NodeId) -> bool {
        let now = Instant::now();
        let one_second = Duration::from_secs(1);

        let recent = self
            .resource_monitor
            .message_rates
            .entry(node_id)
            .or_default();
        recent.push_back(now);

        // Timestamps are in arrival order, so expired ones sit at the front.
        while recent
            .front()
            .map_or(false, |&oldest| now.duration_since(oldest) > one_second)
        {
            recent.pop_front();
        }

        let current_rate = recent.len() as f64;
        if current_rate > self.resource_monitor.rate_limit {
            let attacks = self
                .resource_monitor
                .resource_attacks
                .entry(node_id)
                .or_default();
            *attacks += 1;

            if *attacks >= self.detection_threshold {
                self.suspected_nodes.insert(node_id);
                tracing::warn!("Node {} suspected of resource exhaustion attack", node_id);
            }
            return true;
        }

        false
    }

    /// Records a coordinated action by `coordinating_nodes` and flags the
    /// group when it keeps recurring.
    ///
    /// Groups of fewer than two nodes are ignored. The action is logged with
    /// the current time (entries older than one hour are discarded) and the
    /// group's occurrence counter is incremented. Once that counter reaches
    /// the collusion threshold, every node of the group is added to the
    /// suspected set.
    ///
    /// The counter is keyed by the node list exactly as given, so the same
    /// nodes passed in a different order are counted separately.
    pub fn check_collusion(&mut self, coordinating_nodes: Vec<NodeId>) {
        if coordinating_nodes.len() < 2 {
            return;
        }

        let now = Instant::now();
        let one_hour = Duration::from_secs(3600);
        let detector = &mut self.collusion_detector;

        // Log the action and trim the log to the last hour.
        detector
            .simultaneous_actions
            .push_back((now, coordinating_nodes.clone()));
        while detector
            .simultaneous_actions
            .front()
            .map_or(false, |(seen_at, _)| now.duration_since(*seen_at) > one_hour)
        {
            detector.simultaneous_actions.pop_front();
        }

        // Count how often exactly this group has been reported.
        let occurrences = detector
            .coordination_patterns
            .entry(coordinating_nodes.clone())
            .or_default();
        *occurrences += 1;
        let threshold_reached = *occurrences >= detector.collusion_threshold;

        if threshold_reached {
            self.suspected_nodes
                .extend(coordinating_nodes.iter().copied());
            tracing::warn!(
                "Suspected collusion detected between nodes: {:?}",
                coordinating_nodes
            );
        }
    }

    /// Registers contact with `node_id` and re-evaluates partition status for
    /// every known node.
    ///
    /// The node's last-communication time is set to now. Afterwards each
    /// tracked node is marked as partitioned when it has been silent for
    /// longer than the partition timeout, and unmarked otherwise.
    pub fn check_network_partition(&mut self, node_id: NodeId) {
        let now = Instant::now();
        let detector = &mut self.partition_detector;
        detector.last_communication.insert(node_id, now);

        let timeout = detector.partition_timeout;
        for (peer, last_seen) in detector.last_communication.iter() {
            let silent_for = now.duration_since(*last_seen);
            if silent_for > timeout {
                detector.partitioned_nodes.insert(*peer);
            } else {
                detector.partitioned_nodes.remove(peer);
            }
        }
    }

    /// Computes a threat level for `node_id` from the recorded evidence.
    ///
    /// The score is the sum of: 10 if the node is already suspected, 2 per
    /// signature failure, 1 per inconsistent pattern, 3 per replay attempt,
    /// 5 per equivocation and 1 per resource attack. Scores map to levels as
    /// 0-2 `Low`, 3-7 `Medium`, 8-15 `High`, anything above `Critical`.
    pub fn get_threat_assessment(&self, node_id: NodeId) -> ThreatLevel {
        // Missing entries contribute nothing to the score.
        let tally =
            |counts: &HashMap<NodeId, usize>| counts.get(&node_id).copied().unwrap_or(0);

        let suspicion_bonus: usize = if self.suspected_nodes.contains(&node_id) {
            10
        } else {
            0
        };

        let score = suspicion_bonus
            + tally(&self.signature_failures) * 2
            + tally(&self.inconsistent_patterns)
            + tally(&self.replay_detector.replay_attempts) * 3
            + tally(&self.equivocation_detector.equivocations) * 5
            + tally(&self.resource_monitor.resource_attacks);

        if score <= 2 {
            ThreatLevel::Low
        } else if score <= 7 {
            ThreatLevel::Medium
        } else if score <= 15 {
            ThreatLevel::High
        } else {
            ThreatLevel::Critical
        }
    }

    /// Returns `true` when `node_id` is in the set of suspected nodes.
    pub fn is_suspected(&self, node_id: NodeId) -> bool {
        self.get_suspected_nodes().contains(&node_id)
    }

    pub fn get_suspected_nodes(&self) -> &HashSet<NodeId> {
        &self.suspected_nodes
    }

    /// Returns `true` when `node_id` was marked as partitioned by the most
    /// recent partition check.
    pub fn is_partitioned(&self, node_id: NodeId) -> bool {
        let partitioned = &self.partition_detector.partitioned_nodes;
        partitioned.contains(&node_id)
    }
}