voirs_feedback/realtime/
system.rs1use super::stream::FeedbackStream;
4use super::types::{RealtimeConfig, RealtimeStats};
5use crate::traits::{FeedbackResponse, SessionState};
6use crate::FeedbackError;
7use std::collections::HashMap;
8use std::sync::{Arc, RwLock};
9use std::time::Duration;
10use tokio::time::timeout;
11use uuid::Uuid;
12use voirs_sdk::AudioBuffer;
13
14#[derive(Debug, Clone)]
16pub struct RealtimeFeedbackSystem {
17 config: RealtimeConfig,
19 streams: Arc<RwLock<HashMap<Uuid, FeedbackStream>>>,
21 stats: Arc<RwLock<RealtimeStats>>,
23}
24
25impl RealtimeFeedbackSystem {
26 pub async fn new() -> Result<Self, FeedbackError> {
28 Self::with_config(RealtimeConfig::default()).await
29 }
30
31 pub async fn with_config(config: RealtimeConfig) -> Result<Self, FeedbackError> {
33 Ok(Self {
34 config,
35 streams: Arc::new(RwLock::new(HashMap::new())),
36 stats: Arc::new(RwLock::new(RealtimeStats::default())),
37 })
38 }
39
40 pub async fn create_stream(
42 &self,
43 user_id: &str,
44 session_state: &SessionState,
45 ) -> Result<FeedbackStream, FeedbackError> {
46 let stream = FeedbackStream::new(
47 user_id.to_string(),
48 self.config.clone(),
49 session_state.clone(),
50 );
51
52 let ui_timeout = Duration::from_millis(50);
54
55 let streams_result = self.streams.try_write();
57 let stats_result = self.stats.try_write();
58
59 match (streams_result, stats_result) {
60 (Ok(mut streams), Ok(mut stats)) => {
61 streams.insert(stream.stream_id, stream.clone());
63 stats.active_streams += 1;
64 }
65 _ => {
66 return Err(FeedbackError::Timeout);
68 }
69 }
70
71 Ok(stream)
72 }
73
74 pub async fn get_statistics(&self) -> Result<RealtimeStats, FeedbackError> {
76 match self.stats.try_read() {
78 Ok(stats) => Ok(stats.clone()),
79 Err(_) => Err(FeedbackError::Timeout),
80 }
81 }
82
83 pub async fn remove_stream(&self, stream_id: Uuid) -> Result<(), FeedbackError> {
85 let streams_result = self.streams.try_write();
87 let stats_result = self.stats.try_write();
88
89 match (streams_result, stats_result) {
90 (Ok(mut streams), Ok(mut stats)) => {
91 if streams.remove(&stream_id).is_some() {
93 if stats.active_streams > 0 {
95 stats.active_streams -= 1;
96 }
97 }
98 }
99 _ => {
100 return Err(FeedbackError::Timeout);
102 }
103 }
104
105 Ok(())
106 }
107}