use crate::core::protocol::ProtocolType;
use crate::error::{DetectorError, Result};
use std::collections::HashMap;
use std::time::{Duration, Instant};
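/// Outcome of analyzing one stream's recent history: extracted features,
/// detected behavior patterns, anomalies, and an overall confidence score.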
#[derive(Debug, Clone)]
pub struct AnalysisResult {
pub stream_id: String,
pub timestamp: Instant,
pub features: StreamFeatures,
pub patterns: Vec<BehaviorPattern>,
pub anomalies: Vec<Anomaly>,
pub performance: PerformanceMetrics,
pub confidence: f64,
}
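/// Statistical features extracted from a stream's buffered data points.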
#[derive(Debug, Clone, Default)]
pub struct StreamFeatures {
pub packet_size_distribution: PacketSizeDistribution,
pub timing_distribution: TimingDistribution,
pub byte_frequency: ByteFrequency,
pub entropy: f64,
pub compression_ratio: f64,
pub periodicity: PeriodicityFeatures,
pub protocol_features: ProtocolFeatures,
}
#[derive(Debug, Clone, Default)]
pub struct PacketSizeDistribution {
pub min_size: usize,
pub max_size: usize,
pub mean_size: f64,
pub std_dev: f64,
pub median: usize,
pub histogram: HashMap<usize, usize>,
}
#[derive(Debug, Clone, Default)]
pub struct TimingDistribution {
pub min_interval: Duration,
pub max_interval: Duration,
pub mean_interval: Duration,
pub std_dev: Duration,
    pub histogram: HashMap<u64, usize>,
}
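/// Frequency table over the byte values observed in packet samples.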
#[derive(Debug, Clone)]
pub struct ByteFrequency {
pub frequencies: [usize; 256],
pub most_common: u8,
pub least_common: u8,
pub unique_bytes: usize,
}
impl Default for ByteFrequency {
fn default() -> Self {
Self {
frequencies: [0; 256],
most_common: 0,
least_common: 0,
unique_bytes: 0,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct PeriodicityFeatures {
pub has_periodicity: bool,
pub period_length: Option<Duration>,
pub period_strength: f64,
pub period_variance: f64,
}
#[derive(Debug, Clone, Default)]
pub struct ProtocolFeatures {
pub detected_protocols: Vec<ProtocolType>,
pub protocol_confidence: HashMap<ProtocolType, f64>,
pub feature_vector: Vec<f64>,
pub signatures: Vec<String>,
}
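/// A recognized traffic behavior (burst, periodic, encrypted, ...) together
/// with the time span over which it was observed.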
#[derive(Debug, Clone)]
pub struct BehaviorPattern {
pub pattern_type: PatternType,
pub description: String,
pub confidence: f64,
pub start_time: Instant,
pub duration: Duration,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PatternType {
BurstTraffic,
PeriodicTraffic,
GradualIncrease,
GradualDecrease,
IdlePeriod,
AnomalousSpike,
ProtocolSwitch,
EncryptedTraffic,
CompressedTraffic,
Custom(String),
}
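/// A single deviation from the stream's own statistical baseline.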
#[derive(Debug, Clone)]
pub struct Anomaly {
pub anomaly_type: AnomalyType,
pub severity: AnomalySeverity,
pub description: String,
pub detected_at: Instant,
pub value: f64,
pub expected_value: f64,
pub deviation: f64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AnomalyType {
SizeAnomaly,
TimingAnomaly,
FrequencyAnomaly,
EntropyAnomaly,
PatternAnomaly,
ProtocolAnomaly,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AnomalySeverity {
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone, Default)]
pub struct PerformanceMetrics {
pub throughput: f64,
pub latency: Duration,
pub jitter: Duration,
pub packet_loss_rate: f64,
pub bandwidth_utilization: f64,
pub quality_score: f64,
}
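/// Tuning knobs for the analyzer; `anomaly_threshold` is a z-score cutoff
/// applied to size and timing deviations.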
#[derive(Debug, Clone)]
pub struct AnalyzerConfig {
pub window_size: Duration,
pub min_samples: usize,
pub anomaly_threshold: f64,
pub enable_deep_analysis: bool,
pub enable_pattern_detection: bool,
pub enable_anomaly_detection: bool,
pub history_retention: Duration,
pub analysis_interval: Duration,
}
impl Default for AnalyzerConfig {
fn default() -> Self {
Self {
window_size: Duration::from_secs(60),
min_samples: 10,
            anomaly_threshold: 2.0,
            enable_deep_analysis: true,
enable_pattern_detection: true,
enable_anomaly_detection: true,
            history_retention: Duration::from_secs(3600),
            analysis_interval: Duration::from_secs(10),
}
}
}
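/// Sliding-window stream analyzer: callers feed raw packets via
/// `add_data_point` and periodically call `analyze_stream`.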
#[derive(Debug)]
pub struct StreamAnalyzer {
config: AnalyzerConfig,
history: HashMap<String, StreamHistory>,
stats: AnalyzerStats,
}
#[derive(Debug)]
struct StreamHistory {
data_points: Vec<DataPoint>,
last_analysis: Option<Instant>,
accumulated_features: StreamFeatures,
}
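/// One recorded packet: arrival time, full size, and a bounded byte sample.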
#[derive(Debug, Clone)]
struct DataPoint {
timestamp: Instant,
size: usize,
sample: Vec<u8>,
}
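/// Running counters accumulated across analysis passes.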
#[derive(Debug, Clone, Default)]
pub struct AnalyzerStats {
pub analysis_count: usize,
pub patterns_detected: usize,
pub anomalies_detected: usize,
pub average_analysis_time: Duration,
pub total_bytes_analyzed: usize,
}
impl StreamAnalyzer {
pub fn new(config: AnalyzerConfig) -> Self {
Self {
config,
history: HashMap::new(),
stats: AnalyzerStats::default(),
}
}
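    /// Records a packet for `stream_id`, keeping at most the first 1024 bytes
    /// as a sample and evicting points older than `history_retention`.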
pub fn add_data_point(&mut self, stream_id: String, data: &[u8]) {
let sample_size = std::cmp::min(data.len(), 1024);
let data_point = DataPoint {
timestamp: Instant::now(),
size: data.len(),
sample: data[..sample_size].to_vec(),
};
let history = self.history.entry(stream_id).or_insert_with(|| StreamHistory {
data_points: Vec::new(),
last_analysis: None,
accumulated_features: StreamFeatures::default(),
});
history.data_points.push(data_point);
        // `checked_sub` avoids a panic when the retention window is longer
        // than the time the process has been running.
        if let Some(cutoff_time) = Instant::now().checked_sub(self.config.history_retention) {
            history.data_points.retain(|dp| dp.timestamp > cutoff_time);
        }
self.stats.total_bytes_analyzed += data.len();
}
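    /// Runs a full analysis pass over the stream's buffered data points.
    /// Returns `DetectorError::NeedMoreData` until `min_samples` points exist.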
pub fn analyze_stream(&mut self, stream_id: &str) -> Result<AnalysisResult> {
let start_time = Instant::now();
let data_points = {
let history = self.history.get(stream_id)
.ok_or_else(|| DetectorError::config_error(
format!("Stream not found: {}", stream_id)
))?;
if history.data_points.len() < self.config.min_samples {
return Err(DetectorError::NeedMoreData(
self.config.min_samples - history.data_points.len()
));
}
history.data_points.clone()
};
let features = self.extract_features(&data_points)?;
let patterns = if self.config.enable_pattern_detection {
self.detect_patterns(&data_points, &features)?
} else {
Vec::new()
};
let anomalies = if self.config.enable_anomaly_detection {
self.detect_anomalies(&data_points, &features)?
} else {
Vec::new()
};
let performance = self.calculate_performance_metrics(&data_points)?;
let confidence = self.calculate_confidence(&features, &patterns, &anomalies);
if let Some(history) = self.history.get_mut(stream_id) {
history.last_analysis = Some(Instant::now());
history.accumulated_features = features.clone();
}
self.stats.analysis_count += 1;
self.stats.patterns_detected += patterns.len();
self.stats.anomalies_detected += anomalies.len();
self.update_average_analysis_time(start_time.elapsed());
Ok(AnalysisResult {
stream_id: stream_id.to_string(),
timestamp: Instant::now(),
features,
patterns,
anomalies,
performance,
confidence,
})
}
fn extract_features(&self, data_points: &[DataPoint]) -> Result<StreamFeatures> {
let mut features = StreamFeatures::default();
if data_points.is_empty() {
return Ok(features);
}
features.packet_size_distribution = self.calculate_packet_size_distribution(data_points);
features.timing_distribution = self.calculate_timing_distribution(data_points);
features.byte_frequency = self.calculate_byte_frequency(data_points);
features.entropy = self.calculate_entropy(data_points);
features.compression_ratio = self.calculate_compression_ratio(data_points);
features.periodicity = self.detect_periodicity(data_points);
features.protocol_features = self.extract_protocol_features(data_points);
Ok(features)
}
fn calculate_packet_size_distribution(&self, data_points: &[DataPoint]) -> PacketSizeDistribution {
let sizes: Vec<usize> = data_points.iter().map(|dp| dp.size).collect();
if sizes.is_empty() {
return PacketSizeDistribution::default();
}
let min_size = *sizes.iter().min().unwrap();
let max_size = *sizes.iter().max().unwrap();
let mean_size = sizes.iter().sum::<usize>() as f64 / sizes.len() as f64;
let variance = sizes.iter()
.map(|&size| (size as f64 - mean_size).powi(2))
.sum::<f64>() / sizes.len() as f64;
let std_dev = variance.sqrt();
let mut sorted_sizes = sizes.clone();
sorted_sizes.sort_unstable();
let median = sorted_sizes[sorted_sizes.len() / 2];
let mut histogram = HashMap::new();
for &size in &sizes {
*histogram.entry(size).or_insert(0) += 1;
}
PacketSizeDistribution {
min_size,
max_size,
mean_size,
std_dev,
median,
histogram,
}
}
fn calculate_timing_distribution(&self, data_points: &[DataPoint]) -> TimingDistribution {
if data_points.len() < 2 {
return TimingDistribution::default();
}
let intervals: Vec<Duration> = data_points.windows(2)
.map(|window| window[1].timestamp.duration_since(window[0].timestamp))
.collect();
let min_interval = intervals.iter().min().cloned().unwrap_or_default();
let max_interval = intervals.iter().max().cloned().unwrap_or_default();
let mean_nanos = intervals.iter()
.map(|d| d.as_nanos())
.sum::<u128>() / intervals.len() as u128;
let mean_interval = Duration::from_nanos(mean_nanos as u64);
let variance_nanos = intervals.iter()
.map(|d| (d.as_nanos() as i128 - mean_nanos as i128).pow(2) as u128)
.sum::<u128>() / intervals.len() as u128;
let std_dev = Duration::from_nanos((variance_nanos as f64).sqrt() as u64);
let mut histogram = HashMap::new();
for interval in &intervals {
let millis = interval.as_millis() as u64;
*histogram.entry(millis).or_insert(0) += 1;
}
TimingDistribution {
min_interval,
max_interval,
mean_interval,
std_dev,
histogram,
}
}
fn calculate_byte_frequency(&self, data_points: &[DataPoint]) -> ByteFrequency {
let mut frequencies = [0usize; 256];
let mut total_bytes = 0;
for data_point in data_points {
for &byte in &data_point.sample {
frequencies[byte as usize] += 1;
total_bytes += 1;
}
}
if total_bytes == 0 {
return ByteFrequency::default();
}
let mut most_common = 0u8;
let mut least_common = 0u8;
let mut max_freq = 0;
let mut min_freq = usize::MAX;
for (byte, &freq) in frequencies.iter().enumerate() {
if freq > 0 {
if freq > max_freq {
max_freq = freq;
most_common = byte as u8;
}
if freq < min_freq {
min_freq = freq;
least_common = byte as u8;
}
}
}
let unique_bytes = frequencies.iter().filter(|&&freq| freq > 0).count();
ByteFrequency {
frequencies,
most_common,
least_common,
unique_bytes,
}
}
fn calculate_entropy(&self, data_points: &[DataPoint]) -> f64 {
let byte_freq = self.calculate_byte_frequency(data_points);
let total_bytes: usize = byte_freq.frequencies.iter().sum();
if total_bytes == 0 {
return 0.0;
}
let mut entropy = 0.0;
for &freq in &byte_freq.frequencies {
if freq > 0 {
let probability = freq as f64 / total_bytes as f64;
entropy -= probability * probability.log2();
}
}
entropy
}
    fn calculate_compression_ratio(&self, data_points: &[DataPoint]) -> f64 {
        if data_points.is_empty() {
            return 1.0;
        }
        // Approximate compressibility from Shannon entropy: 8 bits per byte is
        // the maximum, so the ratio approaches 1.0 for incompressible data.
        self.calculate_entropy(data_points) / 8.0
    }
    fn detect_periodicity(&self, data_points: &[DataPoint]) -> PeriodicityFeatures {
        if data_points.len() < 10 {
            return PeriodicityFeatures::default();
        }
        let sizes: Vec<usize> = data_points.iter().map(|dp| dp.size).collect();
        // Mean inter-arrival time, used below to convert a lag measured in
        // packets into wall-clock time.
        let span = data_points.last().unwrap().timestamp
            .duration_since(data_points.first().unwrap().timestamp);
        let mean_interval = span / (data_points.len() - 1) as u32;
        let mut max_correlation = 0.0;
        let mut best_period = None;
        for lag in 2..std::cmp::min(sizes.len() / 2, 100) {
            let correlation = self.calculate_autocorrelation(&sizes, lag);
            if correlation > max_correlation {
                max_correlation = correlation;
                if correlation > 0.5 {
                    best_period = Some(mean_interval * lag as u32);
                }
            }
        }
        PeriodicityFeatures {
            has_periodicity: best_period.is_some(),
            period_length: best_period,
            period_strength: max_correlation,
            // Variance of the period estimate is not computed yet.
            period_variance: 0.0,
        }
    }
fn calculate_autocorrelation(&self, data: &[usize], lag: usize) -> f64 {
if data.len() <= lag {
return 0.0;
}
let n = data.len() - lag;
let mean: f64 = data.iter().sum::<usize>() as f64 / data.len() as f64;
let mut numerator = 0.0;
let mut denominator = 0.0;
for i in 0..n {
let x = data[i] as f64 - mean;
let y = data[i + lag] as f64 - mean;
numerator += x * y;
denominator += x * x;
}
if denominator == 0.0 {
0.0
} else {
numerator / denominator
}
}
fn extract_protocol_features(&self, data_points: &[DataPoint]) -> ProtocolFeatures {
let mut features = ProtocolFeatures::default();
for data_point in data_points {
if !data_point.sample.is_empty() {
if self.has_http_signature(&data_point.sample) {
features.detected_protocols.push(ProtocolType::HTTP1_1);
features.protocol_confidence.insert(ProtocolType::HTTP1_1, 0.8);
}
if self.has_tls_signature(&data_point.sample) {
features.detected_protocols.push(ProtocolType::TLS);
features.protocol_confidence.insert(ProtocolType::TLS, 0.7);
}
}
}
features.detected_protocols.sort();
features.detected_protocols.dedup();
features
}
fn has_http_signature(&self, data: &[u8]) -> bool {
let data_str = String::from_utf8_lossy(data).to_lowercase();
data_str.contains("http/") ||
data_str.starts_with("get ") ||
data_str.starts_with("post ") ||
data_str.starts_with("put ") ||
data_str.starts_with("delete ")
}
    fn has_tls_signature(&self, data: &[u8]) -> bool {
        if data.len() < 6 {
            return false;
        }
        // TLS record header: content type 20-24 (ChangeCipherSpec..Heartbeat)
        // followed by the legacy version bytes 0x03 0x01 through 0x03 0x04.
        (20..=24).contains(&data[0]) && data[1] == 3 && (1..=4).contains(&data[2])
    }
fn detect_patterns(&self, data_points: &[DataPoint], features: &StreamFeatures) -> Result<Vec<BehaviorPattern>> {
let mut patterns = Vec::new();
if let Some(pattern) = self.detect_burst_pattern(data_points)? {
patterns.push(pattern);
}
if features.periodicity.has_periodicity {
patterns.push(BehaviorPattern {
pattern_type: PatternType::PeriodicTraffic,
description: format!("Periodic traffic detected with period {:?}",
features.periodicity.period_length),
confidence: features.periodicity.period_strength,
start_time: data_points.first().unwrap().timestamp,
duration: data_points.last().unwrap().timestamp
.duration_since(data_points.first().unwrap().timestamp),
metadata: HashMap::new(),
});
}
        if features.entropy > 7.5 {
            patterns.push(BehaviorPattern {
                pattern_type: PatternType::EncryptedTraffic,
                description: format!("High entropy traffic detected (entropy: {:.2})", features.entropy),
                // Entropy above 7.5 bits/byte maps to confidence in (0.5, 1.0].
                confidence: features.entropy - 7.0,
                start_time: data_points.first().unwrap().timestamp,
                duration: data_points.last().unwrap().timestamp
                    .duration_since(data_points.first().unwrap().timestamp),
                metadata: HashMap::new(),
            });
        }
Ok(patterns)
}
    fn detect_burst_pattern(&self, data_points: &[DataPoint]) -> Result<Option<BehaviorPattern>> {
        if data_points.len() < 5 {
            return Ok(None);
        }
        let sizes: Vec<usize> = data_points.iter().map(|dp| dp.size).collect();
        let mean_size = sizes.iter().sum::<usize>() as f64 / sizes.len() as f64;
        let mut burst_start = None;
        let mut burst_count = 0;
        // A burst is three or more consecutive packets larger than twice the mean size.
        for (i, &size) in sizes.iter().enumerate() {
            if size as f64 > mean_size * 2.0 {
                if burst_start.is_none() {
                    burst_start = Some(i);
                }
                burst_count += 1;
            } else {
                if burst_count >= 3 {
                    let start_idx = burst_start.unwrap();
                    return Ok(Some(BehaviorPattern {
                        pattern_type: PatternType::BurstTraffic,
                        description: format!("Burst traffic detected: {} large packets", burst_count),
                        confidence: 0.8,
                        start_time: data_points[start_idx].timestamp,
                        duration: data_points[i - 1].timestamp
                            .duration_since(data_points[start_idx].timestamp),
                        metadata: HashMap::new(),
                    }));
                }
                burst_start = None;
                burst_count = 0;
            }
        }
        // A burst that runs to the end of the window would otherwise be missed.
        if burst_count >= 3 {
            let start_idx = burst_start.unwrap();
            return Ok(Some(BehaviorPattern {
                pattern_type: PatternType::BurstTraffic,
                description: format!("Burst traffic detected: {} large packets", burst_count),
                confidence: 0.8,
                start_time: data_points[start_idx].timestamp,
                duration: data_points.last().unwrap().timestamp
                    .duration_since(data_points[start_idx].timestamp),
                metadata: HashMap::new(),
            }));
        }
        Ok(None)
    }
fn detect_anomalies(&self, data_points: &[DataPoint], features: &StreamFeatures) -> Result<Vec<Anomaly>> {
let mut anomalies = Vec::new();
let size_anomalies = self.detect_size_anomalies(data_points, features)?;
anomalies.extend(size_anomalies);
let timing_anomalies = self.detect_timing_anomalies(data_points, features)?;
anomalies.extend(timing_anomalies);
if features.entropy < 1.0 || features.entropy > 7.9 {
anomalies.push(Anomaly {
anomaly_type: AnomalyType::EntropyAnomaly,
severity: if features.entropy < 0.5 || features.entropy > 7.95 {
AnomalySeverity::High
} else {
AnomalySeverity::Medium
},
description: format!("Unusual entropy value: {:.2}", features.entropy),
detected_at: Instant::now(),
value: features.entropy,
                expected_value: 4.0,
                deviation: (features.entropy - 4.0).abs(),
});
}
Ok(anomalies)
}
fn detect_size_anomalies(&self, data_points: &[DataPoint], features: &StreamFeatures) -> Result<Vec<Anomaly>> {
let mut anomalies = Vec::new();
        let threshold = self.config.anomaly_threshold;
        let std_dev = features.packet_size_distribution.std_dev;
        // A zero standard deviation (all packets the same size) would make
        // every z-score infinite, so skip the check in that case.
        if std_dev == 0.0 {
            return Ok(anomalies);
        }
        for data_point in data_points {
            let size = data_point.size as f64;
            let deviation = (size - features.packet_size_distribution.mean_size).abs() / std_dev;
            if deviation > threshold {
let severity = if deviation > threshold * 2.0 {
AnomalySeverity::High
} else if deviation > threshold * 1.5 {
AnomalySeverity::Medium
} else {
AnomalySeverity::Low
};
anomalies.push(Anomaly {
anomaly_type: AnomalyType::SizeAnomaly,
severity,
description: format!("Unusual packet size: {} bytes", data_point.size),
detected_at: data_point.timestamp,
value: size,
expected_value: features.packet_size_distribution.mean_size,
deviation,
});
}
}
Ok(anomalies)
}
fn detect_timing_anomalies(&self, data_points: &[DataPoint], features: &StreamFeatures) -> Result<Vec<Anomaly>> {
let mut anomalies = Vec::new();
if data_points.len() < 2 {
return Ok(anomalies);
}
        let threshold = self.config.anomaly_threshold;
        let mean_interval_nanos = features.timing_distribution.mean_interval.as_nanos() as f64;
        let std_dev_nanos = features.timing_distribution.std_dev.as_nanos() as f64;
        // Same zero-variance guard as for packet sizes.
        if std_dev_nanos == 0.0 {
            return Ok(anomalies);
        }
        for window in data_points.windows(2) {
            let interval = window[1].timestamp.duration_since(window[0].timestamp);
            let interval_nanos = interval.as_nanos() as f64;
            let deviation = (interval_nanos - mean_interval_nanos).abs() / std_dev_nanos;
if deviation > threshold {
let severity = if deviation > threshold * 2.0 {
AnomalySeverity::High
} else if deviation > threshold * 1.5 {
AnomalySeverity::Medium
} else {
AnomalySeverity::Low
};
anomalies.push(Anomaly {
anomaly_type: AnomalyType::TimingAnomaly,
severity,
description: format!("Unusual timing interval: {:?}", interval),
detected_at: window[1].timestamp,
value: interval_nanos,
expected_value: mean_interval_nanos,
deviation,
});
}
}
Ok(anomalies)
}
fn calculate_performance_metrics(&self, data_points: &[DataPoint]) -> Result<PerformanceMetrics> {
if data_points.is_empty() {
return Ok(PerformanceMetrics::default());
}
let total_bytes: usize = data_points.iter().map(|dp| dp.size).sum();
let duration = data_points.last().unwrap().timestamp
.duration_since(data_points.first().unwrap().timestamp);
let throughput = if duration.as_secs_f64() > 0.0 {
total_bytes as f64 / duration.as_secs_f64()
} else {
0.0
};
        let mut metrics = PerformanceMetrics {
            throughput,
            // The remaining fields are placeholder estimates; real latency,
            // jitter, and loss require data this analyzer does not observe.
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(5),
            packet_loss_rate: 0.0,
            bandwidth_utilization: 0.8,
            quality_score: 0.9,
        };
        // Penalize very low throughput and reward sustained high throughput.
        if throughput < 1000.0 {
            metrics.quality_score *= 0.5;
        } else if throughput > 1_000_000.0 {
            metrics.quality_score = (metrics.quality_score * 1.2).min(1.0);
        }
Ok(metrics)
}
fn calculate_confidence(&self, features: &StreamFeatures, patterns: &[BehaviorPattern], anomalies: &[Anomaly]) -> f64 {
let mut confidence = 0.5;
if features.packet_size_distribution.histogram.len() > 5 {
confidence += 0.1;
}
if features.entropy > 0.0 && features.entropy < 8.0 {
confidence += 0.1;
}
confidence += (patterns.len() as f64 * 0.05).min(0.2);
confidence -= (anomalies.len() as f64 * 0.02).min(0.3);
confidence.clamp(0.0, 1.0)
}
    fn cleanup_expired_data(&self, data_points: &mut Vec<DataPoint>) {
        // Mirrors the retention logic in `add_data_point`, including the
        // panic-safe `checked_sub`.
        if let Some(cutoff) = Instant::now().checked_sub(self.config.history_retention) {
            data_points.retain(|dp| dp.timestamp > cutoff);
        }
    }
fn update_average_analysis_time(&mut self, duration: Duration) {
let count = self.stats.analysis_count;
if count == 1 {
self.stats.average_analysis_time = duration;
} else {
let current_total = self.stats.average_analysis_time.as_nanos() * (count - 1) as u128;
let new_total = current_total + duration.as_nanos();
self.stats.average_analysis_time = Duration::from_nanos((new_total / count as u128) as u64);
}
}
pub fn stats(&self) -> &AnalyzerStats {
&self.stats
}
pub fn config(&self) -> &AnalyzerConfig {
&self.config
}
pub fn update_config(&mut self, config: AnalyzerConfig) {
self.config = config;
}
pub fn clear_history(&mut self) {
self.history.clear();
}
pub fn stream_count(&self) -> usize {
self.history.len()
}
}
impl Default for StreamAnalyzer {
fn default() -> Self {
Self::new(AnalyzerConfig::default())
}
}
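#[cfg(test)]
mod tests {
    use super::*;

    // A minimal smoke-test sketch. It assumes only the items defined in this
    // module plus the crate's `Result`/`DetectorError` used above; the stream
    // id "s1" and the payload shapes are arbitrary test values.
    #[test]
    fn analysis_needs_min_samples_then_succeeds() {
        let mut analyzer = StreamAnalyzer::default();

        // One point is below the default `min_samples` of 10, so the
        // analyzer should ask for more data.
        analyzer.add_data_point("s1".to_string(), b"GET ");
        assert!(analyzer.analyze_stream("s1").is_err());

        // Feed enough varied points to cross the threshold.
        for i in 0..20u8 {
            let payload = vec![i; 64 + usize::from(i)];
            analyzer.add_data_point("s1".to_string(), &payload);
        }
        let result = analyzer.analyze_stream("s1").expect("enough samples buffered");
        assert_eq!(result.stream_id, "s1");
        assert!((0.0..=1.0).contains(&result.confidence));
    }
}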