use crate::core::detector::{DetectionConfig, DetectionResult, ProtocolDetector};
use crate::core::protocol::ProtocolType;
use crate::error::{DetectorError, Result};
use crate::stream::buffer::{StreamBuffer, BufferConfig};
use crate::stream::{StreamEvent, StreamMetadata, StreamState};
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Tunable settings for a `StreamProcessor`.
#[derive(Debug, Clone)]
pub struct ProcessorConfig {
    /// Buffer settings; a clone is handed to each stream's buffer when the stream is created.
    pub buffer_config: BufferConfig,
    /// Detector settings; the processor reads `max_probe_size` and `min_confidence` from it.
    pub detection_config: DetectionConfig,
    /// Upper bound on simultaneously tracked streams; `create_stream` fails once it is reached.
    pub max_concurrent_streams: usize,
    /// Idle time after which `handle_timeouts` closes a stream.
    pub stream_timeout: Duration,
    /// Minimum time between two automatic detection attempts on the same stream.
    pub detection_interval: Duration,
    /// Minimum number of buffered bytes required before detection is attempted.
    pub min_detection_size: usize,
    /// Number of detector runs allowed per stream before it is marked as failed.
    pub max_detection_attempts: usize,
    /// NOTE(review): not read anywhere in this file — presumably consumed elsewhere; confirm.
    pub enable_incremental_detection: bool,
    /// When `true`, every detector result is appended to the stream's detection history.
    pub keep_detection_history: bool,
}
impl Default for ProcessorConfig {
fn default() -> Self {
Self {
buffer_config: BufferConfig::default(),
detection_config: DetectionConfig::default(),
max_concurrent_streams: 1000,
stream_timeout: Duration::from_secs(300), detection_interval: Duration::from_millis(100),
min_detection_size: 64,
max_detection_attempts: 5,
enable_incremental_detection: true,
keep_detection_history: false,
}
}
}
/// Per-stream bookkeeping held by the processor for every registered stream.
#[derive(Debug)]
struct StreamContext {
    /// Metadata supplied by the caller when the stream was created.
    metadata: StreamMetadata,
    /// Current position of the stream in the detection lifecycle.
    state: StreamState,
    /// Bytes received for this stream.
    buffer: StreamBuffer,
    /// Detector results recorded for this stream (filled only when history is enabled).
    detection_history: Vec<DetectionResult>,
    /// How many times the detector has been run on this stream.
    detection_attempts: usize,
    /// When the detector was last run; `None` until the first attempt.
    last_detection: Option<Instant>,
    /// When the stream last saw activity; used for timeout checks.
    last_activity: Instant,
}
impl StreamContext {
fn new(metadata: StreamMetadata, buffer_config: BufferConfig) -> Self {
Self {
metadata,
state: StreamState::Initial,
buffer: StreamBuffer::new(buffer_config),
detection_history: Vec::new(),
detection_attempts: 0,
last_detection: None,
last_activity: Instant::now(),
}
}
fn update_activity(&mut self) {
self.last_activity = Instant::now();
self.metadata.update_activity();
}
fn is_expired(&self, timeout: Duration) -> bool {
self.last_activity.elapsed() > timeout
}
fn should_detect(&self, interval: Duration, min_size: usize) -> bool {
if matches!(self.state, StreamState::Detected(_) | StreamState::Failed(_) | StreamState::Closed) {
return false;
}
if self.buffer.size() < min_size {
return false;
}
if let Some(last_detection) = self.last_detection {
if last_detection.elapsed() < interval {
return false;
}
}
true
}
}
/// Tracks a set of streams, buffers their data and runs protocol detection on them.
#[derive(Debug)]
pub struct StreamProcessor {
    /// Active configuration; replaceable through `update_config`.
    config: ProcessorConfig,
    /// Per-stream state, keyed by stream id.
    streams: HashMap<String, StreamContext>,
    /// Detector invoked on buffered stream data.
    detector: Box<dyn ProtocolDetector>,
    /// Running counters describing what the processor has done so far.
    stats: ProcessorStats,
}
/// Running counters maintained by the stream processor.
#[derive(Debug, Clone, Default)]
pub struct ProcessorStats {
    /// Number of streams ever registered.
    pub total_streams: usize,
    /// Number of streams currently counted as open.
    pub active_streams: usize,
    /// Number of detections that were accepted.
    pub successful_detections: usize,
    /// Number of recorded detection failures.
    pub failed_detections: usize,
    /// Number of streams closed because they timed out.
    pub timed_out_streams: usize,
    /// Total number of bytes fed into the processor.
    pub total_bytes_processed: usize,
    /// Running mean of the time taken by successful detections.
    pub average_detection_time: Duration,
    /// Successful detections, counted per protocol.
    pub protocol_stats: HashMap<ProtocolType, usize>,
    /// Detection failures, counted per error label.
    pub error_stats: HashMap<String, usize>,
}
impl ProcessorStats {
    /// Counts a newly registered stream as both created and currently active.
    fn record_new_stream(&mut self) {
        self.total_streams = self.total_streams.saturating_add(1);
        self.active_streams = self.active_streams.saturating_add(1);
    }

    /// Counts one stream as no longer active; never goes below zero.
    fn record_stream_closed(&mut self) {
        self.active_streams = self.active_streams.saturating_sub(1);
    }

    /// Records an accepted detection of `protocol` that took `duration`,
    /// updating the per-protocol tally and the running average.
    fn record_successful_detection(&mut self, protocol: ProtocolType, duration: Duration) {
        self.successful_detections = self.successful_detections.saturating_add(1);
        let hits = self.protocol_stats.entry(protocol).or_insert(0);
        *hits = hits.saturating_add(1);
        self.update_average_detection_time(duration);
    }

    /// Records a detection failure under the label `error`.
    fn record_failed_detection(&mut self, error: &str) {
        self.failed_detections = self.failed_detections.saturating_add(1);
        let hits = self.error_stats.entry(error.to_string()).or_insert(0);
        *hits = hits.saturating_add(1);
    }

    /// Counts one stream that was closed because of a timeout.
    fn record_timeout(&mut self) {
        self.timed_out_streams = self.timed_out_streams.saturating_add(1);
    }

    /// Adds `bytes` to the processed-bytes total.
    ///
    /// The counter saturates at `usize::MAX` instead of overflowing, so a
    /// long-running processor cannot panic (debug builds) or silently wrap
    /// (release builds) on this bookkeeping.
    fn record_bytes_processed(&mut self, bytes: usize) {
        self.total_bytes_processed = self.total_bytes_processed.saturating_add(bytes);
    }

    /// Folds `duration` into the running mean of successful detection times.
    ///
    /// Expects `successful_detections` to already include the sample being
    /// added. With zero or one recorded successes the average simply becomes
    /// `duration`, which also avoids subtracting from a zero count.
    fn update_average_detection_time(&mut self, duration: Duration) {
        let samples = self.successful_detections as u128;
        if samples <= 1 {
            self.average_detection_time = duration;
            return;
        }

        // Rebuild the previous sum from the stored mean, then add the new sample.
        let previous_total = self
            .average_detection_time
            .as_nanos()
            .saturating_mul(samples - 1);
        let mean_nanos = previous_total.saturating_add(duration.as_nanos()) / samples;

        // `Duration::from_nanos` takes a u64; clamp instead of truncating.
        let clamped = mean_nanos.min(u128::from(u64::MAX)) as u64;
        self.average_detection_time = Duration::from_nanos(clamped);
    }
}
impl StreamProcessor {
/// Creates a processor with no registered streams and zeroed statistics.
///
/// `config` controls limits and detection behaviour; `detector` is the
/// protocol detector that will be run on buffered stream data.
pub fn new(config: ProcessorConfig, detector: Box<dyn ProtocolDetector>) -> Self {
    let streams = HashMap::new();
    let stats = ProcessorStats::default();
    Self {
        config,
        streams,
        detector,
        stats,
    }
}
/// Registers a stream under `metadata.stream_id`.
///
/// If a stream with the same id is already registered, its context is
/// replaced by a fresh one (buffered data and detection state are discarded).
/// The replaced stream is then counted as closed, so `active_streams` keeps
/// matching the number of streams actually tracked.
///
/// # Errors
///
/// Returns a configuration error when `max_concurrent_streams` streams are
/// already registered.
pub fn create_stream(&mut self, metadata: StreamMetadata) -> Result<()> {
    if self.streams.len() >= self.config.max_concurrent_streams {
        return Err(DetectorError::config_error(
            "Maximum concurrent streams exceeded".to_string(),
        ));
    }

    let stream_id = metadata.stream_id.clone();
    let context = StreamContext::new(metadata, self.config.buffer_config.clone());

    // `insert` returns the previous context when the id was already present.
    // Retire it in the stats before counting the new one; otherwise every
    // re-registration would inflate `active_streams`.
    if self.streams.insert(stream_id, context).is_some() {
        self.stats.record_stream_closed();
    }
    self.stats.record_new_stream();
    Ok(())
}
/// Appends `data` to the stream's buffer and returns the resulting events.
///
/// The returned list always starts with a `DataReceived` event carrying the
/// chunk that was just delivered. It is followed by a detection event when a
/// detection attempt ran and produced one, and by a `BufferFull` event when
/// the buffer reported itself full after the push.
///
/// # Errors
///
/// Returns a configuration error when `stream_id` is not registered, and
/// propagates errors from the buffer push and from the detection attempt.
pub fn process_data(&mut self, stream_id: &str, data: Vec<u8>) -> Result<Vec<StreamEvent>> {
    let context = self.streams.get_mut(stream_id).ok_or_else(|| {
        DetectorError::config_error(format!("Stream not found: {}", stream_id))
    })?;
    context.update_activity();

    let data_size = data.len();
    // Keep a copy for the event: the buffer takes ownership of `data`, and
    // reading the chunk back with `peek(data_size)` is unreliable because the
    // same call is used by detection to read the head of the buffered stream.
    // NOTE(review): assumes `StreamBuffer::peek` reads from the front — confirm.
    let received = data.clone();
    context.buffer.push(data)?;
    context.metadata.add_bytes(data_size);
    self.stats.record_bytes_processed(data_size);

    let mut events = vec![StreamEvent::DataReceived {
        data: received,
        timestamp: Instant::now(),
    }];

    let should_detect = context.should_detect(
        self.config.detection_interval,
        self.config.min_detection_size,
    );
    // Snapshot the buffer status now; `context` must not be used past this
    // point because `attempt_detection` borrows `self` mutably again.
    let buffer_full = context.buffer.is_full();
    let buffer_size = context.buffer.size();

    if should_detect {
        if let Some(detection_event) = self.attempt_detection(stream_id)? {
            events.push(detection_event);
        }
    }

    if buffer_full {
        events.push(StreamEvent::BufferFull {
            size: buffer_size,
            timestamp: Instant::now(),
        });
    }

    Ok(events)
}
/// Runs the detector once on the buffered data of `stream_id`.
///
/// Returns:
/// * `Ok(Some(ProtocolDetected))` when the detector's confidence reaches
///   `detection_config.min_confidence`; the stream becomes `Detected`.
/// * `Ok(Some(DetectionFailed))` when the attempt budget is already used up
///   or the detector returns an error; the stream becomes `Failed`.
/// * `Ok(None)` when too little data is buffered, or when the result is below
///   the confidence threshold (the stream goes back to `Initial` for a retry).
///
/// # Errors
///
/// Returns a configuration error when `stream_id` is not registered.
fn attempt_detection(&mut self, stream_id: &str) -> Result<Option<StreamEvent>> {
    let context = self.streams.get_mut(stream_id).ok_or_else(|| {
        DetectorError::config_error(format!("Stream not found: {}", stream_id))
    })?;

    if context.detection_attempts >= self.config.max_detection_attempts {
        context.state = StreamState::Failed("Max detection attempts exceeded".to_string());
        self.stats.record_failed_detection("max_attempts_exceeded");
        return Ok(Some(StreamEvent::DetectionFailed {
            error: "Maximum detection attempts exceeded".to_string(),
            timestamp: Instant::now(),
        }));
    }

    // Probe at most `max_probe_size` bytes, and only if enough data is buffered.
    let detection_size = std::cmp::min(
        context.buffer.size(),
        self.config.detection_config.max_probe_size,
    );
    if detection_size < self.config.min_detection_size {
        return Ok(None);
    }

    let data = context.buffer.peek(detection_size);
    context.detection_attempts += 1;
    context.last_detection = Some(Instant::now());
    context.state = StreamState::Detecting;

    let start_time = Instant::now();
    match self.detector.detect(&data) {
        Ok(result) => {
            let detection_time = start_time.elapsed();

            // Copy out the two values needed below so `result` itself can be
            // moved into the history without cloning it.
            let protocol = result.protocol_info.protocol_type;
            let confidence = result.protocol_info.confidence;
            let accepted = confidence >= self.config.detection_config.min_confidence;

            if self.config.keep_detection_history {
                context.detection_history.push(result);
            }

            if !accepted {
                // Not confident enough: allow another attempt later.
                context.state = StreamState::Initial;
                return Ok(None);
            }

            context.state = StreamState::Detected(protocol);
            self.stats
                .record_successful_detection(protocol, detection_time);
            Ok(Some(StreamEvent::ProtocolDetected {
                protocol,
                confidence: confidence as f64,
                timestamp: Instant::now(),
            }))
        }
        Err(err) => {
            let error_msg = err.to_string();
            context.state = StreamState::Failed(error_msg.clone());
            self.stats.record_failed_detection(&error_msg);
            Ok(Some(StreamEvent::DetectionFailed {
                error: error_msg,
                timestamp: Instant::now(),
            }))
        }
    }
}
/// Returns the current state of `stream_id`, or `None` if it is not registered.
pub fn get_stream_state(&self, stream_id: &str) -> Option<&StreamState> {
    let context = self.streams.get(stream_id)?;
    Some(&context.state)
}
/// Returns the metadata of `stream_id`, or `None` if it is not registered.
pub fn get_stream_metadata(&self, stream_id: &str) -> Option<&StreamMetadata> {
    let context = self.streams.get(stream_id)?;
    Some(&context.metadata)
}
/// Returns the size reported by the buffer of `stream_id`, or `None` if the
/// stream is not registered.
pub fn get_buffer_size(&self, stream_id: &str) -> Option<usize> {
    let context = self.streams.get(stream_id)?;
    Some(context.buffer.size())
}
/// Returns the detector results recorded for `stream_id`, or `None` if the
/// stream is not registered. The slice stays empty unless
/// `keep_detection_history` is enabled.
pub fn get_detection_history(&self, stream_id: &str) -> Option<&[DetectionResult]> {
    let context = self.streams.get(stream_id)?;
    Some(&context.detection_history[..])
}
pub fn close_stream(&mut self, stream_id: &str) -> Result<()> {
if let Some(mut context) = self.streams.remove(stream_id) {
context.state = StreamState::Closed;
self.stats.record_stream_closed();
}
Ok(())
}
/// Closes every stream that has been idle for longer than `stream_timeout`.
///
/// Returns one `Timeout` event per closed stream; the event carries the
/// configured timeout, not the stream id.
///
/// # Errors
///
/// Propagates any error from `close_stream`.
pub fn handle_timeouts(&mut self) -> Result<Vec<StreamEvent>> {
    let timeout = self.config.stream_timeout;

    // Collect the ids first: streams cannot be removed while iterating the map.
    let expired_ids: Vec<String> = self
        .streams
        .iter()
        .filter(|(_, context)| context.is_expired(timeout))
        .map(|(stream_id, _)| stream_id.clone())
        .collect();

    let mut events = Vec::new();
    for stream_id in &expired_ids {
        self.stats.record_timeout();
        events.push(StreamEvent::Timeout {
            duration: timeout,
            timestamp: Instant::now(),
        });
        self.close_stream(stream_id)?;
    }
    Ok(events)
}
/// Returns the ids of all registered streams, in the map's iteration order
/// (which is unspecified).
pub fn get_active_streams(&self) -> Vec<&str> {
    self.streams.keys().map(String::as_str).collect()
}
/// Returns the processor's running statistics.
pub fn stats(&self) -> &ProcessorStats {
    &self.stats
}
/// Returns the configuration currently in effect.
pub fn config(&self) -> &ProcessorConfig {
    &self.config
}
/// Replaces the configuration used for all subsequent operations.
///
/// Buffers of already registered streams are left untouched; they were built
/// from the buffer settings in effect when each stream was created.
pub fn update_config(&mut self, config: ProcessorConfig) {
    self.config = config;
}
/// Drops every registered stream and resets all statistics to their defaults.
///
/// No per-stream close accounting or events are produced.
pub fn cleanup(&mut self) {
    self.streams.clear();
    self.stats = ProcessorStats::default();
}
pub fn force_detection(&mut self, stream_id: &str) -> Result<Option<StreamEvent>> {
if let Some(context) = self.streams.get_mut(stream_id) {
context.last_detection = None;
}
self.attempt_detection(stream_id)
}
/// Returns the result of `peek(size)` on the buffer of `stream_id`, or `None`
/// if the stream is not registered.
///
/// NOTE(review): presumably a non-consuming read of up to `size` bytes —
/// confirm against `StreamBuffer::peek`.
pub fn get_stream_data(&self, stream_id: &str, size: usize) -> Option<Vec<u8>> {
    let context = self.streams.get(stream_id)?;
    Some(context.buffer.peek(size))
}
/// Returns the result of `pop(size)` on the buffer of `stream_id`, or `None`
/// if the stream is not registered.
///
/// NOTE(review): presumably removes up to `size` bytes from the buffer —
/// confirm against `StreamBuffer::pop`.
pub fn consume_stream_data(&mut self, stream_id: &str, size: usize) -> Option<Vec<u8>> {
    let context = self.streams.get_mut(stream_id)?;
    Some(context.buffer.pop(size))
}
/// Returns the fraction of the stream limit currently in use
/// (registered streams divided by `max_concurrent_streams`).
///
/// Yields `0.0` when the limit is zero, so the division is always defined.
pub fn load(&self) -> f64 {
    match self.config.max_concurrent_streams {
        0 => 0.0,
        limit => self.streams.len() as f64 / limit as f64,
    }
}
/// Returns the share of recorded detection outcomes that were successful,
/// as a value between `0.0` and `1.0`.
///
/// Yields `0.0` when no outcome has been recorded yet.
pub fn success_rate(&self) -> f64 {
    let successes = self.stats.successful_detections;
    let outcomes = successes + self.stats.failed_detections;
    match outcomes {
        0 => 0.0,
        total => successes as f64 / total as f64,
    }
}
}