use crate::dashboard::types::WebSocketConfig;
use crate::ProfileEvent;
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
};
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::{broadcast, mpsc};
use tokio::time::interval;
/// Envelope for the payloads that can travel over a WebSocket stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WebSocketMessage {
/// A single profiling event.
ProfileEvent(ProfileEvent),
/// A point-in-time copy of the engine's streaming counters.
Stats(StreamingStatsSnapshot),
/// A control message; see [`ControlMessage`].
Control(ControlMessage),
}
/// Control messages carried inside [`WebSocketMessage::Control`].
///
/// NOTE(review): nothing in the code visible here constructs or handles these
/// variants; the descriptions below are inferred from the names — confirm
/// against the actual handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
/// Presumably a request to start receiving the named topic — TODO confirm what the string identifies.
Subscribe(String),
/// Presumably the inverse of `Subscribe` — TODO confirm.
Unsubscribe(String),
/// Presumably a keep-alive probe.
Ping,
/// Presumably the reply to `Ping`.
Pong,
}
/// Streaming engine that buffers profiling events and fans them out to the
/// registered stream connections.
///
/// Every field except `config` is wrapped in an `Arc`, so clones of the engine
/// share the same buffer, statistics and connection registries.
#[derive(Debug)]
pub struct EnhancedStreamingEngine {
/// Configuration the engine was built with.
pub config: StreamingConfig,
/// Registered streams keyed by stream id; consulted when broadcasting.
streams: Arc<RwLock<HashMap<String, StreamConnection>>>,
/// Events waiting to be streamed.
event_buffer: Arc<Mutex<EventBuffer>>,
/// Engine-wide counters.
stats: Arc<StreamingStats>,
/// Controller whose current bitrate caps the number of events per batch.
rate_controller: Arc<AdaptiveRateController>,
/// Compresses events before they are broadcast (when enabled).
compression_manager: Arc<CompressionManager>,
/// Per-protocol connection registries used to deliver events.
connection_manager: Arc<ConnectionManager>,
}
/// Top-level configuration for [`EnhancedStreamingEngine`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
/// Base network port (not read by the code visible here — presumably where listeners bind; TODO confirm).
pub base_port: u16,
/// Maximum number of connections (not enforced by the code visible here).
pub max_connections: usize,
/// Capacity, in events, handed to the engine's event buffer.
pub buffer_size: usize,
/// Settings for the adaptive rate controller.
pub adaptive_bitrate: AdaptiveBitrateConfig,
/// Settings for event compression.
pub compression: CompressionConfig,
/// Quality-level settings.
pub quality: QualityConfig,
/// Which transport protocols are enabled and in what order they are preferred.
pub protocols: ProtocolConfig,
/// Feature toggles.
pub advanced_features: AdvancedFeatures,
}
impl Default for StreamingConfig {
    /// Stock configuration: port 9090, 100 connections, a 10 000-event buffer
    /// and the default value of every nested section.
    fn default() -> Self {
        let adaptive_bitrate = AdaptiveBitrateConfig::default();
        let compression = CompressionConfig::default();
        let quality = QualityConfig::default();
        let protocols = ProtocolConfig::default();
        let advanced_features = AdvancedFeatures::default();

        Self {
            base_port: 9090,
            max_connections: 100,
            buffer_size: 10_000,
            adaptive_bitrate,
            compression,
            quality,
            protocols,
            advanced_features,
        }
    }
}
/// Settings for [`AdaptiveRateController`].
///
/// The engine uses the controller's current bitrate as the maximum number of
/// events kept per streaming batch when intelligent sampling is on.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveBitrateConfig {
/// When `false`, the controller never changes the bitrate.
pub enabled: bool,
/// Lower bound the adjusted bitrate is clamped to.
pub min_bitrate: usize,
/// Upper bound the adjusted bitrate is clamped to.
pub max_bitrate: usize,
/// Starting value for both the current and the target bitrate.
pub initial_bitrate: usize,
/// Quality below this value lowers the bitrate; quality above
/// `1.0 - adaptation_threshold` raises it.
pub adaptation_threshold: f64,
/// Factor the bitrate is multiplied (raise) or divided (lower) by.
pub adjustment_factor: f64,
}
impl Default for AdaptiveBitrateConfig {
    /// Enabled controller starting at 100, bounded to 10..=1000, with a 0.1
    /// adaptation threshold and a 1.2 adjustment factor.
    fn default() -> Self {
        let (min_bitrate, initial_bitrate, max_bitrate) = (10, 100, 1000);
        Self {
            enabled: true,
            min_bitrate,
            max_bitrate,
            initial_bitrate,
            adaptation_threshold: 0.1,
            adjustment_factor: 1.2,
        }
    }
}
/// Settings for [`CompressionManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionConfig {
/// Master switch; when `false` events are passed through untouched.
pub enabled: bool,
/// Selected algorithm (not consulted by the code visible here).
pub algorithm: CompressionAlgorithm,
/// Compression level (not consulted by the code visible here; valid range depends on the algorithm — TODO confirm).
pub level: u8,
/// Adaptive-compression toggle (not consulted by the code visible here).
pub adaptive: bool,
/// Events whose `size_bytes` is below this value are left uncompressed.
pub threshold: usize,
}
impl Default for CompressionConfig {
    /// Compression on, adaptive, Zlib at level 6, for events of 1024 bytes or more.
    fn default() -> Self {
        let algorithm = CompressionAlgorithm::Zlib;
        Self {
            algorithm,
            level: 6,
            threshold: 1024,
            enabled: true,
            adaptive: true,
        }
    }
}
/// Compression algorithm choices for [`CompressionConfig::algorithm`].
///
/// The code visible here never branches on this value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
/// No compression.
None,
/// Gzip.
Gzip,
/// Zlib (the default selection).
Zlib,
/// LZ4.
Lz4,
/// Zstandard.
Zstd,
}
/// Quality-level settings for streaming.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityConfig {
/// Available quality levels.
pub levels: Vec<QualityLevel>,
/// Whether quality is adjusted automatically (not read by the code visible here).
pub auto_adjust: bool,
/// Metric limits associated with quality decisions.
pub metrics_threshold: QualityMetricsThreshold,
}
impl Default for QualityConfig {
    /// Three presets ("low", "medium", "high"), automatic adjustment on, and
    /// the default metric thresholds.
    fn default() -> Self {
        // (name, sampling rate, min events/s, max events/s)
        let presets: [(&str, f64, usize, usize); 3] = [
            ("low", 0.5, 10, 100),
            ("medium", 0.7, 50, 500),
            ("high", 1.0, 100, 1000),
        ];
        let levels = presets
            .iter()
            .map(|&(name, rate, floor, ceiling)| QualityLevel::new(name, rate, floor, ceiling))
            .collect();

        Self {
            levels,
            auto_adjust: true,
            metrics_threshold: QualityMetricsThreshold::default(),
        }
    }
}
/// A named streaming quality preset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityLevel {
/// Preset name (the defaults use "low", "medium" and "high").
pub name: String,
/// Sampling rate; the defaults range from 0.5 to 1.0 (presumably a fraction of events kept — TODO confirm).
pub sampling_rate: f64,
/// Minimum events per second for this level.
pub min_events_per_second: usize,
/// Maximum events per second for this level.
pub max_events_per_second: usize,
}
impl QualityLevel {
    /// Creates a quality level.
    ///
    /// `name` is copied into an owned `String`; `min_eps` and `max_eps` become
    /// `min_events_per_second` and `max_events_per_second`.
    pub fn new(name: &str, sampling_rate: f64, min_eps: usize, max_eps: usize) -> Self {
        let name = String::from(name);
        Self {
            name,
            sampling_rate,
            min_events_per_second: min_eps,
            max_events_per_second: max_eps,
        }
    }
}
/// Metric limits attached to [`QualityConfig`].
///
/// None of these values is read by the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetricsThreshold {
/// Latency limit in milliseconds.
pub latency_ms: u64,
/// Packet-loss limit in percent.
pub packet_loss_percent: f64,
/// Bandwidth-utilization limit (default 0.8 — presumably a 0..1 fraction; TODO confirm).
pub bandwidth_utilization: f64,
/// CPU-usage limit in percent.
pub cpu_usage_percent: f64,
}
impl Default for QualityMetricsThreshold {
    /// 100 ms latency, 1 % packet loss, 0.8 bandwidth utilization, 70 % CPU.
    fn default() -> Self {
        let latency_ms = 100;
        let (packet_loss_percent, cpu_usage_percent) = (1.0, 70.0);
        Self {
            latency_ms,
            packet_loss_percent,
            bandwidth_utilization: 0.8,
            cpu_usage_percent,
        }
    }
}
/// Transport-protocol switches and preference order.
///
/// None of these values is read by the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProtocolConfig {
/// WebSocket transport enabled.
pub websocket: bool,
/// Server-sent-events transport enabled.
pub sse: bool,
/// UDP transport enabled.
pub udp: bool,
/// TCP transport enabled.
pub tcp: bool,
/// Protocols listed in priority order (presumably most preferred first — TODO confirm).
pub priority: Vec<StreamingProtocol>,
}
impl Default for ProtocolConfig {
    /// WebSocket and SSE on, UDP and TCP off; priority order is
    /// WebSocket, SSE, TCP, UDP.
    fn default() -> Self {
        let priority = vec![
            StreamingProtocol::WebSocket,
            StreamingProtocol::ServerSentEvents,
            StreamingProtocol::Tcp,
            StreamingProtocol::Udp,
        ];
        Self {
            priority,
            websocket: true,
            sse: true,
            udp: false,
            tcp: false,
        }
    }
}
/// Transport used by a stream connection; the engine dispatches on this value
/// when broadcasting events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamingProtocol {
/// WebSocket transport.
WebSocket,
/// Server-sent events transport.
ServerSentEvents,
/// Raw TCP transport.
Tcp,
/// UDP transport.
Udp,
}
/// Feature toggles for the streaming engine.
///
/// Of these, only `intelligent_sampling` is read by the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvancedFeatures {
/// Predictive buffering toggle.
pub predictive_buffering: bool,
/// When `true`, each batch is capped to the current bitrate, keeping the
/// highest-priority events.
pub intelligent_sampling: bool,
/// Deduplication toggle.
pub deduplication: bool,
/// Delta-compression toggle.
pub delta_compression: bool,
/// Priority-streaming toggle.
pub priority_streaming: bool,
/// Load-balancing toggle.
pub load_balancing: bool,
}
impl Default for AdvancedFeatures {
    /// Every feature toggle starts enabled.
    fn default() -> Self {
        const ENABLED: bool = true;
        Self {
            predictive_buffering: ENABLED,
            intelligent_sampling: ENABLED,
            deduplication: ENABLED,
            delta_compression: ENABLED,
            priority_streaming: ENABLED,
            load_balancing: ENABLED,
        }
    }
}
/// Metadata describing one registered stream.
///
/// When broadcasting, the engine reads only the map key and `protocol`; the
/// remaining fields are bookkeeping not touched by the code visible here.
#[derive(Debug, Clone)]
pub struct StreamConnection {
/// Stream identifier.
pub id: String,
/// Transport used to reach this stream.
pub protocol: StreamingProtocol,
/// Address of the remote peer.
pub remote_addr: SocketAddr,
/// Name of the quality level in use (presumably matches a `QualityLevel::name` — TODO confirm).
pub quality_level: String,
/// Bitrate assigned to this stream.
pub bitrate: usize,
/// Whether compression is used for this stream.
pub compression: bool,
/// Time the connection was established.
pub connected_at: SystemTime,
/// Time of the most recent activity.
pub last_activity: SystemTime,
/// Bytes sent on this stream.
pub bytes_sent: u64,
/// Events sent on this stream.
pub events_sent: u64,
/// Latency in milliseconds.
pub latency_ms: u64,
}
/// Bounded FIFO buffer of events awaiting streaming.
#[derive(Debug)]
pub struct EventBuffer {
/// Buffered events in arrival order.
events: VecDeque<BufferedEvent>,
/// Copies of the buffered events grouped by category name.
categories: BTreeMap<String, VecDeque<BufferedEvent>>,
/// Maximum number of events kept in `events`.
max_size: usize,
/// Sum of `size_bytes` over the events in `events`.
total_size: usize,
}
/// A profile event plus the metadata the engine attaches when buffering it.
#[derive(Debug, Clone)]
pub struct BufferedEvent {
/// The wrapped profiling event.
pub event: ProfileEvent,
/// Priority used when sampling drops events.
pub priority: EventPriority,
/// Moment the event was buffered.
pub timestamp: Instant,
/// Estimated size in bytes; reduced when the event is marked compressed.
pub size_bytes: usize,
/// Set once the compression manager has processed the event.
pub compressed: bool,
/// Category name, copied from the wrapped event by the engine.
pub category: String, }
/// Event priority.
///
/// The derived `Ord` follows declaration order, so `Critical` compares as the
/// smallest value and sorts first in an ascending sort.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum EventPriority {
/// Highest priority.
Critical,
/// High priority.
High,
/// Default priority.
Normal,
/// Lowest priority.
Low,
}
/// Engine-wide counters, stored as atomics so they can be shared across tasks.
#[derive(Debug)]
pub struct StreamingStats {
/// Connections seen in total.
pub total_connections: AtomicUsize,
/// Connections currently active.
pub active_connections: AtomicUsize,
/// Events handed to the broadcast step.
pub total_events_sent: AtomicU64,
/// Bytes sent in total.
pub total_bytes_sent: AtomicU64,
// `compression_ratio` is an integer that `get_stats` divides by 100.0 to
// obtain the reported ratio; `average_latency_ms` is the average latency in
// milliseconds.
pub compression_ratio: AtomicUsize, pub average_latency_ms: AtomicU64,
/// Events dropped before delivery.
pub dropped_events: AtomicU64,
/// Number of quality adjustments.
pub quality_adjustments: AtomicU64,
/// Number of bitrate adjustments.
pub bitrate_adjustments: AtomicU64,
}
impl Default for StreamingStats {
    /// All counters start at zero.
    fn default() -> Self {
        let zero_usize = || AtomicUsize::new(0);
        let zero_u64 = || AtomicU64::new(0);
        Self {
            total_connections: zero_usize(),
            active_connections: zero_usize(),
            total_events_sent: zero_u64(),
            total_bytes_sent: zero_u64(),
            compression_ratio: zero_usize(),
            average_latency_ms: zero_u64(),
            dropped_events: zero_u64(),
            quality_adjustments: zero_u64(),
            bitrate_adjustments: zero_u64(),
        }
    }
}
/// Periodically raises or lowers the streaming bitrate based on a quality score.
#[derive(Debug)]
pub struct AdaptiveRateController {
/// Bitrate currently in effect; the engine reads it as the per-batch event cap.
current_bitrate: AtomicUsize,
/// Target bitrate; initialised alongside `current_bitrate`.
target_bitrate: AtomicUsize,
// `quality_score` is a fixed-point quality value that `new` initialises to
// 8000 (presumably on a 0..=10000 scale — TODO confirm);
// `adjustment_history` keeps the most recent bitrate changes, oldest first.
quality_score: AtomicUsize, adjustment_history: Mutex<VecDeque<BitrateAdjustment>>,
/// Settings that bound and steer the adjustments.
config: AdaptiveBitrateConfig,
}
/// Record of a single bitrate change made by [`AdaptiveRateController`].
#[derive(Debug, Clone)]
pub struct BitrateAdjustment {
/// When the change was made.
pub timestamp: Instant,
/// Bitrate before the change.
pub old_bitrate: usize,
/// Bitrate after the change (already clamped to the configured bounds).
pub new_bitrate: usize,
/// Why the change was made.
pub reason: AdjustmentReason,
}
/// Reason attached to a [`BitrateAdjustment`].
///
/// The controller visible here only produces `QualityImprovement` and
/// `QualityDegradation`.
#[derive(Debug, Clone)]
pub enum AdjustmentReason {
/// Recorded when the computed bitrate is higher than the previous one.
QualityImprovement,
/// Recorded when the computed bitrate is not higher than the previous one.
QualityDegradation,
/// Adjustment made to optimize latency.
LatencyOptimization,
/// Adjustment made to optimize bandwidth.
BandwidthOptimization,
/// Adjustment made for load balancing.
LoadBalancing,
}
/// Applies the compression policy to events and tracks compression statistics.
#[derive(Debug)]
pub struct CompressionManager {
/// Compression settings (switch and size threshold).
config: CompressionConfig,
/// Running totals updated for every compressed event.
stats: CompressionStats,
}
/// Running totals kept by [`CompressionManager`].
#[derive(Debug, Default)]
pub struct CompressionStats {
/// Number of events compressed.
pub total_compressed: AtomicU64,
/// Accumulated time spent compressing, in nanoseconds.
pub compression_time_ns: AtomicU64,
/// Accumulated pre-compression size in bytes.
pub original_size: AtomicU64,
/// Accumulated post-compression size in bytes.
pub compressed_size: AtomicU64,
}
/// Per-protocol registries of live connections, each keyed by stream id.
#[derive(Debug)]
pub struct ConnectionManager {
/// WebSocket connections.
websocket_connections: Arc<RwLock<HashMap<String, WebSocketConnection>>>,
/// Server-sent-events connections.
sse_connections: Arc<RwLock<HashMap<String, SSEConnection>>>,
/// UDP connections.
udp_connections: Arc<RwLock<HashMap<String, UdpConnection>>>,
/// TCP connections.
tcp_connections: Arc<RwLock<HashMap<String, TcpConnection>>>,
}
/// Sending side of one WebSocket stream.
#[derive(Debug)]
pub struct WebSocketConnection {
/// Unbounded channel the engine pushes outgoing messages into.
pub sender: mpsc::UnboundedSender<WebSocketMessage>,
/// Per-connection counters.
pub stats: ConnectionStats,
}
/// Sending side of one server-sent-events stream.
#[derive(Debug)]
pub struct SSEConnection {
/// Unbounded channel the engine pushes pre-formatted `data: …` frames into.
pub sender: mpsc::UnboundedSender<String>,
/// Per-connection counters.
pub stats: ConnectionStats,
}
/// One UDP stream target.
#[derive(Debug)]
pub struct UdpConnection {
/// Address of the peer.
pub addr: SocketAddr,
/// Per-connection counters.
pub stats: ConnectionStats,
}
/// One TCP stream.
///
/// NOTE(review): `Mutex` here is `std::sync::Mutex`. Writing to a Tokio socket
/// is asynchronous, so using this writer would mean holding a std guard across
/// an `.await` — consider `tokio::sync::Mutex` once TCP sending is implemented.
#[derive(Debug)]
pub struct TcpConnection {
/// Write half of the socket, shared behind a mutex.
pub writer: Arc<Mutex<tokio::net::tcp::OwnedWriteHalf>>,
/// Per-connection counters.
pub stats: ConnectionStats,
}
/// Counters kept for a single connection.
#[derive(Debug, Default)]
pub struct ConnectionStats {
/// Estimated bytes sent on this connection.
pub bytes_sent: AtomicU64,
/// Messages successfully queued on this connection.
pub messages_sent: AtomicU64,
/// Number of send errors.
pub errors: AtomicU64,
/// Time of the most recent send, if any.
pub last_send: Arc<Mutex<Option<Instant>>>,
}
impl EnhancedStreamingEngine {
pub fn new(config: StreamingConfig) -> Self {
Self {
rate_controller: Arc::new(AdaptiveRateController::new(config.adaptive_bitrate.clone())),
compression_manager: Arc::new(CompressionManager::new(config.compression.clone())),
connection_manager: Arc::new(ConnectionManager::new()),
streams: Arc::new(RwLock::new(HashMap::new())),
event_buffer: Arc::new(Mutex::new(EventBuffer::new(config.buffer_size))),
stats: Arc::new(StreamingStats::default()),
config,
}
}
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
self.start_event_processor().await?;
self.start_rate_controller().await?;
self.start_quality_monitor().await?;
self.start_connection_manager().await?;
Ok(())
}
pub fn add_event(&self, event: ProfileEvent) {
let priority = self.calculate_event_priority(&event);
let size_bytes = self.estimate_event_size(&event);
let category = event.category.clone();
let buffered_event = BufferedEvent {
event,
priority,
timestamp: Instant::now(),
size_bytes,
compressed: false,
category,
};
let mut buffer = self
.event_buffer
.lock()
.expect("lock should not be poisoned");
buffer.add_event(buffered_event);
}
pub async fn stream_events(&self) -> Result<(), Box<dyn std::error::Error>> {
let events = {
let mut buffer = self
.event_buffer
.lock()
.expect("lock should not be poisoned");
buffer.get_events_for_streaming()
};
if events.is_empty() {
return Ok(());
}
let sampled_events = if self.config.advanced_features.intelligent_sampling {
self.apply_intelligent_sampling(events).await
} else {
events
};
let compressed_events = if self.config.compression.enabled {
self.compression_manager
.compress_events(sampled_events)
.await?
} else {
sampled_events
};
self.broadcast_events(compressed_events).await?;
Ok(())
}
fn calculate_event_priority(&self, event: &ProfileEvent) -> EventPriority {
match event.category.as_str() {
"memory" | "Memory" => {
if event.name.contains("leak") || event.name.contains("critical") {
EventPriority::Critical
} else {
EventPriority::High
}
}
"performance" | "Performance" => EventPriority::High,
"error" | "Error" => EventPriority::Critical,
"debug" | "Debug" => EventPriority::Low,
_ => EventPriority::Normal,
}
}
fn estimate_event_size(&self, event: &ProfileEvent) -> usize {
let base_size = std::mem::size_of::<ProfileEvent>();
let name_size = event.name.len();
let stack_trace_size = event.stack_trace.as_ref().map_or(0, |s| s.len());
base_size + name_size + stack_trace_size
}
async fn apply_intelligent_sampling(&self, events: Vec<BufferedEvent>) -> Vec<BufferedEvent> {
let current_bitrate = self.rate_controller.current_bitrate.load(Ordering::Relaxed);
let max_events = current_bitrate.min(events.len());
if events.len() <= max_events {
return events;
}
let mut sorted_events = events;
sorted_events.sort_by(|a, b| {
a.priority
.cmp(&b.priority)
.then(b.timestamp.cmp(&a.timestamp))
});
sorted_events.truncate(max_events);
sorted_events
}
async fn broadcast_events(
&self,
events: Vec<BufferedEvent>,
) -> Result<(), Box<dyn std::error::Error>> {
let stream_info: Vec<(String, StreamingProtocol)> = {
let streams = self.streams.read();
streams
.iter()
.map(|(id, conn)| (id.clone(), conn.protocol.clone()))
.collect()
};
for (stream_id, protocol) in stream_info {
match protocol {
StreamingProtocol::WebSocket => {
self.send_to_websocket(&stream_id, &events).await?;
}
StreamingProtocol::ServerSentEvents => {
self.send_to_sse(&stream_id, &events).await?;
}
StreamingProtocol::Tcp => {
self.send_to_tcp(&stream_id, &events).await?;
}
StreamingProtocol::Udp => {
self.send_to_udp(&stream_id, &events).await?;
}
}
}
self.stats
.total_events_sent
.fetch_add(events.len() as u64, Ordering::Relaxed);
Ok(())
}
async fn send_to_websocket(
&self,
stream_id: &str,
events: &[BufferedEvent],
) -> Result<(), Box<dyn std::error::Error>> {
let connections = self.connection_manager.websocket_connections.read();
if let Some(connection) = connections.get(stream_id) {
for event in events {
let message = WebSocketMessage::ProfileEvent(event.event.clone());
if connection.sender.send(message).is_err() {
break;
}
connection
.stats
.messages_sent
.fetch_add(1, Ordering::Relaxed);
connection
.stats
.bytes_sent
.fetch_add(event.size_bytes as u64, Ordering::Relaxed);
}
}
Ok(())
}
async fn send_to_sse(
&self,
stream_id: &str,
events: &[BufferedEvent],
) -> Result<(), Box<dyn std::error::Error>> {
let connections = self.connection_manager.sse_connections.read();
if let Some(connection) = connections.get(stream_id) {
for event in events {
let json = serde_json::to_string(&event.event)?;
let sse_message = format!("data: {}\n\n", json);
if connection.sender.send(sse_message).is_err() {
break;
}
connection
.stats
.messages_sent
.fetch_add(1, Ordering::Relaxed);
connection
.stats
.bytes_sent
.fetch_add(event.size_bytes as u64, Ordering::Relaxed);
}
}
Ok(())
}
async fn send_to_tcp(
&self,
_stream_id: &str,
_events: &[BufferedEvent],
) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
async fn send_to_udp(
&self,
_stream_id: &str,
_events: &[BufferedEvent],
) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
async fn start_event_processor(&self) -> Result<(), Box<dyn std::error::Error>> {
let engine = Arc::new(self.clone());
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(50));
loop {
interval.tick().await;
if let Err(e) = engine.stream_events().await {
eprintln!("Error streaming events: {}", e);
}
}
});
Ok(())
}
async fn start_rate_controller(&self) -> Result<(), Box<dyn std::error::Error>> {
let controller = Arc::clone(&self.rate_controller);
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(1));
loop {
interval.tick().await;
controller.adjust_bitrate().await;
}
});
Ok(())
}
async fn start_quality_monitor(&self) -> Result<(), Box<dyn std::error::Error>> {
let stats = Arc::clone(&self.stats);
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(5));
loop {
interval.tick().await;
println!(
"Quality monitor: Active connections: {}",
stats.active_connections.load(Ordering::Relaxed)
);
}
});
Ok(())
}
async fn start_connection_manager(&self) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
pub fn get_stats(&self) -> StreamingStatsSnapshot {
StreamingStatsSnapshot {
total_connections: self.stats.total_connections.load(Ordering::Relaxed),
active_connections: self.stats.active_connections.load(Ordering::Relaxed),
total_events_sent: self.stats.total_events_sent.load(Ordering::Relaxed),
total_bytes_sent: self.stats.total_bytes_sent.load(Ordering::Relaxed),
compression_ratio: self.stats.compression_ratio.load(Ordering::Relaxed) as f64 / 100.0,
average_latency_ms: self.stats.average_latency_ms.load(Ordering::Relaxed),
dropped_events: self.stats.dropped_events.load(Ordering::Relaxed),
quality_adjustments: self.stats.quality_adjustments.load(Ordering::Relaxed),
bitrate_adjustments: self.stats.bitrate_adjustments.load(Ordering::Relaxed),
}
}
}
impl Clone for EnhancedStreamingEngine {
    /// Produces a handle to the same engine: the config is deep-copied while
    /// every `Arc`-held component is shared with the original.
    fn clone(&self) -> Self {
        let Self {
            config,
            streams,
            event_buffer,
            stats,
            rate_controller,
            compression_manager,
            connection_manager,
        } = self;

        Self {
            config: config.clone(),
            streams: Arc::clone(streams),
            event_buffer: Arc::clone(event_buffer),
            stats: Arc::clone(stats),
            rate_controller: Arc::clone(rate_controller),
            compression_manager: Arc::clone(compression_manager),
            connection_manager: Arc::clone(connection_manager),
        }
    }
}
impl EventBuffer {
    /// Creates an empty buffer that keeps at most `max_size` events.
    pub fn new(max_size: usize) -> Self {
        Self {
            events: VecDeque::new(),
            categories: BTreeMap::new(),
            max_size,
            total_size: 0,
        }
    }

    /// Appends `event`, first evicting the oldest buffered events while the
    /// buffer is at capacity.
    ///
    /// Evicted events are removed from the per-category index as well, so the
    /// index never holds more than the main queue does. A buffer created with
    /// `max_size == 0` cannot store anything: the event is discarded.
    pub fn add_event(&mut self, event: BufferedEvent) {
        if self.max_size == 0 {
            // Nothing fits; without this guard the eviction loop would never end.
            return;
        }
        while self.events.len() >= self.max_size {
            match self.events.pop_front() {
                Some(evicted) => self.forget_evicted(&evicted),
                None => break,
            }
        }
        self.total_size += event.size_bytes;
        self.categories
            .entry(event.category.clone())
            .or_default()
            .push_back(event.clone());
        self.events.push_back(event);
    }

    /// Removes and returns every buffered event in arrival order, leaving the
    /// buffer empty.
    fn get_events_for_streaming(&mut self) -> Vec<BufferedEvent> {
        let events: Vec<_> = self.events.drain(..).collect();
        self.categories.clear();
        self.total_size = 0;
        events
    }

    /// Updates the size total and category index after `evicted` was popped
    /// from the front of the main queue.
    fn forget_evicted(&mut self, evicted: &BufferedEvent) {
        self.total_size -= evicted.size_bytes;
        // Both queues are FIFO, so the evicted event is the oldest entry of its
        // category and sits at the front of that category's queue.
        let category_now_empty = match self.categories.get_mut(&evicted.category) {
            Some(queue) => {
                queue.pop_front();
                queue.is_empty()
            }
            None => false,
        };
        if category_now_empty {
            self.categories.remove(&evicted.category);
        }
    }
}
impl AdaptiveRateController {
    /// Fixed-point scale of `quality_score`: a stored value of 10 000 means a
    /// quality of 1.0.
    const QUALITY_SCORE_SCALE: f64 = 10_000.0;

    /// Maximum number of entries kept in the adjustment history.
    const HISTORY_CAPACITY: usize = 100;

    /// Creates a controller whose current and target bitrate both start at
    /// `config.initial_bitrate`.
    pub fn new(config: AdaptiveBitrateConfig) -> Self {
        Self {
            current_bitrate: AtomicUsize::new(config.initial_bitrate),
            target_bitrate: AtomicUsize::new(config.initial_bitrate),
            // 8000 / QUALITY_SCORE_SCALE = 0.8, i.e. start at 80 % quality.
            quality_score: AtomicUsize::new(8000),
            adjustment_history: Mutex::new(VecDeque::with_capacity(Self::HISTORY_CAPACITY)),
            config,
        }
    }

    /// Re-evaluates the bitrate from the current quality score.
    ///
    /// The stored score is converted to a 0..1 fraction. Below
    /// `adaptation_threshold` the bitrate is divided by `adjustment_factor`;
    /// above `1.0 - adaptation_threshold` it is multiplied by it; otherwise it
    /// is left alone. The result is clamped to the configured bounds, and any
    /// actual change is appended to the bounded adjustment history.
    ///
    /// Does nothing when the controller is disabled.
    ///
    /// # Panics
    ///
    /// Panics if the adjustment-history mutex is poisoned.
    async fn adjust_bitrate(&self) {
        if !self.config.enabled {
            return;
        }
        let current = self.current_bitrate.load(Ordering::Relaxed);
        // The thresholds are fractions in 0..1, so the fixed-point score must be
        // scaled into the same range; dividing by 100 would yield 80.0 for the
        // initial score and make the bitrate climb unconditionally.
        let quality =
            self.quality_score.load(Ordering::Relaxed) as f64 / Self::QUALITY_SCORE_SCALE;
        let new_bitrate = if quality < self.config.adaptation_threshold {
            ((current as f64) / self.config.adjustment_factor) as usize
        } else if quality > (1.0 - self.config.adaptation_threshold) {
            ((current as f64) * self.config.adjustment_factor) as usize
        } else {
            current
        };
        // `max` then `min` (rather than `clamp`) so a misconfigured
        // min > max resolves to `max_bitrate` instead of panicking.
        let clamped_bitrate = new_bitrate
            .max(self.config.min_bitrate)
            .min(self.config.max_bitrate);
        if clamped_bitrate != current {
            self.current_bitrate
                .store(clamped_bitrate, Ordering::Relaxed);
            let adjustment = BitrateAdjustment {
                timestamp: Instant::now(),
                old_bitrate: current,
                new_bitrate: clamped_bitrate,
                reason: if new_bitrate > current {
                    AdjustmentReason::QualityImprovement
                } else {
                    AdjustmentReason::QualityDegradation
                },
            };
            let mut history = self
                .adjustment_history
                .lock()
                .expect("lock should not be poisoned");
            if history.len() >= Self::HISTORY_CAPACITY {
                history.pop_front();
            }
            history.push_back(adjustment);
        }
    }
}
impl CompressionManager {
    /// Creates a manager for `config` with all statistics at zero.
    pub fn new(config: CompressionConfig) -> Self {
        let stats = CompressionStats::default();
        Self { config, stats }
    }

    /// Runs every event through the compression policy, preserving order.
    ///
    /// Events smaller than `config.threshold` are passed through unchanged;
    /// the rest go through `compress_event`. When compression is disabled the
    /// input is returned as-is.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while compressing an event.
    async fn compress_events(
        &self,
        events: Vec<BufferedEvent>,
    ) -> Result<Vec<BufferedEvent>, Box<dyn std::error::Error>> {
        if self.config.enabled {
            let mut processed = Vec::new();
            for candidate in events {
                let below_threshold = candidate.size_bytes < self.config.threshold;
                let outcome = if below_threshold {
                    candidate
                } else {
                    self.compress_event(candidate).await?
                };
                processed.push(outcome);
            }
            Ok(processed)
        } else {
            Ok(events)
        }
    }

    /// Marks `event` as compressed and shrinks its recorded size to 70 % of
    /// the original (truncated), then updates the running statistics.
    ///
    /// No payload bytes are transformed here; only the size bookkeeping
    /// changes.
    async fn compress_event(
        &self,
        mut event: BufferedEvent,
    ) -> Result<BufferedEvent, Box<dyn std::error::Error>> {
        let timer = Instant::now();
        let before = event.size_bytes;
        let after = (before as f64 * 0.7) as usize;
        event.size_bytes = after;
        event.compressed = true;
        let elapsed_ns = timer.elapsed().as_nanos() as u64;

        let totals = &self.stats;
        totals.total_compressed.fetch_add(1, Ordering::Relaxed);
        totals
            .compression_time_ns
            .fetch_add(elapsed_ns, Ordering::Relaxed);
        totals
            .original_size
            .fetch_add(before as u64, Ordering::Relaxed);
        totals
            .compressed_size
            .fetch_add(after as u64, Ordering::Relaxed);
        Ok(event)
    }
}
impl ConnectionManager {
    /// Creates a manager with an empty registry for each protocol.
    fn new() -> Self {
        /// Builds one empty, shareable registry.
        fn empty_registry<C>() -> Arc<RwLock<HashMap<String, C>>> {
            Arc::new(RwLock::new(HashMap::new()))
        }

        Self {
            websocket_connections: empty_registry(),
            sse_connections: empty_registry(),
            udp_connections: empty_registry(),
            tcp_connections: empty_registry(),
        }
    }
}
/// Plain-value copy of [`StreamingStats`], as returned by
/// `EnhancedStreamingEngine::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingStatsSnapshot {
/// Connections seen in total.
pub total_connections: usize,
/// Connections currently active.
pub active_connections: usize,
/// Events handed to the broadcast step.
pub total_events_sent: u64,
/// Bytes sent in total.
pub total_bytes_sent: u64,
/// Stored integer ratio divided by 100.0.
pub compression_ratio: f64,
/// Average latency in milliseconds.
pub average_latency_ms: u64,
/// Events dropped before delivery.
pub dropped_events: u64,
/// Number of quality adjustments.
pub quality_adjustments: u64,
/// Number of bitrate adjustments.
pub bitrate_adjustments: u64,
}
/// Creates an engine that uses the default [`StreamingConfig`].
pub fn create_streaming_engine() -> EnhancedStreamingEngine {
    let defaults = StreamingConfig::default();
    EnhancedStreamingEngine::new(defaults)
}
pub fn create_high_performance_streaming_engine() -> EnhancedStreamingEngine {
let mut config = StreamingConfig::default();
config.adaptive_bitrate.max_bitrate = 2000;
config.buffer_size = 50000;
config.compression.level = 9;
config.advanced_features.delta_compression = true;
EnhancedStreamingEngine::new(config)
}
pub fn create_low_latency_streaming_engine() -> EnhancedStreamingEngine {
let mut config = StreamingConfig::default();
config.adaptive_bitrate.initial_bitrate = 500;
config.buffer_size = 1000;
config.compression.enabled = false; config.quality.metrics_threshold.latency_ms = 50;
EnhancedStreamingEngine::new(config)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal profile event in the "memory" category.
    fn memory_profile_event(name: &str) -> ProfileEvent {
        ProfileEvent {
            name: name.to_string(),
            category: "memory".to_string(),
            start_us: 0,
            duration_us: 100,
            thread_id: 1,
            operation_count: None,
            flops: None,
            bytes_transferred: None,
            stack_trace: None,
        }
    }

    /// Wraps a "test" memory event as an uncompressed, normal-priority
    /// buffered event of the given size.
    fn buffered_memory_event(size_bytes: usize) -> BufferedEvent {
        BufferedEvent {
            event: memory_profile_event("test"),
            priority: EventPriority::Normal,
            timestamp: Instant::now(),
            size_bytes,
            compressed: false,
            category: "memory".to_string(),
        }
    }

    #[test]
    fn test_streaming_engine_creation() {
        let engine = create_streaming_engine();
        assert_eq!(engine.config.base_port, 9090);
        assert!(engine.config.adaptive_bitrate.enabled);
    }

    #[test]
    fn test_event_buffer() {
        let mut buffer = EventBuffer::new(5);
        buffer.add_event(buffered_memory_event(100));
        assert_eq!(buffer.events.len(), 1);
        assert_eq!(buffer.total_size, 100);
    }

    #[test]
    fn test_event_priority_calculation() {
        let engine = create_streaming_engine();
        let memory_event = memory_profile_event("memory_leak_detected");
        let priority = engine.calculate_event_priority(&memory_event);
        assert_eq!(priority, EventPriority::Critical);
    }

    #[tokio::test]
    async fn test_compression_manager() {
        let manager = CompressionManager::new(CompressionConfig::default());
        // 2000 bytes is above the default 1024-byte compression threshold.
        let event = buffered_memory_event(2000);
        let compressed = manager.compress_event(event).await.unwrap();
        assert!(compressed.compressed);
        assert!(compressed.size_bytes < 2000);
    }

    #[test]
    fn test_adaptive_rate_controller() {
        let controller = AdaptiveRateController::new(AdaptiveBitrateConfig::default());
        assert_eq!(controller.current_bitrate.load(Ordering::Relaxed), 100);
        assert_eq!(controller.target_bitrate.load(Ordering::Relaxed), 100);
    }

    #[test]
    fn test_quality_level() {
        let level = QualityLevel::new("test", 0.8, 50, 500);
        assert_eq!(level.name, "test");
        assert_eq!(level.sampling_rate, 0.8);
        assert_eq!(level.min_events_per_second, 50);
        assert_eq!(level.max_events_per_second, 500);
    }

    #[test]
    fn test_streaming_config_defaults() {
        let config = StreamingConfig::default();
        assert_eq!(config.base_port, 9090);
        assert_eq!(config.max_connections, 100);
        assert!(config.compression.enabled);
        assert!(config.adaptive_bitrate.enabled);
    }

    #[test]
    fn test_high_performance_engine() {
        let engine = create_high_performance_streaming_engine();
        assert_eq!(engine.config.adaptive_bitrate.max_bitrate, 2000);
        assert_eq!(engine.config.buffer_size, 50000);
        assert!(engine.config.advanced_features.delta_compression);
    }

    #[test]
    fn test_low_latency_engine() {
        let engine = create_low_latency_streaming_engine();
        assert_eq!(engine.config.adaptive_bitrate.initial_bitrate, 500);
        assert_eq!(engine.config.buffer_size, 1000);
        assert!(!engine.config.compression.enabled);
        assert_eq!(engine.config.quality.metrics_threshold.latency_ms, 50);
    }
}