Skip to main content

voirs_feedback/realtime/
system.rs

1//! Core real-time feedback system
2
3use super::stream::FeedbackStream;
4use super::types::{RealtimeConfig, RealtimeStats};
5use crate::traits::{FeedbackResponse, SessionState};
6use crate::FeedbackError;
7use std::collections::HashMap;
8use std::sync::{Arc, RwLock};
9use std::time::Duration;
10use tokio::time::timeout;
11use uuid::Uuid;
12use voirs_sdk::AudioBuffer;
13
14/// Main real-time feedback system
15#[derive(Debug, Clone)]
16pub struct RealtimeFeedbackSystem {
17    /// System configuration
18    config: RealtimeConfig,
19    /// Active streams
20    streams: Arc<RwLock<HashMap<Uuid, FeedbackStream>>>,
21    /// System statistics
22    stats: Arc<RwLock<RealtimeStats>>,
23}
24
25impl RealtimeFeedbackSystem {
26    /// Create a new real-time feedback system
27    pub async fn new() -> Result<Self, FeedbackError> {
28        Self::with_config(RealtimeConfig::default()).await
29    }
30
31    /// Create a new real-time feedback system with custom configuration
32    pub async fn with_config(config: RealtimeConfig) -> Result<Self, FeedbackError> {
33        Ok(Self {
34            config,
35            streams: Arc::new(RwLock::new(HashMap::new())),
36            stats: Arc::new(RwLock::new(RealtimeStats::default())),
37        })
38    }
39
40    /// Create a new feedback stream
41    pub async fn create_stream(
42        &self,
43        user_id: &str,
44        session_state: &SessionState,
45    ) -> Result<FeedbackStream, FeedbackError> {
46        let stream = FeedbackStream::new(
47            user_id.to_string(),
48            self.config.clone(),
49            session_state.clone(),
50        );
51
52        // Use non-blocking operations with timeout to preserve UI responsiveness
53        let ui_timeout = Duration::from_millis(50);
54
55        // Try to acquire both locks non-blocking for UI responsiveness
56        let streams_result = self.streams.try_write();
57        let stats_result = self.stats.try_write();
58
59        match (streams_result, stats_result) {
60            (Ok(mut streams), Ok(mut stats)) => {
61                // Both locks acquired successfully - update atomically
62                streams.insert(stream.stream_id, stream.clone());
63                stats.active_streams += 1;
64            }
65            _ => {
66                // One or both locks failed - return timeout to preserve UI responsiveness
67                return Err(FeedbackError::Timeout);
68            }
69        }
70
71        Ok(stream)
72    }
73
74    /// Get system statistics
75    pub async fn get_statistics(&self) -> Result<RealtimeStats, FeedbackError> {
76        // Use try_read for non-blocking access to preserve UI responsiveness
77        match self.stats.try_read() {
78            Ok(stats) => Ok(stats.clone()),
79            Err(_) => Err(FeedbackError::Timeout),
80        }
81    }
82
83    /// Remove a stream
84    pub async fn remove_stream(&self, stream_id: Uuid) -> Result<(), FeedbackError> {
85        // Try to acquire both locks non-blocking for UI responsiveness
86        let streams_result = self.streams.try_write();
87        let stats_result = self.stats.try_write();
88
89        match (streams_result, stats_result) {
90            (Ok(mut streams), Ok(mut stats)) => {
91                // Both locks acquired successfully - update atomically
92                if streams.remove(&stream_id).is_some() {
93                    // Only decrement stats if the stream actually existed
94                    if stats.active_streams > 0 {
95                        stats.active_streams -= 1;
96                    }
97                }
98            }
99            _ => {
100                // One or both locks failed - return timeout to preserve UI responsiveness
101                return Err(FeedbackError::Timeout);
102            }
103        }
104
105        Ok(())
106    }
107}