enigma_rtc/
session.rs

1use std::sync::{Arc, Mutex};
2
3use async_trait::async_trait;
4use tokio::sync::{mpsc, Mutex as AsyncMutex};
5
6use crate::config::RtcConfig;
7use crate::error::{EnigmaRtcError, RtcResult};
8use crate::signaling::decode_signaling;
9use crate::types::{CallState, RtcEvent, SignalingMessage};
10use crate::webrtc::{RtcEngine, WebRtcEngine};
11
12pub struct CallSession {
13    state: Mutex<CallState>,
14    engine: Arc<dyn RtcEngine>,
15    event_tx: mpsc::UnboundedSender<RtcEvent>,
16    event_rx: AsyncMutex<mpsc::UnboundedReceiver<RtcEvent>>,
17    config: RtcConfig,
18}
19
20impl CallSession {
21    pub fn new(config: RtcConfig) -> RtcResult<Self> {
22        let (event_tx, event_rx) = mpsc::unbounded_channel();
23        let engine = Arc::new(WebRtcEngine::new(config.clone())?);
24        engine.set_event_sender(event_tx.clone())?;
25        Ok(Self::from_parts(config, engine, event_tx, event_rx))
26    }
27
28    pub fn with_engine<E>(config: RtcConfig, engine: Arc<E>) -> RtcResult<Self>
29    where
30        E: RtcEngine + 'static,
31    {
32        let (event_tx, event_rx) = mpsc::unbounded_channel();
33        engine.set_event_sender(event_tx.clone())?;
34        let engine: Arc<dyn RtcEngine> = engine;
35        Ok(Self::from_parts(config, engine, event_tx, event_rx))
36    }
37
38    pub fn state(&self) -> CallState {
39        match self.state.lock() {
40            Ok(guard) => *guard,
41            Err(poisoned) => *poisoned.into_inner(),
42        }
43    }
44
45    pub fn config(&self) -> &RtcConfig {
46        &self.config
47    }
48
49    pub fn create_offer(&self) -> RtcResult<SignalingMessage> {
50        {
51            let mut state = self
52                .state
53                .lock()
54                .map_err(|_| EnigmaRtcError::InvalidState)?;
55            if *state != CallState::Idle {
56                return Err(EnigmaRtcError::InvalidState);
57            }
58            *state = CallState::CreatingOffer;
59        }
60        self.emit_state(CallState::CreatingOffer)?;
61        let sdp = self.engine.create_offer()?;
62        if sdp.trim().is_empty() {
63            return Err(EnigmaRtcError::InvalidSdp);
64        }
65        {
66            let mut state = self
67                .state
68                .lock()
69                .map_err(|_| EnigmaRtcError::InvalidState)?;
70            *state = CallState::WaitingAnswer;
71        }
72        self.emit_state(CallState::WaitingAnswer)?;
73        Ok(SignalingMessage::Offer { sdp })
74    }
75
76    pub fn accept_offer(&self, offer_sdp: &str) -> RtcResult<SignalingMessage> {
77        if offer_sdp.trim().is_empty() {
78            return Err(EnigmaRtcError::InvalidSdp);
79        }
80        {
81            let mut state = self
82                .state
83                .lock()
84                .map_err(|_| EnigmaRtcError::InvalidState)?;
85            if *state != CallState::Idle {
86                return Err(EnigmaRtcError::InvalidState);
87            }
88            *state = CallState::IncomingOffer;
89        }
90        self.emit_state(CallState::IncomingOffer)?;
91        let sdp = self.engine.create_answer(offer_sdp)?;
92        if sdp.trim().is_empty() {
93            return Err(EnigmaRtcError::InvalidSdp);
94        }
95        {
96            let mut state = self
97                .state
98                .lock()
99                .map_err(|_| EnigmaRtcError::InvalidState)?;
100            *state = CallState::Connected;
101        }
102        self.emit_state(CallState::Connected)?;
103        self.emit_event(RtcEvent::ConnectionEstablished)?;
104        Ok(SignalingMessage::Answer { sdp })
105    }
106
107    pub fn accept_answer(&self, answer_sdp: &str) -> RtcResult<()> {
108        if answer_sdp.trim().is_empty() {
109            return Err(EnigmaRtcError::InvalidSdp);
110        }
111        {
112            let state = self
113                .state
114                .lock()
115                .map_err(|_| EnigmaRtcError::InvalidState)?;
116            if *state != CallState::WaitingAnswer {
117                return Err(EnigmaRtcError::InvalidState);
118            }
119        }
120        self.engine.apply_answer(answer_sdp)?;
121        {
122            let mut state = self
123                .state
124                .lock()
125                .map_err(|_| EnigmaRtcError::InvalidState)?;
126            *state = CallState::Connected;
127        }
128        self.emit_state(CallState::Connected)?;
129        self.emit_event(RtcEvent::ConnectionEstablished)
130    }
131
132    pub fn add_ice_candidate(&self, candidate_json: &str) -> RtcResult<()> {
133        {
134            let state = self
135                .state
136                .lock()
137                .map_err(|_| EnigmaRtcError::InvalidState)?;
138            match *state {
139                CallState::WaitingAnswer | CallState::IncomingOffer | CallState::Connected => {}
140                _ => return Err(EnigmaRtcError::InvalidState),
141            }
142        }
143        let message = decode_signaling(candidate_json)?;
144        match message {
145            SignalingMessage::IceCandidate { .. } => {
146                self.engine.add_remote_candidate(candidate_json)
147            }
148            _ => Err(EnigmaRtcError::InvalidCandidate),
149        }
150    }
151
152    pub fn hangup(&self, reason: Option<&str>) -> RtcResult<SignalingMessage> {
153        {
154            let mut state = self
155                .state
156                .lock()
157                .map_err(|_| EnigmaRtcError::InvalidState)?;
158            match *state {
159                CallState::Idle
160                | CallState::CreatingOffer
161                | CallState::WaitingAnswer
162                | CallState::IncomingOffer
163                | CallState::Connected => {
164                    *state = CallState::Ending;
165                }
166                CallState::Ending | CallState::Ended => return Err(EnigmaRtcError::InvalidState),
167            }
168        }
169        self.emit_state(CallState::Ending)?;
170        {
171            let mut state = self
172                .state
173                .lock()
174                .map_err(|_| EnigmaRtcError::InvalidState)?;
175            *state = CallState::Ended;
176        }
177        self.emit_state(CallState::Ended)?;
178        self.emit_event(RtcEvent::ConnectionClosed)?;
179        Ok(SignalingMessage::Hangup {
180            reason: reason
181                .filter(|s| !s.trim().is_empty())
182                .map(|s| s.to_string()),
183        })
184    }
185
186    pub fn set_microphone_enabled(&self, enabled: bool) -> RtcResult<()> {
187        {
188            let state = self
189                .state
190                .lock()
191                .map_err(|_| EnigmaRtcError::InvalidState)?;
192            if matches!(*state, CallState::Ended) {
193                return Err(EnigmaRtcError::InvalidState);
194            }
195        }
196        self.engine.set_microphone_enabled(enabled)
197    }
198
199    pub fn set_camera_enabled(&self, enabled: bool) -> RtcResult<()> {
200        {
201            let state = self
202                .state
203                .lock()
204                .map_err(|_| EnigmaRtcError::InvalidState)?;
205            if matches!(*state, CallState::Ended) {
206                return Err(EnigmaRtcError::InvalidState);
207            }
208        }
209        self.engine.set_camera_enabled(enabled)
210    }
211
212    fn emit_state(&self, state: CallState) -> RtcResult<()> {
213        self.emit_event(RtcEvent::StateChanged(state))
214    }
215
216    fn emit_event(&self, event: RtcEvent) -> RtcResult<()> {
217        self.event_tx
218            .send(event)
219            .map_err(|_| EnigmaRtcError::ChannelClosed)
220    }
221
222    fn from_parts(
223        config: RtcConfig,
224        engine: Arc<dyn RtcEngine>,
225        event_tx: mpsc::UnboundedSender<RtcEvent>,
226        event_rx: mpsc::UnboundedReceiver<RtcEvent>,
227    ) -> Self {
228        Self {
229            state: Mutex::new(CallState::Idle),
230            engine,
231            event_tx,
232            event_rx: AsyncMutex::new(event_rx),
233            config,
234        }
235    }
236}
237
238#[async_trait]
239pub(crate) trait SessionEventReceiver: Send + Sync {
240    async fn recv_event(&self) -> RtcResult<RtcEvent>;
241}
242
243#[async_trait]
244impl SessionEventReceiver for CallSession {
245    async fn recv_event(&self) -> RtcResult<RtcEvent> {
246        let mut rx = self.event_rx.lock().await;
247        match rx.recv().await {
248            Some(event) => Ok(event),
249            None => Err(EnigmaRtcError::ChannelClosed),
250        }
251    }
252}
253
254impl CallSession {
255    pub async fn next_event(&self) -> RtcResult<RtcEvent> {
256        self.recv_event().await
257    }
258}