1use crate::dashboard::types::WebSocketConfig;
129use crate::ProfileEvent;
130use futures_util::{SinkExt, StreamExt};
131use parking_lot::RwLock;
132use serde::{Deserialize, Serialize};
133use std::collections::{BTreeMap, HashMap, VecDeque};
134use std::net::SocketAddr;
135use std::sync::{
136 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
137 Arc, Mutex,
138};
139use std::time::{Duration, Instant, SystemTime};
140use tokio::sync::{broadcast, mpsc};
141use tokio::time::interval;
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
145pub enum WebSocketMessage {
146 ProfileEvent(ProfileEvent),
147 Stats(StreamingStatsSnapshot),
148 Control(ControlMessage),
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
153pub enum ControlMessage {
154 Subscribe(String),
155 Unsubscribe(String),
156 Ping,
157 Pong,
158}
159
160#[derive(Debug)]
162pub struct EnhancedStreamingEngine {
163 pub config: StreamingConfig,
165 streams: Arc<RwLock<HashMap<String, StreamConnection>>>,
167 event_buffer: Arc<Mutex<EventBuffer>>,
169 stats: Arc<StreamingStats>,
171 rate_controller: Arc<AdaptiveRateController>,
173 compression_manager: Arc<CompressionManager>,
175 connection_manager: Arc<ConnectionManager>,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct StreamingConfig {
182 pub base_port: u16,
184 pub max_connections: usize,
186 pub buffer_size: usize,
188 pub adaptive_bitrate: AdaptiveBitrateConfig,
190 pub compression: CompressionConfig,
192 pub quality: QualityConfig,
194 pub protocols: ProtocolConfig,
196 pub advanced_features: AdvancedFeatures,
198}
199
200impl Default for StreamingConfig {
201 fn default() -> Self {
202 Self {
203 base_port: 9090,
204 max_connections: 100,
205 buffer_size: 10000,
206 adaptive_bitrate: AdaptiveBitrateConfig::default(),
207 compression: CompressionConfig::default(),
208 quality: QualityConfig::default(),
209 protocols: ProtocolConfig::default(),
210 advanced_features: AdvancedFeatures::default(),
211 }
212 }
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize)]
217pub struct AdaptiveBitrateConfig {
218 pub enabled: bool,
220 pub min_bitrate: usize,
222 pub max_bitrate: usize,
224 pub initial_bitrate: usize,
226 pub adaptation_threshold: f64,
228 pub adjustment_factor: f64,
230}
231
232impl Default for AdaptiveBitrateConfig {
233 fn default() -> Self {
234 Self {
235 enabled: true,
236 min_bitrate: 10,
237 max_bitrate: 1000,
238 initial_bitrate: 100,
239 adaptation_threshold: 0.1,
240 adjustment_factor: 1.2,
241 }
242 }
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct CompressionConfig {
248 pub enabled: bool,
250 pub algorithm: CompressionAlgorithm,
252 pub level: u8,
254 pub adaptive: bool,
256 pub threshold: usize,
258}
259
260impl Default for CompressionConfig {
261 fn default() -> Self {
262 Self {
263 enabled: true,
264 algorithm: CompressionAlgorithm::Zlib,
265 level: 6,
266 adaptive: true,
267 threshold: 1024,
268 }
269 }
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
274pub enum CompressionAlgorithm {
275 None,
276 Gzip,
277 Zlib,
278 Lz4,
279 Zstd,
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize)]
284pub struct QualityConfig {
285 pub levels: Vec<QualityLevel>,
287 pub auto_adjust: bool,
289 pub metrics_threshold: QualityMetricsThreshold,
291}
292
293impl Default for QualityConfig {
294 fn default() -> Self {
295 Self {
296 levels: vec![
297 QualityLevel::new("low", 0.5, 10, 100),
298 QualityLevel::new("medium", 0.7, 50, 500),
299 QualityLevel::new("high", 1.0, 100, 1000),
300 ],
301 auto_adjust: true,
302 metrics_threshold: QualityMetricsThreshold::default(),
303 }
304 }
305}
306
307#[derive(Debug, Clone, Serialize, Deserialize)]
309pub struct QualityLevel {
310 pub name: String,
311 pub sampling_rate: f64,
312 pub min_events_per_second: usize,
313 pub max_events_per_second: usize,
314}
315
316impl QualityLevel {
317 pub fn new(name: &str, sampling_rate: f64, min_eps: usize, max_eps: usize) -> Self {
318 Self {
319 name: name.to_string(),
320 sampling_rate,
321 min_events_per_second: min_eps,
322 max_events_per_second: max_eps,
323 }
324 }
325}
326
327#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct QualityMetricsThreshold {
330 pub latency_ms: u64,
331 pub packet_loss_percent: f64,
332 pub bandwidth_utilization: f64,
333 pub cpu_usage_percent: f64,
334}
335
336impl Default for QualityMetricsThreshold {
337 fn default() -> Self {
338 Self {
339 latency_ms: 100,
340 packet_loss_percent: 1.0,
341 bandwidth_utilization: 0.8,
342 cpu_usage_percent: 70.0,
343 }
344 }
345}
346
347#[derive(Debug, Clone, Serialize, Deserialize)]
349pub struct ProtocolConfig {
350 pub websocket: bool,
352 pub sse: bool,
354 pub udp: bool,
356 pub tcp: bool,
358 pub priority: Vec<StreamingProtocol>,
360}
361
362impl Default for ProtocolConfig {
363 fn default() -> Self {
364 Self {
365 websocket: true,
366 sse: true,
367 udp: false,
368 tcp: false,
369 priority: vec![
370 StreamingProtocol::WebSocket,
371 StreamingProtocol::ServerSentEvents,
372 StreamingProtocol::Tcp,
373 StreamingProtocol::Udp,
374 ],
375 }
376 }
377}
378
379#[derive(Debug, Clone, Serialize, Deserialize)]
381pub enum StreamingProtocol {
382 WebSocket,
383 ServerSentEvents,
384 Tcp,
385 Udp,
386}
387
388#[derive(Debug, Clone, Serialize, Deserialize)]
390pub struct AdvancedFeatures {
391 pub predictive_buffering: bool,
393 pub intelligent_sampling: bool,
395 pub deduplication: bool,
397 pub delta_compression: bool,
399 pub priority_streaming: bool,
401 pub load_balancing: bool,
403}
404
405impl Default for AdvancedFeatures {
406 fn default() -> Self {
407 Self {
408 predictive_buffering: true,
409 intelligent_sampling: true,
410 deduplication: true,
411 delta_compression: true,
412 priority_streaming: true,
413 load_balancing: true,
414 }
415 }
416}
417
418#[derive(Debug, Clone)]
420pub struct StreamConnection {
421 pub id: String,
422 pub protocol: StreamingProtocol,
423 pub remote_addr: SocketAddr,
424 pub quality_level: String,
425 pub bitrate: usize,
426 pub compression: bool,
427 pub connected_at: SystemTime,
428 pub last_activity: SystemTime,
429 pub bytes_sent: u64,
430 pub events_sent: u64,
431 pub latency_ms: u64,
432}
433
434#[derive(Debug)]
436pub struct EventBuffer {
437 events: VecDeque<BufferedEvent>,
438 categories: BTreeMap<String, VecDeque<BufferedEvent>>,
439 max_size: usize,
440 total_size: usize,
441}
442
443#[derive(Debug, Clone)]
445pub struct BufferedEvent {
446 pub event: ProfileEvent,
447 pub priority: EventPriority,
448 pub timestamp: Instant,
449 pub size_bytes: usize,
450 pub compressed: bool,
451 pub category: String, }
453
454#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
456pub enum EventPriority {
457 Critical,
458 High,
459 Normal,
460 Low,
461}
462
463#[derive(Debug)]
465pub struct StreamingStats {
466 pub total_connections: AtomicUsize,
467 pub active_connections: AtomicUsize,
468 pub total_events_sent: AtomicU64,
469 pub total_bytes_sent: AtomicU64,
470 pub compression_ratio: AtomicUsize, pub average_latency_ms: AtomicU64,
472 pub dropped_events: AtomicU64,
473 pub quality_adjustments: AtomicU64,
474 pub bitrate_adjustments: AtomicU64,
475}
476
477impl Default for StreamingStats {
478 fn default() -> Self {
479 Self {
480 total_connections: AtomicUsize::new(0),
481 active_connections: AtomicUsize::new(0),
482 total_events_sent: AtomicU64::new(0),
483 total_bytes_sent: AtomicU64::new(0),
484 compression_ratio: AtomicUsize::new(0),
485 average_latency_ms: AtomicU64::new(0),
486 dropped_events: AtomicU64::new(0),
487 quality_adjustments: AtomicU64::new(0),
488 bitrate_adjustments: AtomicU64::new(0),
489 }
490 }
491}
492
493#[derive(Debug)]
495pub struct AdaptiveRateController {
496 current_bitrate: AtomicUsize,
497 target_bitrate: AtomicUsize,
498 quality_score: AtomicUsize, adjustment_history: Mutex<VecDeque<BitrateAdjustment>>,
500 config: AdaptiveBitrateConfig,
501}
502
503#[derive(Debug, Clone)]
504pub struct BitrateAdjustment {
505 pub timestamp: Instant,
506 pub old_bitrate: usize,
507 pub new_bitrate: usize,
508 pub reason: AdjustmentReason,
509}
510
511#[derive(Debug, Clone)]
512pub enum AdjustmentReason {
513 QualityImprovement,
514 QualityDegradation,
515 LatencyOptimization,
516 BandwidthOptimization,
517 LoadBalancing,
518}
519
520#[derive(Debug)]
522pub struct CompressionManager {
523 config: CompressionConfig,
524 stats: CompressionStats,
525}
526
527#[derive(Debug, Default)]
528pub struct CompressionStats {
529 pub total_compressed: AtomicU64,
530 pub compression_time_ns: AtomicU64,
531 pub original_size: AtomicU64,
532 pub compressed_size: AtomicU64,
533}
534
535#[derive(Debug)]
537pub struct ConnectionManager {
538 websocket_connections: Arc<RwLock<HashMap<String, WebSocketConnection>>>,
539 sse_connections: Arc<RwLock<HashMap<String, SSEConnection>>>,
540 udp_connections: Arc<RwLock<HashMap<String, UdpConnection>>>,
541 tcp_connections: Arc<RwLock<HashMap<String, TcpConnection>>>,
542}
543
544#[derive(Debug)]
545pub struct WebSocketConnection {
546 pub sender: mpsc::UnboundedSender<WebSocketMessage>,
547 pub stats: ConnectionStats,
548}
549
550#[derive(Debug)]
551pub struct SSEConnection {
552 pub sender: mpsc::UnboundedSender<String>,
553 pub stats: ConnectionStats,
554}
555
556#[derive(Debug)]
557pub struct UdpConnection {
558 pub addr: SocketAddr,
559 pub stats: ConnectionStats,
560}
561
562#[derive(Debug)]
563pub struct TcpConnection {
564 pub writer: Arc<Mutex<tokio::net::tcp::OwnedWriteHalf>>,
565 pub stats: ConnectionStats,
566}
567
568#[derive(Debug, Default)]
569pub struct ConnectionStats {
570 pub bytes_sent: AtomicU64,
571 pub messages_sent: AtomicU64,
572 pub errors: AtomicU64,
573 pub last_send: Arc<Mutex<Option<Instant>>>,
574}
575
576impl EnhancedStreamingEngine {
577 pub fn new(config: StreamingConfig) -> Self {
579 Self {
580 rate_controller: Arc::new(AdaptiveRateController::new(config.adaptive_bitrate.clone())),
581 compression_manager: Arc::new(CompressionManager::new(config.compression.clone())),
582 connection_manager: Arc::new(ConnectionManager::new()),
583 streams: Arc::new(RwLock::new(HashMap::new())),
584 event_buffer: Arc::new(Mutex::new(EventBuffer::new(config.buffer_size))),
585 stats: Arc::new(StreamingStats::default()),
586 config,
587 }
588 }
589
590 pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
592 self.start_event_processor().await?;
594
595 self.start_rate_controller().await?;
597
598 self.start_quality_monitor().await?;
600
601 self.start_connection_manager().await?;
603
604 Ok(())
605 }
606
607 pub fn add_event(&self, event: ProfileEvent) {
609 let priority = self.calculate_event_priority(&event);
610 let size_bytes = self.estimate_event_size(&event);
611 let category = event.category.clone();
612
613 let buffered_event = BufferedEvent {
614 event,
615 priority,
616 timestamp: Instant::now(),
617 size_bytes,
618 compressed: false,
619 category,
620 };
621
622 let mut buffer = self
623 .event_buffer
624 .lock()
625 .expect("lock should not be poisoned");
626 buffer.add_event(buffered_event);
627 }
628
629 pub async fn stream_events(&self) -> Result<(), Box<dyn std::error::Error>> {
631 let events = {
632 let mut buffer = self
633 .event_buffer
634 .lock()
635 .expect("lock should not be poisoned");
636 buffer.get_events_for_streaming()
637 };
638
639 if events.is_empty() {
640 return Ok(());
641 }
642
643 let sampled_events = if self.config.advanced_features.intelligent_sampling {
645 self.apply_intelligent_sampling(events).await
646 } else {
647 events
648 };
649
650 let compressed_events = if self.config.compression.enabled {
652 self.compression_manager
653 .compress_events(sampled_events)
654 .await?
655 } else {
656 sampled_events
657 };
658
659 self.broadcast_events(compressed_events).await?;
661
662 Ok(())
663 }
664
665 fn calculate_event_priority(&self, event: &ProfileEvent) -> EventPriority {
667 match event.category.as_str() {
668 "memory" | "Memory" => {
669 if event.name.contains("leak") || event.name.contains("critical") {
670 EventPriority::Critical
671 } else {
672 EventPriority::High
673 }
674 }
675 "performance" | "Performance" => EventPriority::High,
676 "error" | "Error" => EventPriority::Critical,
677 "debug" | "Debug" => EventPriority::Low,
678 _ => EventPriority::Normal,
679 }
680 }
681
682 fn estimate_event_size(&self, event: &ProfileEvent) -> usize {
684 let base_size = std::mem::size_of::<ProfileEvent>();
686 let name_size = event.name.len();
687 let stack_trace_size = event.stack_trace.as_ref().map_or(0, |s| s.len());
688
689 base_size + name_size + stack_trace_size
690 }
691
692 async fn apply_intelligent_sampling(&self, events: Vec<BufferedEvent>) -> Vec<BufferedEvent> {
694 let current_bitrate = self.rate_controller.current_bitrate.load(Ordering::Relaxed);
695 let max_events = current_bitrate.min(events.len());
696
697 if events.len() <= max_events {
698 return events;
699 }
700
701 let mut sorted_events = events;
703 sorted_events.sort_by(|a, b| {
704 a.priority
705 .cmp(&b.priority)
706 .then(b.timestamp.cmp(&a.timestamp))
707 });
708
709 sorted_events.truncate(max_events);
710 sorted_events
711 }
712
713 async fn broadcast_events(
715 &self,
716 events: Vec<BufferedEvent>,
717 ) -> Result<(), Box<dyn std::error::Error>> {
718 let stream_info: Vec<(String, StreamingProtocol)> = {
720 let streams = self.streams.read();
721 streams
722 .iter()
723 .map(|(id, conn)| (id.clone(), conn.protocol.clone()))
724 .collect()
725 };
726
727 for (stream_id, protocol) in stream_info {
728 match protocol {
729 StreamingProtocol::WebSocket => {
730 self.send_to_websocket(&stream_id, &events).await?;
731 }
732 StreamingProtocol::ServerSentEvents => {
733 self.send_to_sse(&stream_id, &events).await?;
734 }
735 StreamingProtocol::Tcp => {
736 self.send_to_tcp(&stream_id, &events).await?;
737 }
738 StreamingProtocol::Udp => {
739 self.send_to_udp(&stream_id, &events).await?;
740 }
741 }
742 }
743
744 self.stats
746 .total_events_sent
747 .fetch_add(events.len() as u64, Ordering::Relaxed);
748
749 Ok(())
750 }
751
752 async fn send_to_websocket(
754 &self,
755 stream_id: &str,
756 events: &[BufferedEvent],
757 ) -> Result<(), Box<dyn std::error::Error>> {
758 let connections = self.connection_manager.websocket_connections.read();
759 if let Some(connection) = connections.get(stream_id) {
760 for event in events {
761 let message = WebSocketMessage::ProfileEvent(event.event.clone());
762 if connection.sender.send(message).is_err() {
763 break;
765 }
766 connection
767 .stats
768 .messages_sent
769 .fetch_add(1, Ordering::Relaxed);
770 connection
771 .stats
772 .bytes_sent
773 .fetch_add(event.size_bytes as u64, Ordering::Relaxed);
774 }
775 }
776 Ok(())
777 }
778
779 async fn send_to_sse(
780 &self,
781 stream_id: &str,
782 events: &[BufferedEvent],
783 ) -> Result<(), Box<dyn std::error::Error>> {
784 let connections = self.connection_manager.sse_connections.read();
785 if let Some(connection) = connections.get(stream_id) {
786 for event in events {
787 let json = serde_json::to_string(&event.event)?;
788 let sse_message = format!("data: {}\n\n", json);
789 if connection.sender.send(sse_message).is_err() {
790 break;
791 }
792 connection
793 .stats
794 .messages_sent
795 .fetch_add(1, Ordering::Relaxed);
796 connection
797 .stats
798 .bytes_sent
799 .fetch_add(event.size_bytes as u64, Ordering::Relaxed);
800 }
801 }
802 Ok(())
803 }
804
805 async fn send_to_tcp(
806 &self,
807 _stream_id: &str,
808 _events: &[BufferedEvent],
809 ) -> Result<(), Box<dyn std::error::Error>> {
810 Ok(())
812 }
813
814 async fn send_to_udp(
815 &self,
816 _stream_id: &str,
817 _events: &[BufferedEvent],
818 ) -> Result<(), Box<dyn std::error::Error>> {
819 Ok(())
821 }
822
823 async fn start_event_processor(&self) -> Result<(), Box<dyn std::error::Error>> {
825 let engine = Arc::new(self.clone());
826
827 tokio::spawn(async move {
828 let mut interval = interval(Duration::from_millis(50)); loop {
831 interval.tick().await;
832 if let Err(e) = engine.stream_events().await {
833 eprintln!("Error streaming events: {}", e);
834 }
835 }
836 });
837
838 Ok(())
839 }
840
841 async fn start_rate_controller(&self) -> Result<(), Box<dyn std::error::Error>> {
842 let controller = Arc::clone(&self.rate_controller);
843
844 tokio::spawn(async move {
845 let mut interval = interval(Duration::from_secs(1));
846
847 loop {
848 interval.tick().await;
849 controller.adjust_bitrate().await;
850 }
851 });
852
853 Ok(())
854 }
855
856 async fn start_quality_monitor(&self) -> Result<(), Box<dyn std::error::Error>> {
857 let stats = Arc::clone(&self.stats);
858
859 tokio::spawn(async move {
860 let mut interval = interval(Duration::from_secs(5));
861
862 loop {
863 interval.tick().await;
864 println!(
866 "Quality monitor: Active connections: {}",
867 stats.active_connections.load(Ordering::Relaxed)
868 );
869 }
870 });
871
872 Ok(())
873 }
874
875 async fn start_connection_manager(&self) -> Result<(), Box<dyn std::error::Error>> {
876 Ok(())
878 }
879
880 pub fn get_stats(&self) -> StreamingStatsSnapshot {
882 StreamingStatsSnapshot {
883 total_connections: self.stats.total_connections.load(Ordering::Relaxed),
884 active_connections: self.stats.active_connections.load(Ordering::Relaxed),
885 total_events_sent: self.stats.total_events_sent.load(Ordering::Relaxed),
886 total_bytes_sent: self.stats.total_bytes_sent.load(Ordering::Relaxed),
887 compression_ratio: self.stats.compression_ratio.load(Ordering::Relaxed) as f64 / 100.0,
888 average_latency_ms: self.stats.average_latency_ms.load(Ordering::Relaxed),
889 dropped_events: self.stats.dropped_events.load(Ordering::Relaxed),
890 quality_adjustments: self.stats.quality_adjustments.load(Ordering::Relaxed),
891 bitrate_adjustments: self.stats.bitrate_adjustments.load(Ordering::Relaxed),
892 }
893 }
894}
895
896impl Clone for EnhancedStreamingEngine {
897 fn clone(&self) -> Self {
898 Self {
899 config: self.config.clone(),
900 streams: Arc::clone(&self.streams),
901 event_buffer: Arc::clone(&self.event_buffer),
902 stats: Arc::clone(&self.stats),
903 rate_controller: Arc::clone(&self.rate_controller),
904 compression_manager: Arc::clone(&self.compression_manager),
905 connection_manager: Arc::clone(&self.connection_manager),
906 }
907 }
908}
909
910impl EventBuffer {
911 pub fn new(max_size: usize) -> Self {
912 Self {
913 events: VecDeque::new(),
914 categories: BTreeMap::new(),
915 max_size,
916 total_size: 0,
917 }
918 }
919
920 pub fn add_event(&mut self, event: BufferedEvent) {
921 while self.events.len() >= self.max_size {
923 if let Some(old_event) = self.events.pop_front() {
924 self.total_size -= old_event.size_bytes;
925 }
926 }
927
928 self.total_size += event.size_bytes;
929
930 self.categories
932 .entry(event.category.clone())
933 .or_default()
934 .push_back(event.clone());
935
936 self.events.push_back(event);
937 }
938
939 fn get_events_for_streaming(&mut self) -> Vec<BufferedEvent> {
940 let events: Vec<_> = self.events.drain(..).collect();
941 self.categories.clear();
942 self.total_size = 0;
943 events
944 }
945}
946
947impl AdaptiveRateController {
948 pub fn new(config: AdaptiveBitrateConfig) -> Self {
949 Self {
950 current_bitrate: AtomicUsize::new(config.initial_bitrate),
951 target_bitrate: AtomicUsize::new(config.initial_bitrate),
952 quality_score: AtomicUsize::new(8000), adjustment_history: Mutex::new(VecDeque::with_capacity(100)),
954 config,
955 }
956 }
957
958 async fn adjust_bitrate(&self) {
959 if !self.config.enabled {
960 return;
961 }
962
963 let current = self.current_bitrate.load(Ordering::Relaxed);
964 let quality = self.quality_score.load(Ordering::Relaxed) as f64 / 100.0;
965
966 let new_bitrate = if quality < self.config.adaptation_threshold {
967 ((current as f64) / self.config.adjustment_factor) as usize
969 } else if quality > (1.0 - self.config.adaptation_threshold) {
970 ((current as f64) * self.config.adjustment_factor) as usize
972 } else {
973 current };
975
976 let clamped_bitrate = new_bitrate
977 .max(self.config.min_bitrate)
978 .min(self.config.max_bitrate);
979
980 if clamped_bitrate != current {
981 self.current_bitrate
982 .store(clamped_bitrate, Ordering::Relaxed);
983
984 let adjustment = BitrateAdjustment {
985 timestamp: Instant::now(),
986 old_bitrate: current,
987 new_bitrate: clamped_bitrate,
988 reason: if new_bitrate > current {
989 AdjustmentReason::QualityImprovement
990 } else {
991 AdjustmentReason::QualityDegradation
992 },
993 };
994
995 let mut history = self
996 .adjustment_history
997 .lock()
998 .expect("lock should not be poisoned");
999 if history.len() >= 100 {
1000 history.pop_front();
1001 }
1002 history.push_back(adjustment);
1003 }
1004 }
1005}
1006
1007impl CompressionManager {
1008 pub fn new(config: CompressionConfig) -> Self {
1009 Self {
1010 config,
1011 stats: CompressionStats::default(),
1012 }
1013 }
1014
1015 async fn compress_events(
1016 &self,
1017 events: Vec<BufferedEvent>,
1018 ) -> Result<Vec<BufferedEvent>, Box<dyn std::error::Error>> {
1019 if !self.config.enabled {
1020 return Ok(events);
1021 }
1022
1023 let mut compressed_events = Vec::new();
1024
1025 for event in events {
1026 if event.size_bytes < self.config.threshold {
1027 compressed_events.push(event);
1028 continue;
1029 }
1030
1031 let compressed_event = self.compress_event(event).await?;
1032 compressed_events.push(compressed_event);
1033 }
1034
1035 Ok(compressed_events)
1036 }
1037
1038 async fn compress_event(
1039 &self,
1040 mut event: BufferedEvent,
1041 ) -> Result<BufferedEvent, Box<dyn std::error::Error>> {
1042 let start = Instant::now();
1043
1044 let original_size = event.size_bytes;
1046 let compressed_size = (original_size as f64 * 0.7) as usize; event.size_bytes = compressed_size;
1049 event.compressed = true;
1050
1051 let compression_time = start.elapsed();
1052
1053 self.stats.total_compressed.fetch_add(1, Ordering::Relaxed);
1055 self.stats
1056 .compression_time_ns
1057 .fetch_add(compression_time.as_nanos() as u64, Ordering::Relaxed);
1058 self.stats
1059 .original_size
1060 .fetch_add(original_size as u64, Ordering::Relaxed);
1061 self.stats
1062 .compressed_size
1063 .fetch_add(compressed_size as u64, Ordering::Relaxed);
1064
1065 Ok(event)
1066 }
1067}
1068
1069impl ConnectionManager {
1070 fn new() -> Self {
1071 Self {
1072 websocket_connections: Arc::new(RwLock::new(HashMap::new())),
1073 sse_connections: Arc::new(RwLock::new(HashMap::new())),
1074 udp_connections: Arc::new(RwLock::new(HashMap::new())),
1075 tcp_connections: Arc::new(RwLock::new(HashMap::new())),
1076 }
1077 }
1078}
1079
1080#[derive(Debug, Clone, Serialize, Deserialize)]
1082pub struct StreamingStatsSnapshot {
1083 pub total_connections: usize,
1084 pub active_connections: usize,
1085 pub total_events_sent: u64,
1086 pub total_bytes_sent: u64,
1087 pub compression_ratio: f64,
1088 pub average_latency_ms: u64,
1089 pub dropped_events: u64,
1090 pub quality_adjustments: u64,
1091 pub bitrate_adjustments: u64,
1092}
1093
1094pub fn create_streaming_engine() -> EnhancedStreamingEngine {
1096 EnhancedStreamingEngine::new(StreamingConfig::default())
1097}
1098
1099pub fn create_high_performance_streaming_engine() -> EnhancedStreamingEngine {
1100 let mut config = StreamingConfig::default();
1101 config.adaptive_bitrate.max_bitrate = 2000;
1102 config.buffer_size = 50000;
1103 config.compression.level = 9;
1104 config.advanced_features.delta_compression = true;
1105
1106 EnhancedStreamingEngine::new(config)
1107}
1108
1109pub fn create_low_latency_streaming_engine() -> EnhancedStreamingEngine {
1110 let mut config = StreamingConfig::default();
1111 config.adaptive_bitrate.initial_bitrate = 500;
1112 config.buffer_size = 1000;
1113 config.compression.enabled = false; config.quality.metrics_threshold.latency_ms = 50;
1115
1116 EnhancedStreamingEngine::new(config)
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121 use super::*;
1122
1123 #[test]
1124 fn test_streaming_engine_creation() {
1125 let engine = create_streaming_engine();
1126 assert_eq!(engine.config.base_port, 9090);
1127 assert!(engine.config.adaptive_bitrate.enabled);
1128 }
1129
1130 #[test]
1131 fn test_event_buffer() {
1132 let mut buffer = EventBuffer::new(5);
1133
1134 let event = ProfileEvent {
1135 name: "test".to_string(),
1136 category: "memory".to_string(),
1137 start_us: 0,
1138 duration_us: 100,
1139 thread_id: 1,
1140 operation_count: None,
1141 flops: None,
1142 bytes_transferred: None,
1143 stack_trace: None,
1144 };
1145
1146 let buffered_event = BufferedEvent {
1147 event,
1148 priority: EventPriority::Normal,
1149 timestamp: Instant::now(),
1150 size_bytes: 100,
1151 compressed: false,
1152 category: "memory".to_string(),
1153 };
1154
1155 buffer.add_event(buffered_event);
1156 assert_eq!(buffer.events.len(), 1);
1157 assert_eq!(buffer.total_size, 100);
1158 }
1159
1160 #[test]
1161 fn test_event_priority_calculation() {
1162 let engine = create_streaming_engine();
1163
1164 let memory_event = ProfileEvent {
1165 name: "memory_leak_detected".to_string(),
1166 category: "memory".to_string(),
1167 start_us: 0,
1168 duration_us: 100,
1169 thread_id: 1,
1170 operation_count: None,
1171 flops: None,
1172 bytes_transferred: None,
1173 stack_trace: None,
1174 };
1175
1176 let priority = engine.calculate_event_priority(&memory_event);
1177 assert_eq!(priority, EventPriority::Critical);
1178 }
1179
1180 #[tokio::test]
1181 async fn test_compression_manager() {
1182 let config = CompressionConfig::default();
1183 let manager = CompressionManager::new(config);
1184
1185 let event = BufferedEvent {
1186 event: ProfileEvent {
1187 name: "test".to_string(),
1188 category: "memory".to_string(),
1189 start_us: 0,
1190 duration_us: 100,
1191 thread_id: 1,
1192 operation_count: None,
1193 flops: None,
1194 bytes_transferred: None,
1195 stack_trace: None,
1196 },
1197 priority: EventPriority::Normal,
1198 timestamp: Instant::now(),
1199 size_bytes: 2000, compressed: false,
1201 category: "memory".to_string(),
1202 };
1203
1204 let compressed = manager.compress_event(event).await.unwrap();
1205 assert!(compressed.compressed);
1206 assert!(compressed.size_bytes < 2000);
1207 }
1208
1209 #[test]
1210 fn test_adaptive_rate_controller() {
1211 let config = AdaptiveBitrateConfig::default();
1212 let controller = AdaptiveRateController::new(config);
1213
1214 assert_eq!(controller.current_bitrate.load(Ordering::Relaxed), 100);
1215 assert_eq!(controller.target_bitrate.load(Ordering::Relaxed), 100);
1216 }
1217
1218 #[test]
1219 fn test_quality_level() {
1220 let level = QualityLevel::new("test", 0.8, 50, 500);
1221 assert_eq!(level.name, "test");
1222 assert_eq!(level.sampling_rate, 0.8);
1223 assert_eq!(level.min_events_per_second, 50);
1224 assert_eq!(level.max_events_per_second, 500);
1225 }
1226
1227 #[test]
1228 fn test_streaming_config_defaults() {
1229 let config = StreamingConfig::default();
1230 assert_eq!(config.base_port, 9090);
1231 assert_eq!(config.max_connections, 100);
1232 assert!(config.compression.enabled);
1233 assert!(config.adaptive_bitrate.enabled);
1234 }
1235
1236 #[test]
1237 fn test_high_performance_engine() {
1238 let engine = create_high_performance_streaming_engine();
1239 assert_eq!(engine.config.adaptive_bitrate.max_bitrate, 2000);
1240 assert_eq!(engine.config.buffer_size, 50000);
1241 assert!(engine.config.advanced_features.delta_compression);
1242 }
1243
1244 #[test]
1245 fn test_low_latency_engine() {
1246 let engine = create_low_latency_streaming_engine();
1247 assert_eq!(engine.config.adaptive_bitrate.initial_bitrate, 500);
1248 assert_eq!(engine.config.buffer_size, 1000);
1249 assert!(!engine.config.compression.enabled);
1250 assert_eq!(engine.config.quality.metrics_threshold.latency_ms, 50);
1251 }
1252}