use crate::core::detector::{DetectionResult, ProtocolDetector};
use crate::core::protocol::ProtocolType;
use crate::error::{DetectorError, Result};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
pub mod buffer;
pub mod processor;
pub mod analyzer;
pub use buffer::{StreamBuffer, BufferConfig};
pub use processor::{StreamProcessor, ProcessorConfig};
pub use analyzer::{StreamAnalyzer, AnalysisResult};
/// Detection state of a stream.
///
/// This module only declares the states; nothing in this file performs the
/// transitions. The per-variant notes below are inferred from the variant
/// names and payloads and should be confirmed against the code that drives
/// detection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StreamState {
    /// Presumably the state before any detection has been attempted.
    Initial,
    /// Presumably detection is currently in progress.
    Detecting,
    /// Carries the [`ProtocolType`] associated with the stream.
    Detected(ProtocolType),
    /// Carries a textual description (assumed: the reason detection failed).
    Failed(String),
    /// Presumably the stream has been closed.
    Closed,
}
/// Direction tag stored in `StreamMetadata::direction`.
///
/// The type is `Copy` and hashable so it can be passed by value and used as a
/// map/set key. This file never branches on the direction; the variant
/// meanings below follow from their names only.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum StreamDirection {
    /// Traffic flowing in (naming-based; confirm the point of reference with callers).
    Inbound,
    /// Traffic flowing out (naming-based; confirm the point of reference with callers).
    Outbound,
    /// Traffic in both directions.
    Bidirectional,
}
/// Bookkeeping record for a single stream.
///
/// All fields are public; the methods on this type keep the counters and
/// timestamps in sync, but callers may also write the fields directly.
#[derive(Clone, Debug)]
pub struct StreamMetadata {
    /// Identifier of the stream; `StreamManager` uses the same string as its map key.
    pub stream_id: String,
    /// Source address, if one was supplied (free-form string; format not enforced here).
    pub source_addr: Option<String>,
    /// Destination address, if one was supplied (free-form string; format not enforced here).
    pub dest_addr: Option<String>,
    /// Direction tag given at construction time.
    pub direction: StreamDirection,
    /// Instant at which the record was created.
    pub created_at: Instant,
    /// Instant of the most recent `update_activity` / `add_bytes` call.
    pub last_activity: Instant,
    /// Sum of all byte counts passed to `add_bytes`.
    pub total_bytes: usize,
    /// Number of `add_bytes` calls, i.e. one per recorded packet.
    pub packet_count: usize,
    /// Arbitrary key/value annotations attached to the stream.
    pub attributes: std::collections::HashMap<String, String>,
}
impl StreamMetadata {
    /// Creates the record for a new stream.
    ///
    /// `created_at` and `last_activity` are both set to the same "now"
    /// instant, the counters start at zero, and no addresses or attributes
    /// are set.
    pub fn new(stream_id: String, direction: StreamDirection) -> Self {
        let now = Instant::now();
        Self {
            stream_id,
            source_addr: None,
            dest_addr: None,
            direction,
            created_at: now,
            last_activity: now,
            total_bytes: 0,
            packet_count: 0,
            attributes: std::collections::HashMap::new(),
        }
    }

    /// Builder-style setter for the source address.
    pub fn with_source_addr(mut self, addr: String) -> Self {
        self.source_addr = Some(addr);
        self
    }

    /// Builder-style setter for the destination address.
    pub fn with_dest_addr(mut self, addr: String) -> Self {
        self.dest_addr = Some(addr);
        self
    }

    /// Builder-style setter for one attribute; an existing value stored under
    /// the same key is replaced.
    pub fn with_attribute(mut self, key: String, value: String) -> Self {
        self.attributes.insert(key, value);
        self
    }

    /// Marks the stream as active right now.
    pub fn update_activity(&mut self) {
        self.last_activity = Instant::now();
    }

    /// Records one packet of `bytes` bytes and refreshes `last_activity`.
    ///
    /// Both counters saturate at `usize::MAX` instead of overflowing: a plain
    /// `+=` panics in debug builds and silently wraps in release builds, and
    /// a byte total can realistically exceed `usize::MAX` on 32-bit targets.
    pub fn add_bytes(&mut self, bytes: usize) {
        self.total_bytes = self.total_bytes.saturating_add(bytes);
        self.packet_count = self.packet_count.saturating_add(1);
        self.update_activity();
    }

    /// Time between creation and the last recorded activity.
    ///
    /// Both instants are public fields, so they can be set out of order by a
    /// caller; in that case the result is a zero duration rather than a
    /// panic.
    pub fn duration(&self) -> Duration {
        self.last_activity
            .saturating_duration_since(self.created_at)
    }

    /// Mean number of bytes per recorded packet, or `0.0` when no packet has
    /// been recorded yet.
    pub fn average_packet_size(&self) -> f64 {
        if self.packet_count == 0 {
            0.0
        } else {
            self.total_bytes as f64 / self.packet_count as f64
        }
    }
}
/// Events passed to [`StreamEventHandler`]s.
///
/// Every variant carries the `Instant` at which it was produced. Within this
/// file only `StreamClosed` and `Timeout` are actually emitted (by
/// `StreamManager::close_stream` and `StreamManager::handle_timeouts`); the
/// descriptions of the other variants are inferred from their names and
/// fields.
#[derive(Clone, Debug)]
pub enum StreamEvent {
    /// A chunk of stream data (assumed: raw bytes as received).
    DataReceived {
        data: Vec<u8>,
        timestamp: Instant,
    },
    /// A protocol was identified for the stream.
    ProtocolDetected {
        protocol: ProtocolType,
        /// Confidence score; the valid range is not defined in this file.
        confidence: f64,
        timestamp: Instant,
    },
    /// Detection did not succeed; `error` is a textual description.
    DetectionFailed {
        error: String,
        timestamp: Instant,
    },
    /// A stream was removed from the manager.
    StreamClosed {
        timestamp: Instant,
    },
    /// A buffer reached capacity (assumed; `size` is presumably in bytes).
    BufferFull {
        size: usize,
        timestamp: Instant,
    },
    /// A stream was idle for longer than the detection timeout; `duration`
    /// is the configured timeout, not the measured idle time.
    Timeout {
        duration: Duration,
        timestamp: Instant,
    },
}
/// Receiver for [`StreamEvent`]s.
///
/// Handlers are registered with `StreamManager::add_event_handler`; the
/// manager then calls them in registration order for every emitted event.
pub trait StreamEventHandler {
    /// Processes one event, which is handed over by value.
    ///
    /// # Errors
    ///
    /// Returning an error makes `StreamManager::emit_event` stop notifying
    /// the remaining handlers and propagate that error to its caller.
    fn handle_event(&mut self, event: StreamEvent) -> Result<()>;
}
/// Tunables for stream handling.
///
/// Within this file only `detection_timeout` and `event_queue_size` are read
/// (by `StreamManager`); the remaining fields are presumably consumed by the
/// `buffer`, `processor` and `analyzer` submodules — confirm there before
/// relying on the descriptions below. Defaults are listed on the `Default`
/// implementation.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Upper bound for buffered data (presumably bytes).
    pub max_buffer_size: usize,
    /// Idle time after which `StreamManager::handle_timeouts` times a stream out.
    pub detection_timeout: Duration,
    /// Smallest amount of data before detection is attempted (presumably bytes).
    pub min_detection_size: usize,
    /// Largest amount of data inspected for detection (presumably bytes).
    pub max_detection_size: usize,
    /// Toggles stream analysis (assumed from the name).
    pub enable_analysis: bool,
    /// Toggles retention of raw data (assumed from the name).
    pub keep_raw_data: bool,
    /// Bound applied by `StreamManager::emit_event` to its retained-event
    /// queue; the oldest entries are evicted first.
    pub event_queue_size: usize,
    /// Toggles statistics collection (assumed from the name).
    pub enable_stats: bool,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
max_buffer_size: 64 * 1024, detection_timeout: Duration::from_secs(30),
min_detection_size: 16,
max_detection_size: 8 * 1024, enable_analysis: true,
keep_raw_data: false,
event_queue_size: 1000,
enable_stats: true,
}
}
}
/// Aggregate counters over all streams.
///
/// `Default` yields all-zero counters, a zero average and empty maps. The
/// fields are public, but the `record_*` methods are what keep
/// `average_detection_time` consistent with `successful_detections`.
#[derive(Clone, Debug, Default)]
pub struct StreamStats {
    /// Number of `record_new_stream` calls.
    pub streams_processed: usize,
    /// Number of `record_successful_detection` calls.
    pub successful_detections: usize,
    /// Number of `record_failed_detection` calls.
    pub failed_detections: usize,
    /// Number of `record_timeout` calls.
    pub timeouts: usize,
    /// Sum of all values passed to `record_bytes_processed`.
    pub total_bytes_processed: usize,
    /// Running mean of the durations passed to `record_successful_detection`.
    pub average_detection_time: Duration,
    /// Successful detections per protocol.
    pub protocol_counts: std::collections::HashMap<ProtocolType, usize>,
    /// Failed detections per error message.
    pub error_counts: std::collections::HashMap<String, usize>,
}
impl StreamStats {
    /// Creates an empty statistics record (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one successful detection of `protocol` that took `duration`,
    /// and folds `duration` into `average_detection_time`.
    pub fn record_successful_detection(&mut self, protocol: ProtocolType, duration: Duration) {
        self.successful_detections += 1;
        *self.protocol_counts.entry(protocol).or_insert(0) += 1;
        self.update_average_time(duration);
    }

    /// Counts one failed detection, keyed by its error message.
    pub fn record_failed_detection(&mut self, error: &str) {
        self.failed_detections += 1;
        // Look the message up by `&str` first so that an owned `String` is
        // only allocated the first time a given message is seen.
        match self.error_counts.get_mut(error) {
            Some(count) => *count += 1,
            None => {
                self.error_counts.insert(error.to_owned(), 1);
            }
        }
    }

    /// Counts one timed-out stream.
    pub fn record_timeout(&mut self) {
        self.timeouts += 1;
    }

    /// Adds `bytes` to the processed-byte total.
    ///
    /// The total saturates at `usize::MAX`: a plain `+=` would panic in debug
    /// builds and wrap in release builds, which is reachable for byte totals
    /// on 32-bit targets.
    pub fn record_bytes_processed(&mut self, bytes: usize) {
        self.total_bytes_processed = self.total_bytes_processed.saturating_add(bytes);
    }

    /// Counts one newly created stream.
    pub fn record_new_stream(&mut self) {
        self.streams_processed += 1;
    }

    /// Folds `duration` into the running mean.
    ///
    /// Must be called after `successful_detections` has been incremented for
    /// the sample. The mean is kept in whole nanoseconds, so each update
    /// truncates towards zero.
    fn update_average_time(&mut self, duration: Duration) {
        let samples = self.successful_detections as u128;
        if samples <= 1 {
            // First sample (or a counter that was reset externally): the mean
            // is the sample itself. This also guards the `samples - 1` below.
            self.average_detection_time = duration;
            return;
        }
        let previous_total = self.average_detection_time.as_nanos() * (samples - 1);
        let mean_nanos = (previous_total + duration.as_nanos()) / samples;
        // `Duration::from_nanos` takes a `u64`; clamp instead of truncating bits.
        let clamped = mean_nanos.min(u64::MAX as u128) as u64;
        self.average_detection_time = Duration::from_nanos(clamped);
    }

    /// Fraction of detections that succeeded, in `0.0..=1.0`; `0.0` when no
    /// detection has been recorded. Timeouts are not part of this ratio.
    pub fn success_rate(&self) -> f64 {
        let total = self.successful_detections + self.failed_detections;
        if total == 0 {
            0.0
        } else {
            self.successful_detections as f64 / total as f64
        }
    }

    /// Timeouts divided by streams processed; `0.0` when no stream has been
    /// recorded.
    pub fn timeout_rate(&self) -> f64 {
        if self.streams_processed == 0 {
            0.0
        } else {
            self.timeouts as f64 / self.streams_processed as f64
        }
    }

    /// Protocol with the highest success count, or `None` when nothing has
    /// been detected. When several protocols share the highest count, which
    /// one is returned depends on hash-map iteration order.
    pub fn most_common_protocol(&self) -> Option<ProtocolType> {
        self.protocol_counts
            .iter()
            .max_by_key(|(_, count)| *count)
            .map(|(protocol, _)| *protocol)
    }

    /// Error message with the highest failure count, or `None` when no
    /// failure has been recorded.
    ///
    /// Ties are resolved in favour of the lexicographically smallest message
    /// so that the result does not depend on hash-map iteration order.
    pub fn most_common_error(&self) -> Option<String> {
        self.error_counts
            .iter()
            // Higher count wins; on equal counts the smaller message must
            // compare as "greater", hence the reversed string comparison.
            .max_by(|a, b| a.1.cmp(b.1).then_with(|| b.0.cmp(a.0)))
            .map(|(error, _)| error.clone())
    }

    /// Discards all recorded statistics.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
/// Registry of active streams plus the event and statistics plumbing around
/// them.
pub struct StreamManager {
    /// Metadata of every currently registered stream, keyed by stream id.
    active_streams: std::collections::HashMap<String, StreamMetadata>,
    /// Active configuration; replaceable through `update_config`.
    config: StreamConfig,
    /// Aggregate statistics, exposed through `stats` / `stats_mut`.
    stats: StreamStats,
    /// Most recently emitted events, oldest at the front.
    event_queue: VecDeque<StreamEvent>,
    /// Handlers notified on every emitted event, in registration order.
    event_handlers: Vec<Box<dyn StreamEventHandler>>,
}
impl StreamManager {
    /// Creates a manager with no streams, no handlers and empty statistics.
    ///
    /// The event queue is pre-allocated for `config.event_queue_size`
    /// entries, capped at 1000 so that a very large limit does not force a
    /// large up-front allocation.
    pub fn new(config: StreamConfig) -> Self {
        let queue_capacity = config.event_queue_size.min(1000);
        Self {
            active_streams: std::collections::HashMap::new(),
            config,
            stats: StreamStats::new(),
            event_queue: VecDeque::with_capacity(queue_capacity),
            event_handlers: Vec::new(),
        }
    }

    /// Registers a handler; handlers are notified in registration order.
    pub fn add_event_handler(&mut self, handler: Box<dyn StreamEventHandler>) {
        self.event_handlers.push(handler);
    }

    /// Registers a new stream and counts it in the statistics.
    ///
    /// If `stream_id` is already registered, its metadata is replaced by a
    /// fresh record and the stream is counted again. Currently always
    /// returns `Ok(())`.
    pub fn create_stream(&mut self, stream_id: String, direction: StreamDirection) -> Result<()> {
        let metadata = StreamMetadata::new(stream_id.clone(), direction);
        self.active_streams.insert(stream_id, metadata);
        self.stats.record_new_stream();
        Ok(())
    }

    /// Returns the metadata of a registered stream, if any.
    pub fn get_stream(&self, stream_id: &str) -> Option<&StreamMetadata> {
        self.active_streams.get(stream_id)
    }

    /// Returns mutable metadata of a registered stream, if any.
    pub fn get_stream_mut(&mut self, stream_id: &str) -> Option<&mut StreamMetadata> {
        self.active_streams.get_mut(stream_id)
    }

    /// Removes a stream and, if it was registered, emits `StreamClosed`.
    ///
    /// Closing an unknown id is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates the first handler error from `emit_event`; the stream is
    /// removed regardless.
    pub fn close_stream(&mut self, stream_id: &str) -> Result<()> {
        if self.active_streams.remove(stream_id).is_some() {
            let event = StreamEvent::StreamClosed {
                timestamp: Instant::now(),
            };
            self.emit_event(event)?;
        }
        Ok(())
    }

    /// Delivers `event` to every handler and records it in the event queue.
    ///
    /// Handlers run in registration order and each receives its own clone.
    /// The queue never holds more than `config.event_queue_size` events: the
    /// oldest are evicted first, a limit lowered through `update_config` is
    /// enforced here, and a limit of zero disables retention entirely.
    ///
    /// # Errors
    ///
    /// Returns the first handler error; later handlers are not called. The
    /// event is still recorded in the queue in that case.
    pub fn emit_event(&mut self, event: StreamEvent) -> Result<()> {
        // Dispatch clones first so the original can be moved into the queue
        // afterwards, saving one clone of a possibly large payload.
        let dispatched = self
            .event_handlers
            .iter_mut()
            .try_for_each(|handler| handler.handle_event(event.clone()));

        let limit = self.config.event_queue_size;
        if limit == 0 {
            self.event_queue.clear();
        } else {
            while self.event_queue.len() >= limit {
                self.event_queue.pop_front();
            }
            self.event_queue.push_back(event);
        }

        dispatched
    }

    /// Times out and closes every stream whose last activity is older than
    /// `config.detection_timeout`.
    ///
    /// For each such stream a timeout is counted, a `Timeout` event carrying
    /// the configured timeout is emitted, and the stream is closed (which
    /// emits `StreamClosed`).
    ///
    /// # Errors
    ///
    /// Returns the first handler error encountered. All expired streams are
    /// still counted and removed, so a failing handler cannot leave them
    /// registered and have them counted again on the next sweep.
    pub fn handle_timeouts(&mut self) -> Result<()> {
        let now = Instant::now();
        let timeout_duration = self.config.detection_timeout;

        let expired: Vec<String> = self
            .active_streams
            .iter()
            .filter(|(_, metadata)| {
                now.saturating_duration_since(metadata.last_activity) > timeout_duration
            })
            .map(|(stream_id, _)| stream_id.clone())
            .collect();

        let mut outcome: Result<()> = Ok(());
        for stream_id in expired {
            self.stats.record_timeout();
            let notified = self.emit_event(StreamEvent::Timeout {
                duration: timeout_duration,
                timestamp: now,
            });
            // Close even if the notification failed, so the stream is removed.
            let closed = self.close_stream(&stream_id);
            if outcome.is_ok() {
                outcome = notified.and(closed);
            }
        }
        outcome
    }

    /// Number of currently registered streams.
    pub fn active_stream_count(&self) -> usize {
        self.active_streams.len()
    }

    /// Read access to the aggregate statistics.
    pub fn stats(&self) -> &StreamStats {
        &self.stats
    }

    /// Write access to the aggregate statistics.
    pub fn stats_mut(&mut self) -> &mut StreamStats {
        &mut self.stats
    }

    /// The active configuration.
    pub fn config(&self) -> &StreamConfig {
        &self.config
    }

    /// Replaces the configuration; a changed event-queue limit takes effect
    /// on the next `emit_event`.
    pub fn update_config(&mut self, config: StreamConfig) {
        self.config = config;
    }

    /// Drops all streams and queued events and resets the statistics.
    /// Registered handlers are kept and no events are emitted.
    pub fn cleanup(&mut self) {
        self.active_streams.clear();
        self.event_queue.clear();
        self.stats.reset();
    }
}
impl Default for StreamManager {
fn default() -> Self {
Self::new(StreamConfig::default())
}
}