1use std::sync::{Arc, Mutex};
2
3use async_trait::async_trait;
4use tokio::sync::{mpsc, Mutex as AsyncMutex};
5
6use crate::config::RtcConfig;
7use crate::error::{EnigmaRtcError, RtcResult};
8use crate::signaling::decode_signaling;
9use crate::types::{CallState, RtcEvent, SignalingMessage};
10use crate::webrtc::{RtcEngine, WebRtcEngine};
11
12pub struct CallSession {
13 state: Mutex<CallState>,
14 engine: Arc<dyn RtcEngine>,
15 event_tx: mpsc::UnboundedSender<RtcEvent>,
16 event_rx: AsyncMutex<mpsc::UnboundedReceiver<RtcEvent>>,
17 config: RtcConfig,
18}
19
20impl CallSession {
21 pub fn new(config: RtcConfig) -> RtcResult<Self> {
22 let (event_tx, event_rx) = mpsc::unbounded_channel();
23 let engine = Arc::new(WebRtcEngine::new(config.clone())?);
24 engine.set_event_sender(event_tx.clone())?;
25 Ok(Self::from_parts(config, engine, event_tx, event_rx))
26 }
27
28 pub fn with_engine<E>(config: RtcConfig, engine: Arc<E>) -> RtcResult<Self>
29 where
30 E: RtcEngine + 'static,
31 {
32 let (event_tx, event_rx) = mpsc::unbounded_channel();
33 engine.set_event_sender(event_tx.clone())?;
34 let engine: Arc<dyn RtcEngine> = engine;
35 Ok(Self::from_parts(config, engine, event_tx, event_rx))
36 }
37
38 pub fn state(&self) -> CallState {
39 match self.state.lock() {
40 Ok(guard) => *guard,
41 Err(poisoned) => *poisoned.into_inner(),
42 }
43 }
44
45 pub fn config(&self) -> &RtcConfig {
46 &self.config
47 }
48
49 pub fn create_offer(&self) -> RtcResult<SignalingMessage> {
50 {
51 let mut state = self
52 .state
53 .lock()
54 .map_err(|_| EnigmaRtcError::InvalidState)?;
55 if *state != CallState::Idle {
56 return Err(EnigmaRtcError::InvalidState);
57 }
58 *state = CallState::CreatingOffer;
59 }
60 self.emit_state(CallState::CreatingOffer)?;
61 let sdp = self.engine.create_offer()?;
62 if sdp.trim().is_empty() {
63 return Err(EnigmaRtcError::InvalidSdp);
64 }
65 {
66 let mut state = self
67 .state
68 .lock()
69 .map_err(|_| EnigmaRtcError::InvalidState)?;
70 *state = CallState::WaitingAnswer;
71 }
72 self.emit_state(CallState::WaitingAnswer)?;
73 Ok(SignalingMessage::Offer { sdp })
74 }
75
76 pub fn accept_offer(&self, offer_sdp: &str) -> RtcResult<SignalingMessage> {
77 if offer_sdp.trim().is_empty() {
78 return Err(EnigmaRtcError::InvalidSdp);
79 }
80 {
81 let mut state = self
82 .state
83 .lock()
84 .map_err(|_| EnigmaRtcError::InvalidState)?;
85 if *state != CallState::Idle {
86 return Err(EnigmaRtcError::InvalidState);
87 }
88 *state = CallState::IncomingOffer;
89 }
90 self.emit_state(CallState::IncomingOffer)?;
91 let sdp = self.engine.create_answer(offer_sdp)?;
92 if sdp.trim().is_empty() {
93 return Err(EnigmaRtcError::InvalidSdp);
94 }
95 {
96 let mut state = self
97 .state
98 .lock()
99 .map_err(|_| EnigmaRtcError::InvalidState)?;
100 *state = CallState::Connected;
101 }
102 self.emit_state(CallState::Connected)?;
103 self.emit_event(RtcEvent::ConnectionEstablished)?;
104 Ok(SignalingMessage::Answer { sdp })
105 }
106
107 pub fn accept_answer(&self, answer_sdp: &str) -> RtcResult<()> {
108 if answer_sdp.trim().is_empty() {
109 return Err(EnigmaRtcError::InvalidSdp);
110 }
111 {
112 let state = self
113 .state
114 .lock()
115 .map_err(|_| EnigmaRtcError::InvalidState)?;
116 if *state != CallState::WaitingAnswer {
117 return Err(EnigmaRtcError::InvalidState);
118 }
119 }
120 self.engine.apply_answer(answer_sdp)?;
121 {
122 let mut state = self
123 .state
124 .lock()
125 .map_err(|_| EnigmaRtcError::InvalidState)?;
126 *state = CallState::Connected;
127 }
128 self.emit_state(CallState::Connected)?;
129 self.emit_event(RtcEvent::ConnectionEstablished)
130 }
131
132 pub fn add_ice_candidate(&self, candidate_json: &str) -> RtcResult<()> {
133 {
134 let state = self
135 .state
136 .lock()
137 .map_err(|_| EnigmaRtcError::InvalidState)?;
138 match *state {
139 CallState::WaitingAnswer | CallState::IncomingOffer | CallState::Connected => {}
140 _ => return Err(EnigmaRtcError::InvalidState),
141 }
142 }
143 let message = decode_signaling(candidate_json)?;
144 match message {
145 SignalingMessage::IceCandidate { .. } => {
146 self.engine.add_remote_candidate(candidate_json)
147 }
148 _ => Err(EnigmaRtcError::InvalidCandidate),
149 }
150 }
151
152 pub fn hangup(&self, reason: Option<&str>) -> RtcResult<SignalingMessage> {
153 {
154 let mut state = self
155 .state
156 .lock()
157 .map_err(|_| EnigmaRtcError::InvalidState)?;
158 match *state {
159 CallState::Idle
160 | CallState::CreatingOffer
161 | CallState::WaitingAnswer
162 | CallState::IncomingOffer
163 | CallState::Connected => {
164 *state = CallState::Ending;
165 }
166 CallState::Ending | CallState::Ended => return Err(EnigmaRtcError::InvalidState),
167 }
168 }
169 self.emit_state(CallState::Ending)?;
170 {
171 let mut state = self
172 .state
173 .lock()
174 .map_err(|_| EnigmaRtcError::InvalidState)?;
175 *state = CallState::Ended;
176 }
177 self.emit_state(CallState::Ended)?;
178 self.emit_event(RtcEvent::ConnectionClosed)?;
179 Ok(SignalingMessage::Hangup {
180 reason: reason
181 .filter(|s| !s.trim().is_empty())
182 .map(|s| s.to_string()),
183 })
184 }
185
186 pub fn set_microphone_enabled(&self, enabled: bool) -> RtcResult<()> {
187 {
188 let state = self
189 .state
190 .lock()
191 .map_err(|_| EnigmaRtcError::InvalidState)?;
192 if matches!(*state, CallState::Ended) {
193 return Err(EnigmaRtcError::InvalidState);
194 }
195 }
196 self.engine.set_microphone_enabled(enabled)
197 }
198
199 pub fn set_camera_enabled(&self, enabled: bool) -> RtcResult<()> {
200 {
201 let state = self
202 .state
203 .lock()
204 .map_err(|_| EnigmaRtcError::InvalidState)?;
205 if matches!(*state, CallState::Ended) {
206 return Err(EnigmaRtcError::InvalidState);
207 }
208 }
209 self.engine.set_camera_enabled(enabled)
210 }
211
212 fn emit_state(&self, state: CallState) -> RtcResult<()> {
213 self.emit_event(RtcEvent::StateChanged(state))
214 }
215
216 fn emit_event(&self, event: RtcEvent) -> RtcResult<()> {
217 self.event_tx
218 .send(event)
219 .map_err(|_| EnigmaRtcError::ChannelClosed)
220 }
221
222 fn from_parts(
223 config: RtcConfig,
224 engine: Arc<dyn RtcEngine>,
225 event_tx: mpsc::UnboundedSender<RtcEvent>,
226 event_rx: mpsc::UnboundedReceiver<RtcEvent>,
227 ) -> Self {
228 Self {
229 state: Mutex::new(CallState::Idle),
230 engine,
231 event_tx,
232 event_rx: AsyncMutex::new(event_rx),
233 config,
234 }
235 }
236}
237
238#[async_trait]
239pub(crate) trait SessionEventReceiver: Send + Sync {
240 async fn recv_event(&self) -> RtcResult<RtcEvent>;
241}
242
243#[async_trait]
244impl SessionEventReceiver for CallSession {
245 async fn recv_event(&self) -> RtcResult<RtcEvent> {
246 let mut rx = self.event_rx.lock().await;
247 match rx.recv().await {
248 Some(event) => Ok(event),
249 None => Err(EnigmaRtcError::ChannelClosed),
250 }
251 }
252}
253
254impl CallSession {
255 pub async fn next_event(&self) -> RtcResult<RtcEvent> {
256 self.recv_event().await
257 }
258}