use super::{Algorithm, CompressionStats, Compressor, CompressorFactory};
use crate::error::{Result, ZiporaError};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionMode {
UltraLowLatency,
LowLatency,
Balanced,
HighCompression,
}
impl CompressionMode {
pub fn target_latency(&self) -> Duration {
match self {
CompressionMode::UltraLowLatency => Duration::from_millis(1),
CompressionMode::LowLatency => Duration::from_millis(10),
CompressionMode::Balanced => Duration::from_millis(100),
CompressionMode::HighCompression => Duration::from_millis(1000),
}
}
pub fn preferred_algorithm(&self) -> Algorithm {
match self {
CompressionMode::UltraLowLatency => Algorithm::None,
CompressionMode::LowLatency => Algorithm::Lz4,
#[cfg(feature = "zstd")]
CompressionMode::Balanced => Algorithm::Zstd(3),
#[cfg(not(feature = "zstd"))]
CompressionMode::Balanced => Algorithm::Lz4,
#[cfg(feature = "zstd")]
CompressionMode::HighCompression => Algorithm::Zstd(9),
#[cfg(not(feature = "zstd"))]
CompressionMode::HighCompression => Algorithm::Lz4,
}
}
pub fn max_memory_ratio(&self) -> f64 {
match self {
CompressionMode::UltraLowLatency => 0.0,
CompressionMode::LowLatency => 0.1,
CompressionMode::Balanced => 1.0,
CompressionMode::HighCompression => 4.0,
}
}
}
#[derive(Debug, Clone)]
pub struct RealtimeConfig {
pub mode: CompressionMode,
pub max_concurrent: usize,
pub enable_deadlines: bool,
pub fallback_on_timeout: bool,
pub batch_size: usize,
pub batch_timeout: Duration,
}
impl Default for RealtimeConfig {
fn default() -> Self {
Self {
mode: CompressionMode::LowLatency,
max_concurrent: std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1),
enable_deadlines: true,
fallback_on_timeout: true,
batch_size: 10,
batch_timeout: Duration::from_millis(1),
}
}
}
pub struct RealtimeCompressor {
config: RealtimeConfig,
compressor: Arc<RwLock<Box<dyn Compressor>>>,
fallback_compressor: Arc<Box<dyn Compressor>>, semaphore: Arc<Semaphore>,
stats: Arc<RwLock<RealtimeStats>>,
}
#[derive(Debug, Clone, Default)]
pub struct RealtimeStats {
pub base_stats: CompressionStats,
pub deadline_met: u64,
pub deadline_missed: u64,
pub fallback_operations: u64,
pub avg_latency_us: u64,
pub max_latency_us: u64,
pub p95_latency_us: u64,
pub p99_latency_us: u64,
latency_samples: Vec<u64>,
}
impl RealtimeStats {
pub fn deadline_success_rate(&self) -> f64 {
let total = self.deadline_met + self.deadline_missed;
if total == 0 {
0.0
} else {
self.deadline_met as f64 / total as f64
}
}
fn update_latency(&mut self, latency_us: u64, met_deadline: bool) {
if met_deadline {
self.deadline_met += 1;
} else {
self.deadline_missed += 1;
}
self.latency_samples.push(latency_us);
if self.latency_samples.len() > 1000 {
self.latency_samples.drain(0..500); }
let total_ops = self.deadline_met + self.deadline_missed;
self.avg_latency_us = (self.avg_latency_us * (total_ops - 1) + latency_us) / total_ops;
self.max_latency_us = self.max_latency_us.max(latency_us);
if self.latency_samples.len() >= 20 {
let mut sorted = self.latency_samples.clone();
sorted.sort_unstable();
let p95_idx = (sorted.len() as f64 * 0.95) as usize;
let p99_idx = (sorted.len() as f64 * 0.99) as usize;
self.p95_latency_us = sorted[p95_idx.min(sorted.len() - 1)];
self.p99_latency_us = sorted[p99_idx.min(sorted.len() - 1)];
}
}
}
impl RealtimeCompressor {
pub fn new(config: RealtimeConfig) -> Result<Self> {
let algorithm = config.mode.preferred_algorithm();
let compressor = CompressorFactory::create(algorithm, None)?;
let fallback_compressor = Arc::new(Box::new(super::NoCompressor) as Box<dyn Compressor>);
Ok(Self {
config: config.clone(),
compressor: Arc::new(RwLock::new(compressor)),
fallback_compressor,
semaphore: Arc::new(Semaphore::new(config.max_concurrent)),
stats: Arc::new(RwLock::new(RealtimeStats::default())),
})
}
pub fn with_mode(mode: CompressionMode) -> Result<Self> {
let config = RealtimeConfig {
mode,
..Default::default()
};
Self::new(config)
}
pub async fn compress_with_deadline(&self, data: &[u8], deadline: Instant) -> Result<Vec<u8>> {
let start_time = Instant::now();
if Instant::now() >= deadline {
return self.handle_timeout(data, start_time).await;
}
let _permit = self
.semaphore
.acquire()
.await
.map_err(|_| ZiporaError::configuration("semaphore acquire failed"))?;
if Instant::now() >= deadline {
return self.handle_timeout(data, start_time).await;
}
let remaining_time = deadline.saturating_duration_since(Instant::now());
let compression_result =
tokio::time::timeout(remaining_time, self.compress_internal(data)).await;
let latency = start_time.elapsed();
let met_deadline = Instant::now() <= deadline;
if let Ok(mut stats) = self.stats.write() {
stats.update_latency(latency.as_micros() as u64, met_deadline);
}
match compression_result {
Ok(Ok(compressed)) => Ok(compressed),
Ok(Err(e)) => Err(e),
Err(_) => self.handle_timeout(data, start_time).await,
}
}
pub async fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
let deadline = Instant::now() + self.config.mode.target_latency();
self.compress_with_deadline(data, deadline).await
}
pub async fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
let compressor = self.compressor.read()
.map_err(|e| crate::error::ZiporaError::system_error(
format!("RealtimeCompressor: compressor RwLock poisoned: {}", e)
))?;
compressor.decompress(data)
}
pub async fn compress_batch(&self, items: Vec<&[u8]>) -> Result<Vec<Vec<u8>>> {
if items.is_empty() {
return Ok(Vec::new());
}
let deadline = Instant::now() + self.config.mode.target_latency();
let mut results = Vec::with_capacity(items.len());
for item in items {
let result = self.compress_with_deadline(item, deadline).await?;
results.push(result);
if Instant::now() >= deadline {
break;
}
}
Ok(results)
}
pub fn stats(&self) -> RealtimeStats {
self.stats.read()
.map(|s| s.clone())
.unwrap_or_default()
}
pub fn set_mode(&self, mode: CompressionMode) -> Result<()> {
let algorithm = mode.preferred_algorithm();
let new_compressor = CompressorFactory::create(algorithm, None)?;
{
let mut compressor = self.compressor.write()
.map_err(|e| crate::error::ZiporaError::system_error(
format!("RealtimeCompressor: compressor RwLock poisoned: {}", e)
))?;
*compressor = new_compressor;
}
Ok(())
}
pub fn can_meet_deadline(&self, data_size: usize, deadline: Duration) -> bool {
let algorithm = self.config.mode.preferred_algorithm();
let expected_time = data_size as f64 / algorithm.compression_speed();
Duration::from_secs_f64(expected_time) <= deadline
}
async fn compress_internal(&self, data: &[u8]) -> Result<Vec<u8>> {
if data.len() < 64 && self.config.mode == CompressionMode::UltraLowLatency {
return Ok(data.to_vec());
}
let compressor = self.compressor.read()
.map_err(|e| crate::error::ZiporaError::system_error(
format!("RealtimeCompressor: compressor RwLock poisoned: {}", e)
))?;
compressor.compress(data)
}
async fn handle_timeout(&self, data: &[u8], start_time: Instant) -> Result<Vec<u8>> {
let latency = start_time.elapsed();
if let Ok(mut stats) = self.stats.write() {
stats.update_latency(latency.as_micros() as u64, false);
stats.fallback_operations += 1;
}
if self.config.fallback_on_timeout {
self.fallback_compressor.compress(data)
} else {
Err(ZiporaError::configuration("compression deadline exceeded"))
}
}
}
pub struct RealtimeCompressorBuilder {
config: RealtimeConfig,
}
impl RealtimeCompressorBuilder {
pub fn new() -> Self {
Self {
config: RealtimeConfig::default(),
}
}
pub fn mode(mut self, mode: CompressionMode) -> Self {
self.config.mode = mode;
self
}
pub fn max_concurrent(mut self, max_concurrent: usize) -> Self {
self.config.max_concurrent = max_concurrent;
self
}
pub fn enable_deadlines(mut self, enable: bool) -> Self {
self.config.enable_deadlines = enable;
self
}
pub fn fallback_on_timeout(mut self, fallback: bool) -> Self {
self.config.fallback_on_timeout = fallback;
self
}
pub fn batch_size(mut self, size: usize) -> Self {
self.config.batch_size = size;
self
}
pub fn build(self) -> Result<RealtimeCompressor> {
RealtimeCompressor::new(self.config)
}
}
impl Default for RealtimeCompressorBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio;
#[test]
fn test_compression_mode() {
assert!(
CompressionMode::UltraLowLatency.target_latency()
< CompressionMode::LowLatency.target_latency()
);
assert_eq!(
CompressionMode::LowLatency.preferred_algorithm(),
Algorithm::Lz4
);
}
#[test]
fn test_realtime_config() {
let config = RealtimeConfig::default();
assert_eq!(config.mode, CompressionMode::LowLatency);
assert!(config.max_concurrent > 0);
}
#[tokio::test]
async fn test_realtime_compressor_creation() {
let compressor = RealtimeCompressor::with_mode(CompressionMode::LowLatency).unwrap();
let stats = compressor.stats();
assert_eq!(stats.deadline_met, 0);
assert_eq!(stats.deadline_missed, 0);
}
#[tokio::test]
#[cfg(feature = "lz4")]
async fn test_realtime_compression() {
let compressor = RealtimeCompressor::with_mode(CompressionMode::LowLatency).unwrap();
let data = b"test data for real-time compression";
let compressed = compressor.compress(data).await.unwrap();
let decompressed = compressor.decompress(&compressed).await.unwrap();
assert_eq!(decompressed, data);
let stats = compressor.stats();
assert_eq!(stats.deadline_met + stats.deadline_missed, 1);
}
#[tokio::test]
#[cfg(feature = "lz4")]
async fn test_deadline_compression() {
let compressor = RealtimeCompressor::with_mode(CompressionMode::LowLatency).unwrap();
let data = b"test data for deadline-based compression";
let deadline = Instant::now() + Duration::from_millis(50);
let compressed = compressor
.compress_with_deadline(data, deadline)
.await
.unwrap();
let decompressed = compressor.decompress(&compressed).await.unwrap();
assert_eq!(decompressed, data);
}
#[tokio::test]
#[cfg(feature = "lz4")]
async fn test_batch_compression() {
let compressor = RealtimeCompressor::with_mode(CompressionMode::LowLatency).unwrap();
let items = vec![
b"item 1".as_slice(),
b"item 2".as_slice(),
b"item 3".as_slice(),
];
let compressed_items = compressor.compress_batch(items).await.unwrap();
assert_eq!(compressed_items.len(), 3);
}
#[tokio::test]
async fn test_timeout_handling() {
let compressor = RealtimeCompressor::with_mode(CompressionMode::UltraLowLatency).unwrap();
let data = vec![0u8; 10000]; let deadline = Instant::now();
let result = compressor.compress_with_deadline(&data, deadline).await;
match result {
Ok(_) => {
let stats = compressor.stats();
assert!(stats.fallback_operations > 0);
}
Err(_) => {
}
}
}
#[test]
fn test_deadline_prediction() {
let compressor = RealtimeCompressor::with_mode(CompressionMode::LowLatency).unwrap();
assert!(compressor.can_meet_deadline(1000, Duration::from_millis(10)));
assert!(!compressor.can_meet_deadline(10_000_000, Duration::from_micros(1)));
}
#[test]
fn test_statistics_tracking() {
let mut stats = RealtimeStats::default();
stats.update_latency(1000, true); stats.update_latency(5000, true); stats.update_latency(15000, false);
assert_eq!(stats.deadline_met, 2);
assert_eq!(stats.deadline_missed, 1);
assert!(stats.deadline_success_rate() > 0.6);
assert_eq!(stats.max_latency_us, 15000);
}
#[test]
fn test_builder_pattern() {
let compressor = RealtimeCompressorBuilder::new()
.mode(CompressionMode::Balanced)
.max_concurrent(8)
.enable_deadlines(false)
.fallback_on_timeout(false)
.build()
.unwrap();
assert_eq!(compressor.config.mode, CompressionMode::Balanced);
assert_eq!(compressor.config.max_concurrent, 8);
assert!(!compressor.config.enable_deadlines);
assert!(!compressor.config.fallback_on_timeout);
}
}