Skip to main content

torsh_profiler/
streaming.rs

1//! Enhanced Real-time Streaming Capabilities
2//!
3//! This module provides advanced real-time streaming features including
4//! adaptive bitrate streaming, data compression, intelligent buffering,
5//! and multi-protocol streaming support for profiling data.
6//!
7//! # Features
8//!
9//! - **Adaptive Bitrate Streaming**: Automatically adjusts streaming rate based on network conditions
10//! - **Multiple Compression Algorithms**: Gzip, Zlib, Lz4, Zstd with adaptive compression
11//! - **Intelligent Buffering**: Priority-based event buffering with overflow management
12//! - **Multi-Protocol Support**: WebSocket, SSE, TCP, UDP protocols
13//! - **Quality Adaptation**: Dynamic quality adjustment based on bandwidth and latency
14//! - **Connection Management**: Handle multiple concurrent client connections
15//! - **Statistics Tracking**: Real-time metrics on throughput, latency, and compression ratios
16//!
17//! # Quick Start
18//!
19//! ```rust
20//! use torsh_profiler::create_streaming_engine;
21//!
22//! // Create a basic streaming engine with default configuration
23//! let engine = create_streaming_engine();
24//! let stats = engine.get_stats();
25//! println!("Active connections: {}", stats.active_connections);
26//! ```
27//!
28//! # Factory Functions
29//!
30//! Three convenience functions create pre-configured engines for common use cases:
31//!
32//! ```rust
33//! use torsh_profiler::{
34//!     create_streaming_engine,
35//!     create_high_performance_streaming_engine,
36//!     create_low_latency_streaming_engine,
37//! };
38//!
39//! // 1. Default balanced configuration
40//! let default_engine = create_streaming_engine();
41//!
42//! // 2. High-performance: Optimized for maximum throughput
43//! //    - 50,000 event buffer
44//! //    - 2,000 events/sec max bitrate
45//! //    - Level 9 compression
46//! let hp_engine = create_high_performance_streaming_engine();
47//!
48//! // 3. Low-latency: Optimized for minimal delay
49//! //    - 1,000 event buffer
50//! //    - Compression disabled
51//! //    - 50ms latency target
52//! let ll_engine = create_low_latency_streaming_engine();
53//! ```
54//!
55//! # Custom Configuration
56//!
57//! ```rust
58//! use torsh_profiler::{
59//!     EnhancedStreamingEngine, StreamingConfig, AdaptiveBitrateConfig,
60//!     CompressionConfig, CompressionAlgorithm, QualityConfig,
61//!     ProtocolConfig, AdvancedFeatures,
62//! };
63//!
64//! let config = StreamingConfig {
65//!     base_port: 8080,
66//!     max_connections: 50,
67//!     buffer_size: 5000,
68//!     adaptive_bitrate: AdaptiveBitrateConfig {
69//!         enabled: true,
70//!         min_bitrate: 50,
71//!         max_bitrate: 500,
72//!         initial_bitrate: 100,
73//!         adaptation_threshold: 0.15,
74//!         adjustment_factor: 1.5,
75//!     },
76//!     compression: CompressionConfig {
77//!         enabled: true,
78//!         algorithm: CompressionAlgorithm::Lz4,
79//!         level: 3,
80//!         adaptive: false,
81//!         threshold: 512,
82//!     },
83//!     quality: QualityConfig::default(),
84//!     protocols: ProtocolConfig::default(),
85//!     advanced_features: AdvancedFeatures::default(),
86//! };
87//!
88//! let engine = EnhancedStreamingEngine::new(config);
89//! ```
90//!
91//! # Compression Algorithms
92//!
93//! ```rust
94//! use torsh_profiler::{CompressionAlgorithm, CompressionConfig};
95//!
96//! // Available compression algorithms:
97//! // - None: No compression (best for latency)
98//! // - Gzip: Standard gzip compression (good balance)
99//! // - Zlib: Similar to gzip (slightly faster)
100//! // - Lz4: Very fast compression (best for throughput)
101//! // - Zstd: Modern compression (best ratio)
102//!
103//! let compression = CompressionConfig {
104//!     enabled: true,
105//!     algorithm: CompressionAlgorithm::Zstd,
106//!     level: 6,  // 0-9, higher = better compression but slower
107//!     adaptive: true,  // Automatically adjust based on performance
108//!     threshold: 1024,  // Only compress events larger than 1KB
109//! };
110//! ```
111//!
112//! # Statistics and Monitoring
113//!
114//! ```rust
115//! use torsh_profiler::create_streaming_engine;
116//!
117//! let engine = create_streaming_engine();
118//! let stats = engine.get_stats();
119//!
120//! println!("Total events sent: {}", stats.total_events_sent);
121//! println!("Total bytes sent: {}", stats.total_bytes_sent);
122//! println!("Active connections: {}", stats.active_connections);
123//! println!("Average latency: {}ms", stats.average_latency_ms);
124//! println!("Compression ratio: {}%", stats.compression_ratio);
125//! println!("Dropped events: {}", stats.dropped_events);
126//! ```
127
128use crate::dashboard::types::WebSocketConfig;
129use crate::ProfileEvent;
130use futures_util::{SinkExt, StreamExt};
131use parking_lot::RwLock;
132use serde::{Deserialize, Serialize};
133use std::collections::{BTreeMap, HashMap, VecDeque};
134use std::net::SocketAddr;
135use std::sync::{
136    atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
137    Arc, Mutex,
138};
139use std::time::{Duration, Instant, SystemTime};
140use tokio::sync::{broadcast, mpsc};
141use tokio::time::interval;
142
143/// WebSocket message types for streaming
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub enum WebSocketMessage {
146    ProfileEvent(ProfileEvent),
147    Stats(StreamingStatsSnapshot),
148    Control(ControlMessage),
149}
150
151/// Control messages for WebSocket communication
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub enum ControlMessage {
154    Subscribe(String),
155    Unsubscribe(String),
156    Ping,
157    Pong,
158}
159
160/// Enhanced streaming engine with adaptive capabilities
161#[derive(Debug)]
162pub struct EnhancedStreamingEngine {
163    /// Configuration
164    pub config: StreamingConfig,
165    /// Active streams
166    streams: Arc<RwLock<HashMap<String, StreamConnection>>>,
167    /// Event buffer for intelligent batching
168    event_buffer: Arc<Mutex<EventBuffer>>,
169    /// Statistics
170    stats: Arc<StreamingStats>,
171    /// Adaptive rate controller
172    rate_controller: Arc<AdaptiveRateController>,
173    /// Compression manager
174    compression_manager: Arc<CompressionManager>,
175    /// Connection manager
176    connection_manager: Arc<ConnectionManager>,
177}
178
179/// Streaming configuration with adaptive features
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct StreamingConfig {
182    /// Base port for streaming
183    pub base_port: u16,
184    /// Maximum concurrent connections
185    pub max_connections: usize,
186    /// Buffer size for events
187    pub buffer_size: usize,
188    /// Adaptive bitrate settings
189    pub adaptive_bitrate: AdaptiveBitrateConfig,
190    /// Compression settings
191    pub compression: CompressionConfig,
192    /// Quality settings
193    pub quality: QualityConfig,
194    /// Protocol settings
195    pub protocols: ProtocolConfig,
196    /// Advanced features
197    pub advanced_features: AdvancedFeatures,
198}
199
200impl Default for StreamingConfig {
201    fn default() -> Self {
202        Self {
203            base_port: 9090,
204            max_connections: 100,
205            buffer_size: 10000,
206            adaptive_bitrate: AdaptiveBitrateConfig::default(),
207            compression: CompressionConfig::default(),
208            quality: QualityConfig::default(),
209            protocols: ProtocolConfig::default(),
210            advanced_features: AdvancedFeatures::default(),
211        }
212    }
213}
214
215/// Adaptive bitrate configuration
216#[derive(Debug, Clone, Serialize, Deserialize)]
217pub struct AdaptiveBitrateConfig {
218    /// Enable adaptive bitrate streaming
219    pub enabled: bool,
220    /// Minimum bitrate (events per second)
221    pub min_bitrate: usize,
222    /// Maximum bitrate (events per second)
223    pub max_bitrate: usize,
224    /// Initial bitrate
225    pub initial_bitrate: usize,
226    /// Quality adaptation threshold
227    pub adaptation_threshold: f64,
228    /// Bitrate adjustment factor
229    pub adjustment_factor: f64,
230}
231
232impl Default for AdaptiveBitrateConfig {
233    fn default() -> Self {
234        Self {
235            enabled: true,
236            min_bitrate: 10,
237            max_bitrate: 1000,
238            initial_bitrate: 100,
239            adaptation_threshold: 0.1,
240            adjustment_factor: 1.2,
241        }
242    }
243}
244
245/// Compression configuration
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct CompressionConfig {
248    /// Enable compression
249    pub enabled: bool,
250    /// Compression algorithm
251    pub algorithm: CompressionAlgorithm,
252    /// Compression level (0-9)
253    pub level: u8,
254    /// Enable adaptive compression
255    pub adaptive: bool,
256    /// Compression threshold (bytes)
257    pub threshold: usize,
258}
259
260impl Default for CompressionConfig {
261    fn default() -> Self {
262        Self {
263            enabled: true,
264            algorithm: CompressionAlgorithm::Zlib,
265            level: 6,
266            adaptive: true,
267            threshold: 1024,
268        }
269    }
270}
271
272/// Compression algorithms
273#[derive(Debug, Clone, Serialize, Deserialize)]
274pub enum CompressionAlgorithm {
275    None,
276    Gzip,
277    Zlib,
278    Lz4,
279    Zstd,
280}
281
282/// Quality configuration for streaming
283#[derive(Debug, Clone, Serialize, Deserialize)]
284pub struct QualityConfig {
285    /// Quality levels
286    pub levels: Vec<QualityLevel>,
287    /// Auto quality adjustment
288    pub auto_adjust: bool,
289    /// Quality metrics threshold
290    pub metrics_threshold: QualityMetricsThreshold,
291}
292
293impl Default for QualityConfig {
294    fn default() -> Self {
295        Self {
296            levels: vec![
297                QualityLevel::new("low", 0.5, 10, 100),
298                QualityLevel::new("medium", 0.7, 50, 500),
299                QualityLevel::new("high", 1.0, 100, 1000),
300            ],
301            auto_adjust: true,
302            metrics_threshold: QualityMetricsThreshold::default(),
303        }
304    }
305}
306
307/// Quality level definition
308#[derive(Debug, Clone, Serialize, Deserialize)]
309pub struct QualityLevel {
310    pub name: String,
311    pub sampling_rate: f64,
312    pub min_events_per_second: usize,
313    pub max_events_per_second: usize,
314}
315
316impl QualityLevel {
317    pub fn new(name: &str, sampling_rate: f64, min_eps: usize, max_eps: usize) -> Self {
318        Self {
319            name: name.to_string(),
320            sampling_rate,
321            min_events_per_second: min_eps,
322            max_events_per_second: max_eps,
323        }
324    }
325}
326
327/// Quality metrics threshold
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct QualityMetricsThreshold {
330    pub latency_ms: u64,
331    pub packet_loss_percent: f64,
332    pub bandwidth_utilization: f64,
333    pub cpu_usage_percent: f64,
334}
335
336impl Default for QualityMetricsThreshold {
337    fn default() -> Self {
338        Self {
339            latency_ms: 100,
340            packet_loss_percent: 1.0,
341            bandwidth_utilization: 0.8,
342            cpu_usage_percent: 70.0,
343        }
344    }
345}
346
347/// Protocol configuration
348#[derive(Debug, Clone, Serialize, Deserialize)]
349pub struct ProtocolConfig {
350    /// Enable WebSocket streaming
351    pub websocket: bool,
352    /// Enable Server-Sent Events
353    pub sse: bool,
354    /// Enable UDP streaming
355    pub udp: bool,
356    /// Enable TCP streaming
357    pub tcp: bool,
358    /// Protocol priority
359    pub priority: Vec<StreamingProtocol>,
360}
361
362impl Default for ProtocolConfig {
363    fn default() -> Self {
364        Self {
365            websocket: true,
366            sse: true,
367            udp: false,
368            tcp: false,
369            priority: vec![
370                StreamingProtocol::WebSocket,
371                StreamingProtocol::ServerSentEvents,
372                StreamingProtocol::Tcp,
373                StreamingProtocol::Udp,
374            ],
375        }
376    }
377}
378
379/// Streaming protocols
380#[derive(Debug, Clone, Serialize, Deserialize)]
381pub enum StreamingProtocol {
382    WebSocket,
383    ServerSentEvents,
384    Tcp,
385    Udp,
386}
387
388/// Advanced streaming features
389#[derive(Debug, Clone, Serialize, Deserialize)]
390pub struct AdvancedFeatures {
391    /// Enable predictive buffering
392    pub predictive_buffering: bool,
393    /// Enable intelligent sampling
394    pub intelligent_sampling: bool,
395    /// Enable data deduplication
396    pub deduplication: bool,
397    /// Enable delta compression
398    pub delta_compression: bool,
399    /// Enable priority streaming
400    pub priority_streaming: bool,
401    /// Enable load balancing
402    pub load_balancing: bool,
403}
404
405impl Default for AdvancedFeatures {
406    fn default() -> Self {
407        Self {
408            predictive_buffering: true,
409            intelligent_sampling: true,
410            deduplication: true,
411            delta_compression: true,
412            priority_streaming: true,
413            load_balancing: true,
414        }
415    }
416}
417
418/// Stream connection information
419#[derive(Debug, Clone)]
420pub struct StreamConnection {
421    pub id: String,
422    pub protocol: StreamingProtocol,
423    pub remote_addr: SocketAddr,
424    pub quality_level: String,
425    pub bitrate: usize,
426    pub compression: bool,
427    pub connected_at: SystemTime,
428    pub last_activity: SystemTime,
429    pub bytes_sent: u64,
430    pub events_sent: u64,
431    pub latency_ms: u64,
432}
433
434/// Event buffer for intelligent batching
435#[derive(Debug)]
436pub struct EventBuffer {
437    events: VecDeque<BufferedEvent>,
438    categories: BTreeMap<String, VecDeque<BufferedEvent>>,
439    max_size: usize,
440    total_size: usize,
441}
442
443/// Buffered event with metadata
444#[derive(Debug, Clone)]
445pub struct BufferedEvent {
446    pub event: ProfileEvent,
447    pub priority: EventPriority,
448    pub timestamp: Instant,
449    pub size_bytes: usize,
450    pub compressed: bool,
451    pub category: String, // Store category as String
452}
453
454/// Event priority levels
455#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
456pub enum EventPriority {
457    Critical,
458    High,
459    Normal,
460    Low,
461}
462
463/// Streaming statistics
464#[derive(Debug)]
465pub struct StreamingStats {
466    pub total_connections: AtomicUsize,
467    pub active_connections: AtomicUsize,
468    pub total_events_sent: AtomicU64,
469    pub total_bytes_sent: AtomicU64,
470    pub compression_ratio: AtomicUsize, // percentage * 100
471    pub average_latency_ms: AtomicU64,
472    pub dropped_events: AtomicU64,
473    pub quality_adjustments: AtomicU64,
474    pub bitrate_adjustments: AtomicU64,
475}
476
477impl Default for StreamingStats {
478    fn default() -> Self {
479        Self {
480            total_connections: AtomicUsize::new(0),
481            active_connections: AtomicUsize::new(0),
482            total_events_sent: AtomicU64::new(0),
483            total_bytes_sent: AtomicU64::new(0),
484            compression_ratio: AtomicUsize::new(0),
485            average_latency_ms: AtomicU64::new(0),
486            dropped_events: AtomicU64::new(0),
487            quality_adjustments: AtomicU64::new(0),
488            bitrate_adjustments: AtomicU64::new(0),
489        }
490    }
491}
492
493/// Adaptive rate controller
494#[derive(Debug)]
495pub struct AdaptiveRateController {
496    current_bitrate: AtomicUsize,
497    target_bitrate: AtomicUsize,
498    quality_score: AtomicUsize, // percentage * 100
499    adjustment_history: Mutex<VecDeque<BitrateAdjustment>>,
500    config: AdaptiveBitrateConfig,
501}
502
503#[derive(Debug, Clone)]
504pub struct BitrateAdjustment {
505    pub timestamp: Instant,
506    pub old_bitrate: usize,
507    pub new_bitrate: usize,
508    pub reason: AdjustmentReason,
509}
510
511#[derive(Debug, Clone)]
512pub enum AdjustmentReason {
513    QualityImprovement,
514    QualityDegradation,
515    LatencyOptimization,
516    BandwidthOptimization,
517    LoadBalancing,
518}
519
520/// Compression manager
521#[derive(Debug)]
522pub struct CompressionManager {
523    config: CompressionConfig,
524    stats: CompressionStats,
525}
526
527#[derive(Debug, Default)]
528pub struct CompressionStats {
529    pub total_compressed: AtomicU64,
530    pub compression_time_ns: AtomicU64,
531    pub original_size: AtomicU64,
532    pub compressed_size: AtomicU64,
533}
534
535/// Connection manager for handling multiple protocols
536#[derive(Debug)]
537pub struct ConnectionManager {
538    websocket_connections: Arc<RwLock<HashMap<String, WebSocketConnection>>>,
539    sse_connections: Arc<RwLock<HashMap<String, SSEConnection>>>,
540    udp_connections: Arc<RwLock<HashMap<String, UdpConnection>>>,
541    tcp_connections: Arc<RwLock<HashMap<String, TcpConnection>>>,
542}
543
544#[derive(Debug)]
545pub struct WebSocketConnection {
546    pub sender: mpsc::UnboundedSender<WebSocketMessage>,
547    pub stats: ConnectionStats,
548}
549
550#[derive(Debug)]
551pub struct SSEConnection {
552    pub sender: mpsc::UnboundedSender<String>,
553    pub stats: ConnectionStats,
554}
555
556#[derive(Debug)]
557pub struct UdpConnection {
558    pub addr: SocketAddr,
559    pub stats: ConnectionStats,
560}
561
562#[derive(Debug)]
563pub struct TcpConnection {
564    pub writer: Arc<Mutex<tokio::net::tcp::OwnedWriteHalf>>,
565    pub stats: ConnectionStats,
566}
567
568#[derive(Debug, Default)]
569pub struct ConnectionStats {
570    pub bytes_sent: AtomicU64,
571    pub messages_sent: AtomicU64,
572    pub errors: AtomicU64,
573    pub last_send: Arc<Mutex<Option<Instant>>>,
574}
575
576impl EnhancedStreamingEngine {
577    /// Create a new enhanced streaming engine
578    pub fn new(config: StreamingConfig) -> Self {
579        Self {
580            rate_controller: Arc::new(AdaptiveRateController::new(config.adaptive_bitrate.clone())),
581            compression_manager: Arc::new(CompressionManager::new(config.compression.clone())),
582            connection_manager: Arc::new(ConnectionManager::new()),
583            streams: Arc::new(RwLock::new(HashMap::new())),
584            event_buffer: Arc::new(Mutex::new(EventBuffer::new(config.buffer_size))),
585            stats: Arc::new(StreamingStats::default()),
586            config,
587        }
588    }
589
590    /// Start the streaming engine
591    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
592        // Start the event processing loop
593        self.start_event_processor().await?;
594
595        // Start adaptive rate control
596        self.start_rate_controller().await?;
597
598        // Start quality monitoring
599        self.start_quality_monitor().await?;
600
601        // Start connection management
602        self.start_connection_manager().await?;
603
604        Ok(())
605    }
606
607    /// Add event to streaming buffer with intelligent prioritization
608    pub fn add_event(&self, event: ProfileEvent) {
609        let priority = self.calculate_event_priority(&event);
610        let size_bytes = self.estimate_event_size(&event);
611        let category = event.category.clone();
612
613        let buffered_event = BufferedEvent {
614            event,
615            priority,
616            timestamp: Instant::now(),
617            size_bytes,
618            compressed: false,
619            category,
620        };
621
622        let mut buffer = self
623            .event_buffer
624            .lock()
625            .expect("lock should not be poisoned");
626        buffer.add_event(buffered_event);
627    }
628
629    /// Stream events to all connected clients with adaptive quality
630    pub async fn stream_events(&self) -> Result<(), Box<dyn std::error::Error>> {
631        let events = {
632            let mut buffer = self
633                .event_buffer
634                .lock()
635                .expect("lock should not be poisoned");
636            buffer.get_events_for_streaming()
637        };
638
639        if events.is_empty() {
640            return Ok(());
641        }
642
643        // Apply intelligent sampling
644        let sampled_events = if self.config.advanced_features.intelligent_sampling {
645            self.apply_intelligent_sampling(events).await
646        } else {
647            events
648        };
649
650        // Apply compression if enabled
651        let compressed_events = if self.config.compression.enabled {
652            self.compression_manager
653                .compress_events(sampled_events)
654                .await?
655        } else {
656            sampled_events
657        };
658
659        // Stream to all active connections
660        self.broadcast_events(compressed_events).await?;
661
662        Ok(())
663    }
664
665    /// Calculate event priority based on type and context
666    fn calculate_event_priority(&self, event: &ProfileEvent) -> EventPriority {
667        match event.category.as_str() {
668            "memory" | "Memory" => {
669                if event.name.contains("leak") || event.name.contains("critical") {
670                    EventPriority::Critical
671                } else {
672                    EventPriority::High
673                }
674            }
675            "performance" | "Performance" => EventPriority::High,
676            "error" | "Error" => EventPriority::Critical,
677            "debug" | "Debug" => EventPriority::Low,
678            _ => EventPriority::Normal,
679        }
680    }
681
682    /// Estimate event size for bandwidth calculation
683    fn estimate_event_size(&self, event: &ProfileEvent) -> usize {
684        // Rough estimation based on event content
685        let base_size = std::mem::size_of::<ProfileEvent>();
686        let name_size = event.name.len();
687        let stack_trace_size = event.stack_trace.as_ref().map_or(0, |s| s.len());
688
689        base_size + name_size + stack_trace_size
690    }
691
692    /// Apply intelligent sampling based on current conditions
693    async fn apply_intelligent_sampling(&self, events: Vec<BufferedEvent>) -> Vec<BufferedEvent> {
694        let current_bitrate = self.rate_controller.current_bitrate.load(Ordering::Relaxed);
695        let max_events = current_bitrate.min(events.len());
696
697        if events.len() <= max_events {
698            return events;
699        }
700
701        // Sort by priority and recency
702        let mut sorted_events = events;
703        sorted_events.sort_by(|a, b| {
704            a.priority
705                .cmp(&b.priority)
706                .then(b.timestamp.cmp(&a.timestamp))
707        });
708
709        sorted_events.truncate(max_events);
710        sorted_events
711    }
712
713    /// Broadcast events to all connected clients
714    async fn broadcast_events(
715        &self,
716        events: Vec<BufferedEvent>,
717    ) -> Result<(), Box<dyn std::error::Error>> {
718        // Clone stream info before async operations to avoid holding lock across await
719        let stream_info: Vec<(String, StreamingProtocol)> = {
720            let streams = self.streams.read();
721            streams
722                .iter()
723                .map(|(id, conn)| (id.clone(), conn.protocol.clone()))
724                .collect()
725        };
726
727        for (stream_id, protocol) in stream_info {
728            match protocol {
729                StreamingProtocol::WebSocket => {
730                    self.send_to_websocket(&stream_id, &events).await?;
731                }
732                StreamingProtocol::ServerSentEvents => {
733                    self.send_to_sse(&stream_id, &events).await?;
734                }
735                StreamingProtocol::Tcp => {
736                    self.send_to_tcp(&stream_id, &events).await?;
737                }
738                StreamingProtocol::Udp => {
739                    self.send_to_udp(&stream_id, &events).await?;
740                }
741            }
742        }
743
744        // Update statistics
745        self.stats
746            .total_events_sent
747            .fetch_add(events.len() as u64, Ordering::Relaxed);
748
749        Ok(())
750    }
751
752    // Protocol-specific sending methods
753    async fn send_to_websocket(
754        &self,
755        stream_id: &str,
756        events: &[BufferedEvent],
757    ) -> Result<(), Box<dyn std::error::Error>> {
758        let connections = self.connection_manager.websocket_connections.read();
759        if let Some(connection) = connections.get(stream_id) {
760            for event in events {
761                let message = WebSocketMessage::ProfileEvent(event.event.clone());
762                if connection.sender.send(message).is_err() {
763                    // Connection closed
764                    break;
765                }
766                connection
767                    .stats
768                    .messages_sent
769                    .fetch_add(1, Ordering::Relaxed);
770                connection
771                    .stats
772                    .bytes_sent
773                    .fetch_add(event.size_bytes as u64, Ordering::Relaxed);
774            }
775        }
776        Ok(())
777    }
778
779    async fn send_to_sse(
780        &self,
781        stream_id: &str,
782        events: &[BufferedEvent],
783    ) -> Result<(), Box<dyn std::error::Error>> {
784        let connections = self.connection_manager.sse_connections.read();
785        if let Some(connection) = connections.get(stream_id) {
786            for event in events {
787                let json = serde_json::to_string(&event.event)?;
788                let sse_message = format!("data: {}\n\n", json);
789                if connection.sender.send(sse_message).is_err() {
790                    break;
791                }
792                connection
793                    .stats
794                    .messages_sent
795                    .fetch_add(1, Ordering::Relaxed);
796                connection
797                    .stats
798                    .bytes_sent
799                    .fetch_add(event.size_bytes as u64, Ordering::Relaxed);
800            }
801        }
802        Ok(())
803    }
804
805    async fn send_to_tcp(
806        &self,
807        _stream_id: &str,
808        _events: &[BufferedEvent],
809    ) -> Result<(), Box<dyn std::error::Error>> {
810        // TCP implementation would go here
811        Ok(())
812    }
813
814    async fn send_to_udp(
815        &self,
816        _stream_id: &str,
817        _events: &[BufferedEvent],
818    ) -> Result<(), Box<dyn std::error::Error>> {
819        // UDP implementation would go here
820        Ok(())
821    }
822
823    // Background task starters
824    async fn start_event_processor(&self) -> Result<(), Box<dyn std::error::Error>> {
825        let engine = Arc::new(self.clone());
826
827        tokio::spawn(async move {
828            let mut interval = interval(Duration::from_millis(50)); // 20 FPS
829
830            loop {
831                interval.tick().await;
832                if let Err(e) = engine.stream_events().await {
833                    eprintln!("Error streaming events: {}", e);
834                }
835            }
836        });
837
838        Ok(())
839    }
840
841    async fn start_rate_controller(&self) -> Result<(), Box<dyn std::error::Error>> {
842        let controller = Arc::clone(&self.rate_controller);
843
844        tokio::spawn(async move {
845            let mut interval = interval(Duration::from_secs(1));
846
847            loop {
848                interval.tick().await;
849                controller.adjust_bitrate().await;
850            }
851        });
852
853        Ok(())
854    }
855
856    async fn start_quality_monitor(&self) -> Result<(), Box<dyn std::error::Error>> {
857        let stats = Arc::clone(&self.stats);
858
859        tokio::spawn(async move {
860            let mut interval = interval(Duration::from_secs(5));
861
862            loop {
863                interval.tick().await;
864                // Monitor quality metrics and adjust settings
865                println!(
866                    "Quality monitor: Active connections: {}",
867                    stats.active_connections.load(Ordering::Relaxed)
868                );
869            }
870        });
871
872        Ok(())
873    }
874
875    async fn start_connection_manager(&self) -> Result<(), Box<dyn std::error::Error>> {
876        // Connection management logic would go here
877        Ok(())
878    }
879
880    /// Get streaming statistics
881    pub fn get_stats(&self) -> StreamingStatsSnapshot {
882        StreamingStatsSnapshot {
883            total_connections: self.stats.total_connections.load(Ordering::Relaxed),
884            active_connections: self.stats.active_connections.load(Ordering::Relaxed),
885            total_events_sent: self.stats.total_events_sent.load(Ordering::Relaxed),
886            total_bytes_sent: self.stats.total_bytes_sent.load(Ordering::Relaxed),
887            compression_ratio: self.stats.compression_ratio.load(Ordering::Relaxed) as f64 / 100.0,
888            average_latency_ms: self.stats.average_latency_ms.load(Ordering::Relaxed),
889            dropped_events: self.stats.dropped_events.load(Ordering::Relaxed),
890            quality_adjustments: self.stats.quality_adjustments.load(Ordering::Relaxed),
891            bitrate_adjustments: self.stats.bitrate_adjustments.load(Ordering::Relaxed),
892        }
893    }
894}
895
896impl Clone for EnhancedStreamingEngine {
897    fn clone(&self) -> Self {
898        Self {
899            config: self.config.clone(),
900            streams: Arc::clone(&self.streams),
901            event_buffer: Arc::clone(&self.event_buffer),
902            stats: Arc::clone(&self.stats),
903            rate_controller: Arc::clone(&self.rate_controller),
904            compression_manager: Arc::clone(&self.compression_manager),
905            connection_manager: Arc::clone(&self.connection_manager),
906        }
907    }
908}
909
910impl EventBuffer {
911    pub fn new(max_size: usize) -> Self {
912        Self {
913            events: VecDeque::new(),
914            categories: BTreeMap::new(),
915            max_size,
916            total_size: 0,
917        }
918    }
919
920    pub fn add_event(&mut self, event: BufferedEvent) {
921        // Remove old events if buffer is full
922        while self.events.len() >= self.max_size {
923            if let Some(old_event) = self.events.pop_front() {
924                self.total_size -= old_event.size_bytes;
925            }
926        }
927
928        self.total_size += event.size_bytes;
929
930        // Add to category-specific queue
931        self.categories
932            .entry(event.category.clone())
933            .or_default()
934            .push_back(event.clone());
935
936        self.events.push_back(event);
937    }
938
939    fn get_events_for_streaming(&mut self) -> Vec<BufferedEvent> {
940        let events: Vec<_> = self.events.drain(..).collect();
941        self.categories.clear();
942        self.total_size = 0;
943        events
944    }
945}
946
947impl AdaptiveRateController {
948    pub fn new(config: AdaptiveBitrateConfig) -> Self {
949        Self {
950            current_bitrate: AtomicUsize::new(config.initial_bitrate),
951            target_bitrate: AtomicUsize::new(config.initial_bitrate),
952            quality_score: AtomicUsize::new(8000), // 80%
953            adjustment_history: Mutex::new(VecDeque::with_capacity(100)),
954            config,
955        }
956    }
957
958    async fn adjust_bitrate(&self) {
959        if !self.config.enabled {
960            return;
961        }
962
963        let current = self.current_bitrate.load(Ordering::Relaxed);
964        let quality = self.quality_score.load(Ordering::Relaxed) as f64 / 100.0;
965
966        let new_bitrate = if quality < self.config.adaptation_threshold {
967            // Decrease bitrate
968            ((current as f64) / self.config.adjustment_factor) as usize
969        } else if quality > (1.0 - self.config.adaptation_threshold) {
970            // Increase bitrate
971            ((current as f64) * self.config.adjustment_factor) as usize
972        } else {
973            current // No change
974        };
975
976        let clamped_bitrate = new_bitrate
977            .max(self.config.min_bitrate)
978            .min(self.config.max_bitrate);
979
980        if clamped_bitrate != current {
981            self.current_bitrate
982                .store(clamped_bitrate, Ordering::Relaxed);
983
984            let adjustment = BitrateAdjustment {
985                timestamp: Instant::now(),
986                old_bitrate: current,
987                new_bitrate: clamped_bitrate,
988                reason: if new_bitrate > current {
989                    AdjustmentReason::QualityImprovement
990                } else {
991                    AdjustmentReason::QualityDegradation
992                },
993            };
994
995            let mut history = self
996                .adjustment_history
997                .lock()
998                .expect("lock should not be poisoned");
999            if history.len() >= 100 {
1000                history.pop_front();
1001            }
1002            history.push_back(adjustment);
1003        }
1004    }
1005}
1006
1007impl CompressionManager {
1008    pub fn new(config: CompressionConfig) -> Self {
1009        Self {
1010            config,
1011            stats: CompressionStats::default(),
1012        }
1013    }
1014
1015    async fn compress_events(
1016        &self,
1017        events: Vec<BufferedEvent>,
1018    ) -> Result<Vec<BufferedEvent>, Box<dyn std::error::Error>> {
1019        if !self.config.enabled {
1020            return Ok(events);
1021        }
1022
1023        let mut compressed_events = Vec::new();
1024
1025        for event in events {
1026            if event.size_bytes < self.config.threshold {
1027                compressed_events.push(event);
1028                continue;
1029            }
1030
1031            let compressed_event = self.compress_event(event).await?;
1032            compressed_events.push(compressed_event);
1033        }
1034
1035        Ok(compressed_events)
1036    }
1037
1038    async fn compress_event(
1039        &self,
1040        mut event: BufferedEvent,
1041    ) -> Result<BufferedEvent, Box<dyn std::error::Error>> {
1042        let start = Instant::now();
1043
1044        // Simulate compression (in real implementation, would actually compress the event data)
1045        let original_size = event.size_bytes;
1046        let compressed_size = (original_size as f64 * 0.7) as usize; // Simulate 30% compression
1047
1048        event.size_bytes = compressed_size;
1049        event.compressed = true;
1050
1051        let compression_time = start.elapsed();
1052
1053        // Update statistics
1054        self.stats.total_compressed.fetch_add(1, Ordering::Relaxed);
1055        self.stats
1056            .compression_time_ns
1057            .fetch_add(compression_time.as_nanos() as u64, Ordering::Relaxed);
1058        self.stats
1059            .original_size
1060            .fetch_add(original_size as u64, Ordering::Relaxed);
1061        self.stats
1062            .compressed_size
1063            .fetch_add(compressed_size as u64, Ordering::Relaxed);
1064
1065        Ok(event)
1066    }
1067}
1068
1069impl ConnectionManager {
1070    fn new() -> Self {
1071        Self {
1072            websocket_connections: Arc::new(RwLock::new(HashMap::new())),
1073            sse_connections: Arc::new(RwLock::new(HashMap::new())),
1074            udp_connections: Arc::new(RwLock::new(HashMap::new())),
1075            tcp_connections: Arc::new(RwLock::new(HashMap::new())),
1076        }
1077    }
1078}
1079
1080/// Statistics snapshot for external consumption
1081#[derive(Debug, Clone, Serialize, Deserialize)]
1082pub struct StreamingStatsSnapshot {
1083    pub total_connections: usize,
1084    pub active_connections: usize,
1085    pub total_events_sent: u64,
1086    pub total_bytes_sent: u64,
1087    pub compression_ratio: f64,
1088    pub average_latency_ms: u64,
1089    pub dropped_events: u64,
1090    pub quality_adjustments: u64,
1091    pub bitrate_adjustments: u64,
1092}
1093
1094/// Convenience functions for creating streaming engines
1095pub fn create_streaming_engine() -> EnhancedStreamingEngine {
1096    EnhancedStreamingEngine::new(StreamingConfig::default())
1097}
1098
1099pub fn create_high_performance_streaming_engine() -> EnhancedStreamingEngine {
1100    let mut config = StreamingConfig::default();
1101    config.adaptive_bitrate.max_bitrate = 2000;
1102    config.buffer_size = 50000;
1103    config.compression.level = 9;
1104    config.advanced_features.delta_compression = true;
1105
1106    EnhancedStreamingEngine::new(config)
1107}
1108
1109pub fn create_low_latency_streaming_engine() -> EnhancedStreamingEngine {
1110    let mut config = StreamingConfig::default();
1111    config.adaptive_bitrate.initial_bitrate = 500;
1112    config.buffer_size = 1000;
1113    config.compression.enabled = false; // Disable compression for lower latency
1114    config.quality.metrics_threshold.latency_ms = 50;
1115
1116    EnhancedStreamingEngine::new(config)
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121    use super::*;
1122
1123    #[test]
1124    fn test_streaming_engine_creation() {
1125        let engine = create_streaming_engine();
1126        assert_eq!(engine.config.base_port, 9090);
1127        assert!(engine.config.adaptive_bitrate.enabled);
1128    }
1129
1130    #[test]
1131    fn test_event_buffer() {
1132        let mut buffer = EventBuffer::new(5);
1133
1134        let event = ProfileEvent {
1135            name: "test".to_string(),
1136            category: "memory".to_string(),
1137            start_us: 0,
1138            duration_us: 100,
1139            thread_id: 1,
1140            operation_count: None,
1141            flops: None,
1142            bytes_transferred: None,
1143            stack_trace: None,
1144        };
1145
1146        let buffered_event = BufferedEvent {
1147            event,
1148            priority: EventPriority::Normal,
1149            timestamp: Instant::now(),
1150            size_bytes: 100,
1151            compressed: false,
1152            category: "memory".to_string(),
1153        };
1154
1155        buffer.add_event(buffered_event);
1156        assert_eq!(buffer.events.len(), 1);
1157        assert_eq!(buffer.total_size, 100);
1158    }
1159
1160    #[test]
1161    fn test_event_priority_calculation() {
1162        let engine = create_streaming_engine();
1163
1164        let memory_event = ProfileEvent {
1165            name: "memory_leak_detected".to_string(),
1166            category: "memory".to_string(),
1167            start_us: 0,
1168            duration_us: 100,
1169            thread_id: 1,
1170            operation_count: None,
1171            flops: None,
1172            bytes_transferred: None,
1173            stack_trace: None,
1174        };
1175
1176        let priority = engine.calculate_event_priority(&memory_event);
1177        assert_eq!(priority, EventPriority::Critical);
1178    }
1179
1180    #[tokio::test]
1181    async fn test_compression_manager() {
1182        let config = CompressionConfig::default();
1183        let manager = CompressionManager::new(config);
1184
1185        let event = BufferedEvent {
1186            event: ProfileEvent {
1187                name: "test".to_string(),
1188                category: "memory".to_string(),
1189                start_us: 0,
1190                duration_us: 100,
1191                thread_id: 1,
1192                operation_count: None,
1193                flops: None,
1194                bytes_transferred: None,
1195                stack_trace: None,
1196            },
1197            priority: EventPriority::Normal,
1198            timestamp: Instant::now(),
1199            size_bytes: 2000, // Above threshold
1200            compressed: false,
1201            category: "memory".to_string(),
1202        };
1203
1204        let compressed = manager.compress_event(event).await.unwrap();
1205        assert!(compressed.compressed);
1206        assert!(compressed.size_bytes < 2000);
1207    }
1208
1209    #[test]
1210    fn test_adaptive_rate_controller() {
1211        let config = AdaptiveBitrateConfig::default();
1212        let controller = AdaptiveRateController::new(config);
1213
1214        assert_eq!(controller.current_bitrate.load(Ordering::Relaxed), 100);
1215        assert_eq!(controller.target_bitrate.load(Ordering::Relaxed), 100);
1216    }
1217
1218    #[test]
1219    fn test_quality_level() {
1220        let level = QualityLevel::new("test", 0.8, 50, 500);
1221        assert_eq!(level.name, "test");
1222        assert_eq!(level.sampling_rate, 0.8);
1223        assert_eq!(level.min_events_per_second, 50);
1224        assert_eq!(level.max_events_per_second, 500);
1225    }
1226
1227    #[test]
1228    fn test_streaming_config_defaults() {
1229        let config = StreamingConfig::default();
1230        assert_eq!(config.base_port, 9090);
1231        assert_eq!(config.max_connections, 100);
1232        assert!(config.compression.enabled);
1233        assert!(config.adaptive_bitrate.enabled);
1234    }
1235
1236    #[test]
1237    fn test_high_performance_engine() {
1238        let engine = create_high_performance_streaming_engine();
1239        assert_eq!(engine.config.adaptive_bitrate.max_bitrate, 2000);
1240        assert_eq!(engine.config.buffer_size, 50000);
1241        assert!(engine.config.advanced_features.delta_compression);
1242    }
1243
1244    #[test]
1245    fn test_low_latency_engine() {
1246        let engine = create_low_latency_streaming_engine();
1247        assert_eq!(engine.config.adaptive_bitrate.initial_bitrate, 500);
1248        assert_eq!(engine.config.buffer_size, 1000);
1249        assert!(!engine.config.compression.enabled);
1250        assert_eq!(engine.config.quality.metrics_threshold.latency_ms, 50);
1251    }
1252}