use crate::error::{DetectorError, Result};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
/// Tunable limits and policies governing a buffered data stream.
#[derive(Debug, Clone)]
pub struct BufferConfig {
/// Maximum total stored bytes across all chunks.
pub max_size: usize,
/// Maximum chunk count before the buffer reports itself full.
pub max_chunks: usize,
/// Age after which a chunk is considered expired and eligible for cleanup.
pub chunk_timeout: Duration,
/// Whether chunks may be stored run-length compressed.
pub enable_compression: bool,
/// Minimum chunk size (bytes) before compression is attempted.
pub compression_threshold: usize,
/// Whether expired chunks are cleaned up automatically during `push`.
pub auto_cleanup: bool,
/// Minimum interval between automatic cleanup passes.
pub cleanup_interval: Duration,
}
impl Default for BufferConfig {
fn default() -> Self {
Self {
max_size: 1024 * 1024, max_chunks: 1000,
chunk_timeout: Duration::from_secs(300), enable_compression: false,
compression_threshold: 4096, auto_cleanup: true,
cleanup_interval: Duration::from_secs(60), }
}
}
/// One buffered segment of stream data plus bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct DataChunk {
/// Stored payload; holds the encoded form when `compressed` is true.
pub data: Vec<u8>,
/// Creation time, used for age/expiry checks.
pub timestamp: Instant,
/// Monotonically increasing number assigned by the owning buffer.
pub sequence: u64,
/// Whether `data` currently holds the compressed representation.
pub compressed: bool,
/// Original payload length in bytes, recorded while compressed.
pub original_size: Option<usize>,
}
impl DataChunk {
pub fn new(data: Vec<u8>, sequence: u64) -> Self {
Self {
data,
timestamp: Instant::now(),
sequence,
compressed: false,
original_size: None,
}
}
pub fn size(&self) -> usize {
self.data.len()
}
pub fn age(&self) -> Duration {
Instant::now().duration_since(self.timestamp)
}
pub fn is_expired(&self, timeout: Duration) -> bool {
self.age() > timeout
}
pub fn compress(&mut self) -> Result<()> {
if self.compressed {
return Ok(());
}
if self.data.len() > 100 {
self.original_size = Some(self.data.len());
let mut compressed_data = Vec::new();
let mut last_byte = None;
let mut count = 0u8;
for &byte in &self.data {
if Some(byte) == last_byte {
count = count.saturating_add(1);
if count == 255 {
compressed_data.push(count);
compressed_data.push(byte);
count = 0;
last_byte = None;
}
} else {
if let Some(prev_byte) = last_byte {
if count > 0 {
compressed_data.push(count);
compressed_data.push(prev_byte);
} else {
compressed_data.push(prev_byte);
}
}
last_byte = Some(byte);
count = 0;
}
}
if let Some(byte) = last_byte {
if count > 0 {
compressed_data.push(count);
compressed_data.push(byte);
} else {
compressed_data.push(byte);
}
}
if compressed_data.len() < self.data.len() * 3 / 4 {
self.data = compressed_data;
self.compressed = true;
}
}
Ok(())
}
pub fn decompress(&mut self) -> Result<()> {
if !self.compressed {
return Ok(());
}
let mut decompressed_data = Vec::new();
let mut i = 0;
while i < self.data.len() {
let byte = self.data[i];
if i + 1 < self.data.len() && byte > 1 {
let repeat_byte = self.data[i + 1];
for _ in 0..byte {
decompressed_data.push(repeat_byte);
}
i += 2;
} else {
decompressed_data.push(byte);
i += 1;
}
}
self.data = decompressed_data;
self.compressed = false;
self.original_size = None;
Ok(())
}
}
/// FIFO byte buffer built from timestamped chunks, with optional
/// run-length compression, size/chunk caps, and age-based cleanup.
#[derive(Debug)]
pub struct StreamBuffer {
/// Chunks in arrival (FIFO) order.
chunks: VecDeque<DataChunk>,
/// Limits and policies for this buffer.
config: BufferConfig,
/// Sum of stored chunk sizes (compressed size for compressed chunks).
total_size: usize,
/// Sequence number assigned to the next pushed chunk.
next_sequence: u64,
/// When `cleanup` last ran; throttles automatic cleanup.
last_cleanup: Instant,
/// Running usage counters.
stats: BufferStats,
}
/// Running counters describing buffer activity since creation.
#[derive(Debug, Clone, Default)]
pub struct BufferStats {
/// Total bytes ever accepted by `push` (pre-compression sizes).
pub total_bytes_received: usize,
/// Total bytes ever returned by `pop`/`drain`.
pub total_bytes_sent: usize,
/// Bytes currently buffered (stored sizes).
pub current_buffered_bytes: usize,
/// High-water mark of buffered bytes.
pub max_buffered_bytes: usize,
/// Number of chunks currently buffered.
pub chunk_count: usize,
/// High-water mark of the chunk count.
pub max_chunk_count: usize,
/// Number of compression attempts performed on chunks.
pub compression_count: usize,
/// Number of cleanup passes that removed at least one chunk.
pub cleanup_count: usize,
/// Bytes discarded by cleanup or eviction without being read.
pub dropped_bytes: usize,
}
impl StreamBuffer {
    /// Creates an empty buffer governed by `config`.
    pub fn new(config: BufferConfig) -> Self {
        Self {
            chunks: VecDeque::new(),
            config,
            total_size: 0,
            next_sequence: 0,
            last_cleanup: Instant::now(),
            stats: BufferStats::default(),
        }
    }

    /// Keeps the derived fields of `stats` in sync with the buffer state.
    fn refresh_stats(&mut self) {
        self.stats.current_buffered_bytes = self.total_size;
        self.stats.chunk_count = self.chunks.len();
    }

    /// Appends `data` as a new chunk, evicting old chunks if required.
    ///
    /// Empty input is a no-op. The chunk may be stored compressed when
    /// compression is enabled and the payload meets the threshold;
    /// `total_size` always tracks the *stored* (possibly compressed) size.
    ///
    /// # Errors
    /// Fails when the payload cannot fit within `config.max_size` even
    /// with the buffer fully evicted.
    pub fn push(&mut self, data: Vec<u8>) -> Result<()> {
        if data.is_empty() {
            return Ok(());
        }
        let data_size = data.len();
        self.stats.total_bytes_received += data_size;
        if self.config.auto_cleanup {
            self.maybe_cleanup()?;
        }
        if self.total_size + data_size > self.config.max_size {
            self.make_space(data_size)?;
        }
        // Enforce the chunk-count limit as well as the byte limit
        // (previously `max_chunks` was reported by `is_full` but never applied).
        while self.chunks.len() >= self.config.max_chunks {
            if let Some(old) = self.chunks.pop_front() {
                self.total_size -= old.size();
                self.stats.dropped_bytes += old.size();
            }
        }
        let mut chunk = DataChunk::new(data, self.next_sequence);
        self.next_sequence += 1;
        if self.config.enable_compression && chunk.size() >= self.config.compression_threshold {
            chunk.compress()?;
            // `compress` may decline when it would not save space;
            // only count chunks that were actually compressed.
            if chunk.compressed {
                self.stats.compression_count += 1;
            }
        }
        self.total_size += chunk.size();
        self.chunks.push_back(chunk);
        self.refresh_stats();
        if self.total_size > self.stats.max_buffered_bytes {
            self.stats.max_buffered_bytes = self.total_size;
        }
        if self.chunks.len() > self.stats.max_chunk_count {
            self.stats.max_chunk_count = self.chunks.len();
        }
        Ok(())
    }

    /// Copies up to `size` decoded bytes from the front without consuming
    /// them. Compressed chunks are decoded into a temporary copy; chunks
    /// that fail to decode are skipped.
    pub fn peek(&self, size: usize) -> Vec<u8> {
        let mut result = Vec::new();
        let mut remaining = size;
        for chunk in &self.chunks {
            if remaining == 0 {
                break;
            }
            if chunk.compressed {
                let mut tmp = chunk.clone();
                if tmp.decompress().is_err() {
                    continue;
                }
                let take = remaining.min(tmp.data.len());
                result.extend_from_slice(&tmp.data[..take]);
                remaining -= take;
            } else {
                // Uncompressed chunks can be read in place — no clone needed.
                let take = remaining.min(chunk.data.len());
                result.extend_from_slice(&chunk.data[..take]);
                remaining -= take;
            }
        }
        result
    }

    /// Removes and returns up to `size` decoded bytes from the front.
    ///
    /// Accounting is kept consistent by subtracting each chunk's *stored*
    /// size (what `total_size` tracks) and re-adding the size of any
    /// leftover tail pushed back. The old code mixed compressed and
    /// decompressed sizes, corrupting `total_size` whenever compression
    /// was enabled. Chunks that fail to decompress are dropped and
    /// recorded in `dropped_bytes`.
    pub fn pop(&mut self, size: usize) -> Vec<u8> {
        let mut result = Vec::new();
        let mut remaining = size;
        while remaining > 0 {
            let mut chunk = match self.chunks.pop_front() {
                Some(c) => c,
                None => break,
            };
            // Stored (possibly compressed) size — the unit of `total_size`.
            let stored_size = chunk.size();
            if chunk.compressed && chunk.decompress().is_err() {
                // Corrupt chunk: discard it but keep the books balanced
                // (the old code silently leaked these bytes in `total_size`).
                self.total_size -= stored_size;
                self.stats.dropped_bytes += stored_size;
                continue;
            }
            self.total_size -= stored_size;
            let chunk_len = chunk.data.len();
            if chunk_len <= remaining {
                result.extend_from_slice(&chunk.data);
                remaining -= chunk_len;
            } else {
                result.extend_from_slice(&chunk.data[..remaining]);
                let leftover = DataChunk::new(chunk.data[remaining..].to_vec(), chunk.sequence);
                self.total_size += leftover.size();
                self.chunks.push_front(leftover);
                remaining = 0;
            }
        }
        self.stats.total_bytes_sent += result.len();
        self.refresh_stats();
        result
    }

    /// Removes and returns everything currently buffered.
    pub fn drain(&mut self) -> Vec<u8> {
        // Request an unbounded amount: compressed chunks decode to more
        // bytes than `total_size`, so asking for exactly `total_size`
        // (as the old code did) could leave data behind.
        self.pop(usize::MAX)
    }

    /// Stored bytes currently buffered (compressed chunks count at their
    /// compressed size).
    pub fn size(&self) -> usize {
        self.total_size
    }

    /// Number of chunks currently buffered.
    pub fn chunk_count(&self) -> usize {
        self.chunks.len()
    }

    /// True when nothing is buffered.
    pub fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }

    /// True once either the byte limit or the chunk-count limit is reached.
    pub fn is_full(&self) -> bool {
        self.total_size >= self.config.max_size || self.chunks.len() >= self.config.max_chunks
    }

    /// Discards all buffered data. Lifetime totals in `stats` are kept.
    pub fn clear(&mut self) {
        self.chunks.clear();
        self.total_size = 0;
        self.refresh_stats();
    }

    /// Compresses every eligible chunk in place, updating `total_size`
    /// to the new stored sizes. No-op when compression is disabled.
    pub fn compress_all(&mut self) -> Result<()> {
        if !self.config.enable_compression {
            return Ok(());
        }
        for chunk in &mut self.chunks {
            if !chunk.compressed && chunk.size() >= self.config.compression_threshold {
                let old_size = chunk.size();
                chunk.compress()?;
                // Only re-account and count chunks that actually shrank.
                if chunk.compressed {
                    self.total_size = self.total_size - old_size + chunk.size();
                    self.stats.compression_count += 1;
                }
            }
        }
        self.stats.current_buffered_bytes = self.total_size;
        Ok(())
    }

    /// Drops expired chunks from the front of the queue.
    ///
    /// Only leading chunks are examined: chunks arrive in time order, so
    /// the first non-expired chunk ends the scan.
    pub fn cleanup(&mut self) -> Result<()> {
        let mut removed_size = 0;
        while let Some(front) = self.chunks.front() {
            if !front.is_expired(self.config.chunk_timeout) {
                break;
            }
            if let Some(chunk) = self.chunks.pop_front() {
                removed_size += chunk.size();
            }
        }
        if removed_size > 0 {
            self.total_size -= removed_size;
            self.stats.dropped_bytes += removed_size;
            self.stats.cleanup_count += 1;
            self.refresh_stats();
        }
        self.last_cleanup = Instant::now();
        Ok(())
    }

    /// Runs `cleanup` once `cleanup_interval` has elapsed since the last run.
    fn maybe_cleanup(&mut self) -> Result<()> {
        if self.last_cleanup.elapsed() >= self.config.cleanup_interval {
            self.cleanup()?;
        }
        Ok(())
    }

    /// Evicts oldest chunks until `needed_size` more bytes fit under
    /// `config.max_size`.
    ///
    /// # Errors
    /// Fails when `needed_size` cannot fit even with the buffer empty.
    fn make_space(&mut self, needed_size: usize) -> Result<()> {
        self.cleanup()?;
        // Evict only as much as necessary. The old code tracked a separate
        // `freed_size` counter, ignoring space already reclaimed by
        // `cleanup`, and so could evict more chunks than needed.
        while self.total_size + needed_size > self.config.max_size {
            match self.chunks.pop_front() {
                Some(chunk) => {
                    self.total_size -= chunk.size();
                    self.stats.dropped_bytes += chunk.size();
                }
                None => break,
            }
        }
        if self.total_size + needed_size > self.config.max_size {
            return Err(DetectorError::buffer_error(
                "Cannot make enough space in buffer".to_string(),
            ));
        }
        self.refresh_stats();
        Ok(())
    }

    /// Read-only view of the running statistics.
    pub fn stats(&self) -> &BufferStats {
        &self.stats
    }

    /// Read-only view of the active configuration.
    pub fn config(&self) -> &BufferConfig {
        &self.config
    }

    /// Replaces the configuration. Existing chunks are not re-evaluated
    /// against the new limits until the next push/cleanup.
    pub fn update_config(&mut self, config: BufferConfig) {
        self.config = config;
    }

    /// Fraction of `max_size` currently in use, in `[0, 1]`
    /// (0.0 when `max_size` is zero, avoiding division by zero).
    pub fn utilization(&self) -> f64 {
        if self.config.max_size == 0 {
            0.0
        } else {
            self.total_size as f64 / self.config.max_size as f64
        }
    }

    /// Ratio of stored bytes to original bytes across all chunks; 1.0 for
    /// an empty buffer or when nothing is compressed, lower is better.
    pub fn compression_ratio(&self) -> f64 {
        let mut original_size = 0;
        let mut compressed_size = 0;
        for chunk in &self.chunks {
            let stored = chunk.size();
            compressed_size += stored;
            original_size += if chunk.compressed {
                chunk.original_size.unwrap_or(stored)
            } else {
                stored
            };
        }
        if original_size == 0 {
            1.0
        } else {
            compressed_size as f64 / original_size as f64
        }
    }
}
impl Default for StreamBuffer {
fn default() -> Self {
Self::new(BufferConfig::default())
}
}