1use crate::{
44 config::ConversionConfig,
45 core::VoiceConverter,
46 realtime::{RealtimeConfig, RealtimeConverter},
47 types::{ConversionRequest, ConversionTarget, ConversionType, VoiceCharacteristics},
48 Error, Result,
49};
50use serde::{Deserialize, Serialize};
51use std::collections::HashMap;
52use std::sync::Arc;
53use tokio::sync::RwLock;
54use tracing::{debug, info, warn};
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
58pub enum StreamingPlatform {
59 Twitch,
61 YouTubeLive,
63 Discord,
65 OBSStudio,
67 Streamlabs,
69 XSplit,
71 RTMP,
73 FacebookLive,
75 TikTokLive,
77}
78
79impl StreamingPlatform {
80 pub fn streaming_constraints(&self) -> StreamingConstraints {
82 match self {
83 StreamingPlatform::Twitch => StreamingConstraints {
84 max_latency_ms: 100.0, max_bitrate_kbps: 6000,
86 sample_rate: 48000,
87 channels: 2,
88 recommended_buffer_ms: 50.0,
89 bandwidth_adaptation: true,
90 quality_levels: vec![
91 StreamQuality::Source,
92 StreamQuality::High,
93 StreamQuality::Medium,
94 StreamQuality::Low,
95 ],
96 },
97 StreamingPlatform::YouTubeLive => StreamingConstraints {
98 max_latency_ms: 200.0, max_bitrate_kbps: 8000,
100 sample_rate: 48000,
101 channels: 2,
102 recommended_buffer_ms: 100.0,
103 bandwidth_adaptation: true,
104 quality_levels: vec![
105 StreamQuality::Source,
106 StreamQuality::High,
107 StreamQuality::Medium,
108 ],
109 },
110 StreamingPlatform::Discord => StreamingConstraints {
111 max_latency_ms: 40.0, max_bitrate_kbps: 320, sample_rate: 48000,
114 channels: 2,
115 recommended_buffer_ms: 20.0,
116 bandwidth_adaptation: false, quality_levels: vec![StreamQuality::High, StreamQuality::Medium],
118 },
119 StreamingPlatform::OBSStudio => StreamingConstraints {
120 max_latency_ms: 50.0,
121 max_bitrate_kbps: 10000, sample_rate: 48000,
123 channels: 2,
124 recommended_buffer_ms: 25.0,
125 bandwidth_adaptation: false, quality_levels: vec![
127 StreamQuality::Source,
128 StreamQuality::High,
129 StreamQuality::Medium,
130 StreamQuality::Low,
131 ],
132 },
133 StreamingPlatform::Streamlabs => StreamingConstraints {
134 max_latency_ms: 75.0,
135 max_bitrate_kbps: 8000,
136 sample_rate: 48000,
137 channels: 2,
138 recommended_buffer_ms: 40.0,
139 bandwidth_adaptation: true,
140 quality_levels: vec![
141 StreamQuality::Source,
142 StreamQuality::High,
143 StreamQuality::Medium,
144 ],
145 },
146 StreamingPlatform::XSplit => StreamingConstraints {
147 max_latency_ms: 60.0,
148 max_bitrate_kbps: 10000,
149 sample_rate: 48000,
150 channels: 2,
151 recommended_buffer_ms: 30.0,
152 bandwidth_adaptation: true,
153 quality_levels: vec![
154 StreamQuality::Source,
155 StreamQuality::High,
156 StreamQuality::Medium,
157 StreamQuality::Low,
158 ],
159 },
160 StreamingPlatform::RTMP => StreamingConstraints {
161 max_latency_ms: 150.0, max_bitrate_kbps: 6000,
163 sample_rate: 44100, channels: 2,
165 recommended_buffer_ms: 75.0,
166 bandwidth_adaptation: true,
167 quality_levels: vec![
168 StreamQuality::High,
169 StreamQuality::Medium,
170 StreamQuality::Low,
171 ],
172 },
173 StreamingPlatform::FacebookLive => StreamingConstraints {
174 max_latency_ms: 300.0, max_bitrate_kbps: 4000,
176 sample_rate: 44100,
177 channels: 2,
178 recommended_buffer_ms: 150.0,
179 bandwidth_adaptation: true,
180 quality_levels: vec![
181 StreamQuality::High,
182 StreamQuality::Medium,
183 StreamQuality::Low,
184 ],
185 },
186 StreamingPlatform::TikTokLive => StreamingConstraints {
187 max_latency_ms: 200.0,
188 max_bitrate_kbps: 3000, sample_rate: 44100,
190 channels: 2,
191 recommended_buffer_ms: 100.0,
192 bandwidth_adaptation: true,
193 quality_levels: vec![StreamQuality::Medium, StreamQuality::Low],
194 },
195 }
196 }
197
198 pub fn as_str(&self) -> &'static str {
200 match self {
201 StreamingPlatform::Twitch => "Twitch",
202 StreamingPlatform::YouTubeLive => "YouTube Live",
203 StreamingPlatform::Discord => "Discord",
204 StreamingPlatform::OBSStudio => "OBS Studio",
205 StreamingPlatform::Streamlabs => "Streamlabs",
206 StreamingPlatform::XSplit => "XSplit",
207 StreamingPlatform::RTMP => "RTMP",
208 StreamingPlatform::FacebookLive => "Facebook Live",
209 StreamingPlatform::TikTokLive => "TikTok Live",
210 }
211 }
212}
213
214#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
216pub enum StreamQuality {
217 Source,
219 High,
221 Medium,
223 Low,
225}
226
227impl StreamQuality {
228 pub fn quality_factor(&self) -> f32 {
230 match self {
231 StreamQuality::Source => 1.0,
232 StreamQuality::High => 0.9,
233 StreamQuality::Medium => 0.7,
234 StreamQuality::Low => 0.5,
235 }
236 }
237}
238
239#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
241pub struct StreamingConstraints {
242 pub max_latency_ms: f32,
244 pub max_bitrate_kbps: u32,
246 pub sample_rate: u32,
248 pub channels: u32,
250 pub recommended_buffer_ms: f32,
252 pub bandwidth_adaptation: bool,
254 pub quality_levels: Vec<StreamQuality>,
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct StreamConfig {
261 pub platform: StreamingPlatform,
263 pub buffer_size: usize,
265 pub sample_rate: u32,
267 pub channels: u32,
269 pub quality: StreamQuality,
271 pub enable_monitoring: bool,
273 pub enable_adaptation: bool,
275 pub voice_activity_detection: bool,
277 pub automatic_gain_control: bool,
279 pub noise_suppression: bool,
281 pub platform_optimizations: HashMap<String, f32>,
283}
284
285impl StreamConfig {
286 pub fn twitch_optimized() -> Self {
288 let mut platform_optimizations = HashMap::new();
289 platform_optimizations.insert("twitch_chat_integration".to_string(), 1.0);
290 platform_optimizations.insert("twitch_bitrate_optimization".to_string(), 0.9);
291 platform_optimizations.insert("twitch_emote_response".to_string(), 0.8);
292
293 Self {
294 platform: StreamingPlatform::Twitch,
295 buffer_size: 1024,
296 sample_rate: 48000,
297 channels: 2,
298 quality: StreamQuality::High,
299 enable_monitoring: true,
300 enable_adaptation: true,
301 voice_activity_detection: true,
302 automatic_gain_control: true,
303 noise_suppression: true,
304 platform_optimizations,
305 }
306 }
307
308 pub fn youtube_optimized() -> Self {
310 let mut platform_optimizations = HashMap::new();
311 platform_optimizations.insert("youtube_quality_priority".to_string(), 1.0);
312 platform_optimizations.insert("youtube_latency_tolerance".to_string(), 0.7);
313
314 Self {
315 platform: StreamingPlatform::YouTubeLive,
316 buffer_size: 2048, sample_rate: 48000,
318 channels: 2,
319 quality: StreamQuality::Source,
320 enable_monitoring: true,
321 enable_adaptation: true,
322 voice_activity_detection: true,
323 automatic_gain_control: true,
324 noise_suppression: true,
325 platform_optimizations,
326 }
327 }
328
329 pub fn discord_optimized() -> Self {
331 let mut platform_optimizations = HashMap::new();
332 platform_optimizations.insert("discord_voice_channel_optimization".to_string(), 1.0);
333 platform_optimizations.insert("discord_push_to_talk_support".to_string(), 0.9);
334
335 Self {
336 platform: StreamingPlatform::Discord,
337 buffer_size: 480, sample_rate: 48000,
339 channels: 2,
340 quality: StreamQuality::High,
341 enable_monitoring: true,
342 enable_adaptation: false, voice_activity_detection: true,
344 automatic_gain_control: false, noise_suppression: false, platform_optimizations,
347 }
348 }
349
350 pub fn obs_optimized() -> Self {
352 let mut platform_optimizations = HashMap::new();
353 platform_optimizations.insert("obs_plugin_integration".to_string(), 1.0);
354 platform_optimizations.insert("obs_audio_filter_chain".to_string(), 0.9);
355
356 Self {
357 platform: StreamingPlatform::OBSStudio,
358 buffer_size: 512,
359 sample_rate: 48000,
360 channels: 2,
361 quality: StreamQuality::Source,
362 enable_monitoring: true,
363 enable_adaptation: false, voice_activity_detection: true,
365 automatic_gain_control: true,
366 noise_suppression: true,
367 platform_optimizations,
368 }
369 }
370}
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
374pub enum StreamVoiceMode {
375 StreamerVoice,
377 GuestVoice,
379 CharacterVoice,
381 TextToSpeech,
383 AlertVoice,
385 NarratorVoice,
387}
388
389#[derive(Debug)]
391pub struct StreamProcessor {
392 platform: StreamingPlatform,
394 config: StreamConfig,
396 realtime_converter: RealtimeConverter,
398 voice_converter: Arc<VoiceConverter>,
400 constraints: StreamingConstraints,
402 voice_presets: Arc<RwLock<HashMap<String, VoiceCharacteristics>>>,
404 active_streams: Arc<RwLock<HashMap<String, StreamSession>>>,
406 performance_monitor: StreamPerformanceMonitor,
408 adaptation_state: BandwidthAdaptationState,
410}
411
412impl StreamProcessor {
413 pub fn new(platform: StreamingPlatform, config: StreamConfig) -> Result<Self> {
415 let constraints = platform.streaming_constraints();
416
417 let realtime_config = RealtimeConfig {
419 buffer_size: config.buffer_size,
420 sample_rate: config.sample_rate,
421 target_latency_ms: constraints.max_latency_ms,
422 overlap_factor: 0.25,
423 adaptive_buffering: config.enable_adaptation,
424 max_threads: 2,
425 enable_lookahead: false, lookahead_size: 0,
427 };
428
429 let realtime_converter = RealtimeConverter::new(realtime_config)?;
430 let voice_converter = Arc::new(VoiceConverter::new()?);
431
432 Ok(Self {
433 platform,
434 config,
435 realtime_converter,
436 voice_converter,
437 constraints,
438 voice_presets: Arc::new(RwLock::new(HashMap::new())),
439 active_streams: Arc::new(RwLock::new(HashMap::new())),
440 performance_monitor: StreamPerformanceMonitor::new(),
441 adaptation_state: BandwidthAdaptationState::new(),
442 })
443 }
444
445 pub async fn process_stream_audio(
447 &mut self,
448 input_audio: &[f32],
449 voice_preset: &str,
450 ) -> Result<Vec<f32>> {
451 let start_time = std::time::Instant::now();
452
453 if self.config.enable_adaptation {
455 self.update_bandwidth_adaptation().await?;
456 }
457
458 let voice_characteristics = {
460 let presets = self.voice_presets.read().await;
461 presets.get(voice_preset).cloned().unwrap_or_default()
462 };
463
464 let target = ConversionTarget::new(voice_characteristics);
466 self.realtime_converter.set_conversion_target(target);
467
468 let mut result = self.realtime_converter.process_chunk(input_audio).await?;
470
471 if self.config.noise_suppression {
473 result = self.apply_streaming_noise_suppression(&result)?;
474 }
475
476 if self.config.automatic_gain_control {
477 result = self.apply_streaming_agc(&result)?;
478 }
479
480 let processing_time = start_time.elapsed();
482 self.performance_monitor.record_processing(
483 processing_time,
484 input_audio.len(),
485 &self.constraints,
486 );
487
488 debug!(
489 "Stream audio processed: {} samples in {:.2}ms for {}",
490 input_audio.len(),
491 processing_time.as_secs_f32() * 1000.0,
492 self.platform.as_str()
493 );
494
495 Ok(result)
496 }
497
498 pub async fn process_stream_with_mode(
500 &mut self,
501 input_audio: &[f32],
502 voice_preset: &str,
503 mode: StreamVoiceMode,
504 ) -> Result<Vec<f32>> {
505 let processed_audio = match mode {
507 StreamVoiceMode::StreamerVoice => {
508 self.apply_streamer_processing(input_audio, voice_preset)
509 .await?
510 }
511 StreamVoiceMode::GuestVoice => {
512 self.apply_guest_processing(input_audio, voice_preset)
513 .await?
514 }
515 StreamVoiceMode::CharacterVoice => {
516 self.apply_character_processing(input_audio, voice_preset)
517 .await?
518 }
519 StreamVoiceMode::TextToSpeech => {
520 self.apply_tts_processing(input_audio, voice_preset).await?
521 }
522 StreamVoiceMode::AlertVoice => {
523 self.apply_alert_processing(input_audio, voice_preset)
524 .await?
525 }
526 StreamVoiceMode::NarratorVoice => {
527 self.apply_narrator_processing(input_audio, voice_preset)
528 .await?
529 }
530 };
531
532 Ok(processed_audio)
533 }
534
535 pub async fn register_voice_preset(
537 &self,
538 preset_name: String,
539 characteristics: VoiceCharacteristics,
540 ) {
541 let mut presets = self.voice_presets.write().await;
542 presets.insert(preset_name.clone(), characteristics);
543 info!("Registered voice preset: {}", preset_name);
544 }
545
546 pub async fn start_stream_session(
548 &self,
549 session_id: String,
550 stream_title: String,
551 ) -> Result<()> {
552 let session = StreamSession {
553 session_id: session_id.clone(),
554 stream_title,
555 start_time: std::time::Instant::now(),
556 platform: self.platform,
557 processed_frames: 0,
558 total_latency_ms: 0.0,
559 quality_drops: 0,
560 };
561
562 let mut streams = self.active_streams.write().await;
563 streams.insert(session_id.clone(), session);
564
565 info!(
566 "Started streaming session: {} on {}",
567 session_id,
568 self.platform.as_str()
569 );
570 Ok(())
571 }
572
573 pub async fn stop_stream_session(&self, session_id: &str) -> Result<StreamSession> {
575 let mut streams = self.active_streams.write().await;
576 streams
577 .remove(session_id)
578 .ok_or_else(|| Error::processing(format!("Stream session not found: {}", session_id)))
579 }
580
581 pub fn get_stream_metrics(&self) -> StreamPerformanceMetrics {
583 self.performance_monitor.get_current_metrics()
584 }
585
586 pub fn is_stream_performance_acceptable(&self) -> bool {
588 self.performance_monitor
589 .check_streaming_performance(&self.constraints)
590 }
591
592 pub fn get_platform_integration(&self) -> PlatformIntegration {
594 match self.platform {
595 StreamingPlatform::Twitch => PlatformIntegration::Twitch(TwitchIntegration {
596 api_version: "v1".to_string(),
597 chat_integration: true,
598 emote_support: true,
599 subscriber_features: true,
600 clip_integration: false,
601 }),
602 StreamingPlatform::YouTubeLive => PlatformIntegration::YouTube(YouTubeIntegration {
603 api_version: "v3".to_string(),
604 live_chat_integration: true,
605 super_chat_support: true,
606 recording_support: true,
607 analytics_integration: true,
608 }),
609 StreamingPlatform::Discord => PlatformIntegration::Discord(DiscordIntegration {
610 api_version: "v10".to_string(),
611 voice_channel_integration: true,
612 bot_integration: true,
613 stage_channel_support: true,
614 permission_system: true,
615 }),
616 StreamingPlatform::OBSStudio => PlatformIntegration::OBS(OBSIntegration {
617 plugin_version: "1.0.0".to_string(),
618 audio_filter_support: true,
619 scene_switching: true,
620 source_integration: true,
621 hotkey_support: true,
622 }),
623 StreamingPlatform::Streamlabs => {
624 PlatformIntegration::Streamlabs(StreamlabsIntegration {
625 api_version: "v1".to_string(),
626 donation_integration: true,
627 alert_system: true,
628 overlay_support: true,
629 chatbot_integration: true,
630 })
631 }
632 StreamingPlatform::XSplit => PlatformIntegration::XSplit(XSplitIntegration {
633 plugin_version: "1.0.0".to_string(),
634 scene_management: true,
635 audio_plugin_support: true,
636 broadcast_profiles: true,
637 }),
638 StreamingPlatform::RTMP => PlatformIntegration::RTMP(RTMPIntegration {
639 protocol_version: "1.0".to_string(),
640 streaming_server_support: true,
641 custom_endpoints: true,
642 authentication_support: true,
643 }),
644 StreamingPlatform::FacebookLive => PlatformIntegration::Facebook(FacebookIntegration {
645 api_version: "v15.0".to_string(),
646 live_video_integration: true,
647 comment_integration: true,
648 reaction_support: true,
649 }),
650 StreamingPlatform::TikTokLive => PlatformIntegration::TikTok(TikTokIntegration {
651 api_version: "v1".to_string(),
652 live_stream_integration: true,
653 gift_integration: true,
654 comment_moderation: true,
655 }),
656 }
657 }
658
659 async fn update_bandwidth_adaptation(&mut self) -> Result<()> {
662 let metrics = self.performance_monitor.get_current_metrics();
664
665 if metrics.average_latency_ms > self.constraints.max_latency_ms * 1.2 {
666 self.adaptation_state.decrease_quality();
667 } else if metrics.average_latency_ms < self.constraints.max_latency_ms * 0.5 {
668 self.adaptation_state.increase_quality();
669 }
670
671 Ok(())
672 }
673
674 async fn apply_streamer_processing(
675 &mut self,
676 input_audio: &[f32],
677 voice_preset: &str,
678 ) -> Result<Vec<f32>> {
679 self.process_stream_audio(input_audio, voice_preset).await
680 }
681
682 async fn apply_guest_processing(
683 &mut self,
684 input_audio: &[f32],
685 voice_preset: &str,
686 ) -> Result<Vec<f32>> {
687 let mut result = self.process_stream_audio(input_audio, voice_preset).await?;
689
690 for sample in result.iter_mut() {
692 *sample *= 0.9; }
694
695 Ok(result)
696 }
697
698 async fn apply_character_processing(
699 &mut self,
700 input_audio: &[f32],
701 voice_preset: &str,
702 ) -> Result<Vec<f32>> {
703 self.process_stream_audio(input_audio, voice_preset).await
704 }
705
706 async fn apply_tts_processing(
707 &mut self,
708 input_audio: &[f32],
709 voice_preset: &str,
710 ) -> Result<Vec<f32>> {
711 let mut result = self.process_stream_audio(input_audio, voice_preset).await?;
713
714 for sample in result.iter_mut() {
716 *sample = (*sample * 1.1).clamp(-1.0, 1.0);
717 }
718
719 Ok(result)
720 }
721
722 async fn apply_alert_processing(
723 &mut self,
724 input_audio: &[f32],
725 _voice_preset: &str,
726 ) -> Result<Vec<f32>> {
727 let mut result = input_audio.to_vec();
729
730 for sample in result.iter_mut() {
732 *sample = (*sample * 1.3).clamp(-1.0, 1.0);
733 }
734
735 Ok(result)
736 }
737
738 async fn apply_narrator_processing(
739 &mut self,
740 input_audio: &[f32],
741 voice_preset: &str,
742 ) -> Result<Vec<f32>> {
743 let mut result = self.process_stream_audio(input_audio, voice_preset).await?;
745
746 for sample in result.iter_mut() {
748 *sample = (*sample * 1.05).clamp(-1.0, 1.0);
749 }
750
751 Ok(result)
752 }
753
754 fn apply_streaming_noise_suppression(&self, audio: &[f32]) -> Result<Vec<f32>> {
755 let threshold = 0.02; let mut processed = audio.to_vec();
758
759 for sample in processed.iter_mut() {
760 if sample.abs() < threshold {
761 *sample *= 0.05; }
763 }
764
765 Ok(processed)
766 }
767
768 fn apply_streaming_agc(&self, audio: &[f32]) -> Result<Vec<f32>> {
769 let target_level = 0.8; let current_level = audio.iter().map(|&x| x.abs()).sum::<f32>() / audio.len() as f32;
772
773 if current_level > 0.0 {
774 let gain = target_level / current_level;
775 let clamped_gain = gain.clamp(0.3, 3.0); Ok(audio
778 .iter()
779 .map(|&x| (x * clamped_gain).clamp(-1.0, 1.0))
780 .collect())
781 } else {
782 Ok(audio.to_vec())
783 }
784 }
785}
786
787#[derive(Debug, Clone)]
789pub struct StreamSession {
790 pub session_id: String,
792 pub stream_title: String,
794 pub start_time: std::time::Instant,
796 pub platform: StreamingPlatform,
798 pub processed_frames: u64,
800 pub total_latency_ms: f32,
802 pub quality_drops: u32,
804}
805
806#[derive(Debug, Clone)]
808pub struct BandwidthAdaptationState {
809 pub current_quality: StreamQuality,
811 pub target_bitrate: u32,
813 pub adaptation_history: Vec<AdaptationEvent>,
815}
816
817impl BandwidthAdaptationState {
818 fn new() -> Self {
819 Self {
820 current_quality: StreamQuality::High,
821 target_bitrate: 128,
822 adaptation_history: Vec::new(),
823 }
824 }
825
826 fn decrease_quality(&mut self) {
827 self.current_quality = match self.current_quality {
828 StreamQuality::Source => StreamQuality::High,
829 StreamQuality::High => StreamQuality::Medium,
830 StreamQuality::Medium => StreamQuality::Low,
831 StreamQuality::Low => StreamQuality::Low,
832 };
833
834 self.adaptation_history.push(AdaptationEvent {
835 timestamp: std::time::Instant::now(),
836 direction: AdaptationDirection::Decrease,
837 new_quality: self.current_quality,
838 });
839 }
840
841 fn increase_quality(&mut self) {
842 self.current_quality = match self.current_quality {
843 StreamQuality::Low => StreamQuality::Medium,
844 StreamQuality::Medium => StreamQuality::High,
845 StreamQuality::High => StreamQuality::Source,
846 StreamQuality::Source => StreamQuality::Source,
847 };
848
849 self.adaptation_history.push(AdaptationEvent {
850 timestamp: std::time::Instant::now(),
851 direction: AdaptationDirection::Increase,
852 new_quality: self.current_quality,
853 });
854 }
855}
856
857#[derive(Debug, Clone)]
859pub struct AdaptationEvent {
860 pub timestamp: std::time::Instant,
862 pub direction: AdaptationDirection,
864 pub new_quality: StreamQuality,
866}
867
/// Direction of a bandwidth adaptation step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdaptationDirection {
    /// Quality was stepped towards `Source`.
    Increase,
    /// Quality was stepped towards `Low`.
    Decrease,
}
876
/// Rolling record of processing performance for a `StreamProcessor`.
#[derive(Debug)]
pub struct StreamPerformanceMonitor {
    /// Most recent per-chunk processing durations, oldest first.
    processing_times: Vec<std::time::Duration>,
    /// Bitrate samples; the latest one is reported as the current bitrate.
    bandwidth_samples: Vec<u32>,
    /// Number of chunks whose processing time exceeded the latency ceiling.
    quality_drops: u32,
    /// Instant the monitor was created; uptime is measured from it.
    last_check: std::time::Instant,
}
885
886impl StreamPerformanceMonitor {
887 fn new() -> Self {
888 Self {
889 processing_times: Vec::new(),
890 bandwidth_samples: Vec::new(),
891 quality_drops: 0,
892 last_check: std::time::Instant::now(),
893 }
894 }
895
896 fn record_processing(
897 &mut self,
898 processing_time: std::time::Duration,
899 _sample_count: usize,
900 constraints: &StreamingConstraints,
901 ) {
902 self.processing_times.push(processing_time);
903
904 if self.processing_times.len() > 100 {
906 self.processing_times.drain(0..50);
907 }
908
909 let latency_ms = processing_time.as_secs_f32() * 1000.0;
911 if latency_ms > constraints.max_latency_ms {
912 self.quality_drops += 1;
913 }
914 }
915
916 fn check_streaming_performance(&self, constraints: &StreamingConstraints) -> bool {
917 if self.processing_times.is_empty() {
918 return true;
919 }
920
921 let avg_latency_ms = self
922 .processing_times
923 .iter()
924 .map(|d| d.as_secs_f32() * 1000.0)
925 .sum::<f32>()
926 / self.processing_times.len() as f32;
927
928 avg_latency_ms <= constraints.max_latency_ms
929 }
930
931 fn get_current_metrics(&self) -> StreamPerformanceMetrics {
932 let avg_latency_ms = if self.processing_times.is_empty() {
933 0.0
934 } else {
935 self.processing_times
936 .iter()
937 .map(|d| d.as_secs_f32() * 1000.0)
938 .sum::<f32>()
939 / self.processing_times.len() as f32
940 };
941
942 StreamPerformanceMetrics {
943 average_latency_ms: avg_latency_ms,
944 current_bitrate_kbps: self.bandwidth_samples.last().copied().unwrap_or(0),
945 quality_drops: self.quality_drops,
946 uptime_seconds: self.last_check.elapsed().as_secs(),
947 buffer_health_percent: 95.0, }
949 }
950}
951
952#[derive(Debug, Clone, Serialize, Deserialize)]
954pub struct StreamPerformanceMetrics {
955 pub average_latency_ms: f32,
957 pub current_bitrate_kbps: u32,
959 pub quality_drops: u32,
961 pub uptime_seconds: u64,
963 pub buffer_health_percent: f32,
965}
966
967#[derive(Debug, Clone, Serialize, Deserialize)]
969pub enum PlatformIntegration {
970 Twitch(TwitchIntegration),
972 YouTube(YouTubeIntegration),
974 Discord(DiscordIntegration),
976 OBS(OBSIntegration),
978 Streamlabs(StreamlabsIntegration),
980 XSplit(XSplitIntegration),
982 RTMP(RTMPIntegration),
984 Facebook(FacebookIntegration),
986 TikTok(TikTokIntegration),
988}
989
990#[derive(Debug, Clone, Serialize, Deserialize)]
992pub struct TwitchIntegration {
993 pub api_version: String,
995 pub chat_integration: bool,
997 pub emote_support: bool,
999 pub subscriber_features: bool,
1001 pub clip_integration: bool,
1003}
1004
1005#[derive(Debug, Clone, Serialize, Deserialize)]
1007pub struct YouTubeIntegration {
1008 pub api_version: String,
1010 pub live_chat_integration: bool,
1012 pub super_chat_support: bool,
1014 pub recording_support: bool,
1016 pub analytics_integration: bool,
1018}
1019
1020#[derive(Debug, Clone, Serialize, Deserialize)]
1022pub struct DiscordIntegration {
1023 pub api_version: String,
1025 pub voice_channel_integration: bool,
1027 pub bot_integration: bool,
1029 pub stage_channel_support: bool,
1031 pub permission_system: bool,
1033}
1034
1035#[derive(Debug, Clone, Serialize, Deserialize)]
1037pub struct OBSIntegration {
1038 pub plugin_version: String,
1040 pub audio_filter_support: bool,
1042 pub scene_switching: bool,
1044 pub source_integration: bool,
1046 pub hotkey_support: bool,
1048}
1049
1050#[derive(Debug, Clone, Serialize, Deserialize)]
1052pub struct StreamlabsIntegration {
1053 pub api_version: String,
1055 pub donation_integration: bool,
1057 pub alert_system: bool,
1059 pub overlay_support: bool,
1061 pub chatbot_integration: bool,
1063}
1064
1065#[derive(Debug, Clone, Serialize, Deserialize)]
1067pub struct XSplitIntegration {
1068 pub plugin_version: String,
1070 pub scene_management: bool,
1072 pub audio_plugin_support: bool,
1074 pub broadcast_profiles: bool,
1076}
1077
1078#[derive(Debug, Clone, Serialize, Deserialize)]
1080pub struct RTMPIntegration {
1081 pub protocol_version: String,
1083 pub streaming_server_support: bool,
1085 pub custom_endpoints: bool,
1087 pub authentication_support: bool,
1089}
1090
1091#[derive(Debug, Clone, Serialize, Deserialize)]
1093pub struct FacebookIntegration {
1094 pub api_version: String,
1096 pub live_video_integration: bool,
1098 pub comment_integration: bool,
1100 pub reaction_support: bool,
1102}
1103
1104#[derive(Debug, Clone, Serialize, Deserialize)]
1106pub struct TikTokIntegration {
1107 pub api_version: String,
1109 pub live_stream_integration: bool,
1111 pub gift_integration: bool,
1113 pub comment_moderation: bool,
1115}
1116
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a processor, panicking if construction fails.
    fn processor_for(platform: StreamingPlatform, config: StreamConfig) -> StreamProcessor {
        StreamProcessor::new(platform, config).unwrap()
    }

    /// Shorthand for the Twitch processor most tests use.
    fn twitch_processor() -> StreamProcessor {
        processor_for(StreamingPlatform::Twitch, StreamConfig::twitch_optimized())
    }

    /// Mean absolute sample level of a chunk.
    fn mean_abs(samples: &[f32]) -> f32 {
        samples.iter().map(|&x| x.abs()).sum::<f32>() / samples.len() as f32
    }

    #[test]
    fn test_streaming_platform_constraints() {
        let twitch = StreamingPlatform::Twitch.streaming_constraints();
        assert!(twitch.max_latency_ms <= 100.0);
        assert_eq!(twitch.sample_rate, 48000);

        let discord = StreamingPlatform::Discord.streaming_constraints();
        assert!(discord.max_latency_ms <= 40.0);
        assert!(!discord.bandwidth_adaptation);
    }

    #[test]
    fn test_stream_quality_levels() {
        let expected = [
            (StreamQuality::Source, 1.0),
            (StreamQuality::High, 0.9),
            (StreamQuality::Medium, 0.7),
            (StreamQuality::Low, 0.5),
        ];

        for &(quality, factor) in expected.iter() {
            assert_eq!(quality.quality_factor(), factor);
        }
    }

    #[test]
    fn test_stream_config_creation() {
        let twitch = StreamConfig::twitch_optimized();
        assert_eq!(twitch.platform, StreamingPlatform::Twitch);
        assert!(twitch.enable_adaptation);

        let discord = StreamConfig::discord_optimized();
        assert_eq!(discord.platform, StreamingPlatform::Discord);
        assert!(!discord.enable_adaptation);
        assert_eq!(discord.buffer_size, 480);
    }

    #[tokio::test]
    async fn test_stream_processor_creation() {
        let created =
            StreamProcessor::new(StreamingPlatform::Twitch, StreamConfig::twitch_optimized());
        assert!(created.is_ok());
        assert_eq!(created.unwrap().platform, StreamingPlatform::Twitch);
    }

    #[tokio::test]
    async fn test_voice_preset_registration() {
        let processor = twitch_processor();

        processor
            .register_voice_preset("streamer_main".to_string(), VoiceCharacteristics::default())
            .await;

        let registered = processor.voice_presets.read().await;
        assert!(registered.contains_key("streamer_main"));
    }

    #[tokio::test]
    async fn test_stream_session_management() {
        let processor = twitch_processor();

        let started = processor
            .start_stream_session("stream1".to_string(), "Test Stream".to_string())
            .await;
        assert!(started.is_ok());

        let active = processor.active_streams.read().await;
        assert!(active.contains_key("stream1"));
    }

    #[test]
    fn test_bandwidth_adaptation() {
        let mut state = BandwidthAdaptationState::new();
        let starting_quality = state.current_quality;

        state.decrease_quality();
        assert_ne!(state.current_quality, starting_quality);

        state.increase_quality();
        assert_eq!(state.current_quality, starting_quality);
    }

    #[test]
    fn test_stream_voice_modes() {
        let modes = [
            StreamVoiceMode::StreamerVoice,
            StreamVoiceMode::GuestVoice,
            StreamVoiceMode::CharacterVoice,
            StreamVoiceMode::TextToSpeech,
            StreamVoiceMode::AlertVoice,
            StreamVoiceMode::NarratorVoice,
        ];

        // Every mode must survive a JSON round trip unchanged.
        modes.iter().for_each(|mode| {
            let json = serde_json::to_string(mode).unwrap();
            let round_tripped: StreamVoiceMode = serde_json::from_str(&json).unwrap();
            assert_eq!(*mode, round_tripped);
        });
    }

    #[test]
    fn test_performance_monitor() {
        let limits = StreamingPlatform::Twitch.streaming_constraints();
        let mut monitor = StreamPerformanceMonitor::new();

        monitor.record_processing(std::time::Duration::from_millis(50), 1024, &limits);

        assert!(monitor.get_current_metrics().average_latency_ms > 0.0);
        assert!(monitor.check_streaming_performance(&limits));
    }

    #[test]
    fn test_platform_integration_info() {
        let processor = twitch_processor();

        if let PlatformIntegration::Twitch(twitch) = processor.get_platform_integration() {
            assert!(twitch.chat_integration);
            assert!(twitch.emote_support);
        } else {
            panic!("Expected Twitch integration");
        }
    }

    #[tokio::test]
    async fn test_streaming_audio_processing() {
        let mut processor =
            processor_for(StreamingPlatform::Discord, StreamConfig::discord_optimized());

        processor
            .register_voice_preset("test_voice".to_string(), VoiceCharacteristics::default())
            .await;

        let samples = vec![0.1, -0.2, 0.3, -0.4, 0.5];
        let outcome = processor.process_stream_audio(&samples, "test_voice").await;

        assert!(outcome.is_ok());
        assert!(!outcome.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_noise_suppression_streaming() {
        let processor = twitch_processor();

        // Indices 0 and 2 are below the suppression threshold.
        let noisy_audio = vec![0.01, 0.5, 0.015, -0.7];
        let processed = processor
            .apply_streaming_noise_suppression(&noisy_audio)
            .unwrap();

        for &quiet_index in &[0usize, 2] {
            assert!(processed[quiet_index].abs() < noisy_audio[quiet_index].abs() * 0.1);
        }
    }

    #[tokio::test]
    async fn test_streaming_agc() {
        let processor =
            processor_for(StreamingPlatform::YouTubeLive, StreamConfig::youtube_optimized());

        let quiet_audio = vec![0.1, -0.1, 0.05, -0.05];
        let boosted = processor.apply_streaming_agc(&quiet_audio).unwrap();

        assert!(mean_abs(&boosted) > mean_abs(&quiet_audio) * 2.0);
    }
}