1use crate::identity::PeerIdentity;
4use crate::media::{MediaStreamManager, WebRtcTrack};
5use crate::types::{CallEvent, CallId, CallState, MediaConstraints};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use thiserror::Error;
10use tokio::sync::{RwLock, broadcast};
11use webrtc::peer_connection::RTCPeerConnection;
12
/// Errors returned by [`CallManager`] operations.
#[derive(Error, Debug)]
pub enum CallError {
    /// No call with the requested id is tracked by the manager.
    ///
    /// The payload is the call id rendered as a string.
    #[error("Call not found: {0}")]
    CallNotFound(String),

    /// The call exists, but the requested transition is not allowed from its
    /// current state (returned by `accept_call` and `reject_call`).
    #[error("Invalid call state")]
    InvalidState,

    /// Catch-all for setup and negotiation failures: the concurrent-call limit
    /// being reached, peer-connection or track creation failing, and SDP / ICE
    /// input being rejected. The payload is a human-readable description.
    #[error("Configuration error: {0}")]
    ConfigError(String),
}
28
/// Tunable limits for a [`CallManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallManagerConfig {
    /// Maximum number of calls the manager tracks at once; `initiate_call`
    /// refuses to start a new call once this many entries are in the call map.
    pub max_concurrent_calls: usize,
}
35
36impl Default for CallManagerConfig {
37 fn default() -> Self {
38 Self {
39 max_concurrent_calls: 10,
40 }
41 }
42}
43
/// Marker trait for network transports; it declares no methods and only
/// requires implementors to be `Send + Sync`.
// NOTE(review): nothing in this part of the file implements or consumes this
// trait — confirm whether it is used elsewhere or is a placeholder.
pub trait NetworkAdapter: Send + Sync {}
46
/// State tracked by [`CallManager`] for a single call.
pub struct Call<I: PeerIdentity> {
    /// Identifier under which the call is stored in the manager's call map.
    pub id: CallId,
    /// Identity of the peer on the other end of the call.
    pub remote_peer: I,
    /// WebRTC peer connection backing this call.
    pub peer_connection: Arc<RTCPeerConnection>,
    /// Current lifecycle state of the call.
    pub state: CallState,
    /// Media constraints the call was initiated with.
    pub constraints: MediaConstraints,
    /// Local tracks created for this call; they are removed from the media
    /// manager when the call is ended.
    pub tracks: Vec<WebRtcTrack>,
}
62
63pub struct CallManager<I: PeerIdentity> {
65 calls: Arc<RwLock<HashMap<CallId, Call<I>>>>,
66 event_sender: broadcast::Sender<CallEvent<I>>,
67 #[allow(dead_code)]
68 config: CallManagerConfig,
69 media_manager: Arc<RwLock<MediaStreamManager>>,
70}
71
72impl<I: PeerIdentity> CallManager<I> {
73 pub async fn new(config: CallManagerConfig) -> Result<Self, CallError> {
79 let (event_sender, _) = broadcast::channel(100);
80 let media_manager = Arc::new(RwLock::new(MediaStreamManager::new()));
81 Ok(Self {
82 calls: Arc::new(RwLock::new(HashMap::new())),
83 event_sender,
84 config,
85 media_manager,
86 })
87 }
88
89 pub async fn start(&self) -> Result<(), CallError> {
95 Ok(())
96 }
97
98 pub async fn initiate_call(
104 &self,
105 callee: I,
106 constraints: MediaConstraints,
107 ) -> Result<CallId, CallError> {
108 let calls = self.calls.read().await;
110 if calls.len() >= self.config.max_concurrent_calls {
111 return Err(CallError::ConfigError(format!(
112 "Maximum concurrent calls limit reached: {}",
113 self.config.max_concurrent_calls
114 )));
115 }
116 drop(calls);
117
118 let call_id = CallId::new();
119
120 tracing::info!("Initiating call {} to peer: {}", call_id, callee.to_string_repr());
121
122 let peer_connection = Arc::new(
124 webrtc::api::APIBuilder::new().build().new_peer_connection(
125 webrtc::peer_connection::configuration::RTCConfiguration::default(),
126 ).await.map_err(|e| {
127 tracing::error!("Failed to create peer connection for call {}: {}", call_id, e);
128 CallError::ConfigError(format!("Failed to create peer connection: {}", e))
129 })?
130 );
131
132 tracing::debug!("Created peer connection for call {}", call_id);
133
134 let mut media_manager = self.media_manager.write().await;
136 let mut tracks = Vec::new();
137
138 if constraints.has_audio() {
139 let audio_track = media_manager.create_audio_track().await
140 .map_err(|e| CallError::ConfigError(format!("Failed to create audio track: {:?}", e)))?;
141 tracks.push((*audio_track).clone());
142
143 let track: Arc<dyn webrtc::track::track_local::TrackLocal + Send + Sync> = audio_track.track.clone();
145 peer_connection.add_track(track).await
146 .map_err(|e| CallError::ConfigError(format!("Failed to add audio track: {}", e)))?;
147 }
148
149 if constraints.has_video() {
150 let video_track = media_manager.create_video_track().await
151 .map_err(|e| CallError::ConfigError(format!("Failed to create video track: {:?}", e)))?;
152 tracks.push((*video_track).clone());
153
154 let track: Arc<dyn webrtc::track::track_local::TrackLocal + Send + Sync> = video_track.track.clone();
156 peer_connection.add_track(track).await
157 .map_err(|e| CallError::ConfigError(format!("Failed to add video track: {}", e)))?;
158 }
159
160 let call = Call {
161 id: call_id,
162 remote_peer: callee.clone(),
163 peer_connection,
164 state: CallState::Calling,
165 constraints: constraints.clone(),
166 tracks,
167 };
168
169 let mut calls = self.calls.write().await;
170 calls.insert(call_id, call);
171
172 let _ = self.event_sender.send(CallEvent::CallInitiated {
174 call_id,
175 callee,
176 constraints,
177 });
178
179 Ok(call_id)
180 }
181
182 pub async fn accept_call(
188 &self,
189 call_id: CallId,
190 _constraints: MediaConstraints,
191 ) -> Result<(), CallError> {
192 let mut calls = self.calls.write().await;
193 if let Some(call) = calls.get_mut(&call_id) {
194 match call.state {
196 CallState::Calling | CallState::Connecting => {
197 call.state = CallState::Connected;
198
199 let _ = self.event_sender.send(CallEvent::ConnectionEstablished { call_id });
201
202 tracing::info!("Call {} accepted", call_id);
203 Ok(())
204 }
205 _ => {
206 tracing::warn!("Invalid state transition: cannot accept call {} in state {:?}", call_id, call.state);
207 Err(CallError::InvalidState)
208 }
209 }
210 } else {
211 tracing::warn!("Attempted to accept non-existent call {}", call_id);
212 Err(CallError::CallNotFound(call_id.to_string()))
213 }
214 }
215
216 pub async fn reject_call(&self, call_id: CallId) -> Result<(), CallError> {
222 let mut calls = self.calls.write().await;
223 if let Some(call) = calls.get_mut(&call_id) {
224 match call.state {
226 CallState::Calling | CallState::Connecting => {
227 call.state = CallState::Failed;
228
229 let _ = self.event_sender.send(CallEvent::CallRejected { call_id });
231
232 Ok(())
233 }
234 _ => {
235 tracing::warn!("Invalid state transition: cannot reject call {} in state {:?}", call_id, call.state);
236 Err(CallError::InvalidState)
237 }
238 }
239 } else {
240 Err(CallError::CallNotFound(call_id.to_string()))
241 }
242 }
243
244 pub async fn end_call(&self, call_id: CallId) -> Result<(), CallError> {
250 let mut calls = self.calls.write().await;
251 if let Some(call) = calls.remove(&call_id) {
252 let mut media_manager = self.media_manager.write().await;
254 for track in &call.tracks {
255 media_manager.remove_track(&track.id);
256 }
257 drop(media_manager);
258
259 let _ = call.peer_connection.close().await;
261
262 let _ = self.event_sender.send(CallEvent::CallEnded { call_id });
264
265 tracing::info!("Ended call {} and cleaned up {} tracks", call_id, call.tracks.len());
266 Ok(())
267 } else {
268 Err(CallError::CallNotFound(call_id.to_string()))
269 }
270 }
271
272 #[must_use]
274 pub async fn get_call_state(&self, call_id: CallId) -> Option<CallState> {
275 let calls = self.calls.read().await;
276 calls.get(&call_id).map(|call| call.state)
277 }
278
279 pub async fn create_offer(&self, call_id: CallId) -> Result<String, CallError> {
285 let calls = self.calls.read().await;
286 if let Some(call) = calls.get(&call_id) {
287 tracing::debug!("Creating SDP offer for call {}", call_id);
288 let offer = call.peer_connection.create_offer(None).await
289 .map_err(|e| {
290 tracing::error!("Failed to create offer for call {}: {}", call_id, e);
291 CallError::ConfigError(format!("Failed to create offer: {}", e))
292 })?;
293 call.peer_connection.set_local_description(offer.clone()).await
294 .map_err(|e| {
295 tracing::error!("Failed to set local description for call {}: {}", call_id, e);
296 CallError::ConfigError(format!("Failed to set local description: {}", e))
297 })?;
298 tracing::debug!("SDP offer created for call {}", call_id);
299 Ok(offer.sdp)
300 } else {
301 tracing::warn!("Attempted to create offer for non-existent call {}", call_id);
302 Err(CallError::CallNotFound(call_id.to_string()))
303 }
304 }
305
306 pub async fn handle_answer(&self, call_id: CallId, sdp: String) -> Result<(), CallError> {
312 let calls = self.calls.read().await;
313 if let Some(call) = calls.get(&call_id) {
314 if sdp.trim().is_empty() {
316 return Err(CallError::ConfigError("SDP answer cannot be empty".to_string()));
317 }
318
319 let answer = webrtc::peer_connection::sdp::session_description::RTCSessionDescription::answer(sdp)
320 .map_err(|e| CallError::ConfigError(format!("Invalid SDP answer: {}", e)))?;
321
322 call.peer_connection.set_remote_description(answer).await
323 .map_err(|e| CallError::ConfigError(format!("Failed to set remote description: {}", e)))?;
324 Ok(())
325 } else {
326 Err(CallError::CallNotFound(call_id.to_string()))
327 }
328 }
329
330 pub async fn add_ice_candidate(&self, call_id: CallId, candidate: String) -> Result<(), CallError> {
336 let calls = self.calls.read().await;
337 if let Some(call) = calls.get(&call_id) {
338 let rtc_candidate = webrtc::ice_transport::ice_candidate::RTCIceCandidateInit {
339 candidate,
340 ..Default::default()
341 };
342 call.peer_connection.add_ice_candidate(rtc_candidate).await
343 .map_err(|e| CallError::ConfigError(format!("Failed to add ICE candidate: {}", e)))?;
344 Ok(())
345 } else {
346 Err(CallError::CallNotFound(call_id.to_string()))
347 }
348 }
349
350 pub async fn start_ice_gathering(&self, call_id: CallId) -> Result<(), CallError> {
356 let calls = self.calls.read().await;
357 if let Some(_call) = calls.get(&call_id) {
358 Ok(())
361 } else {
362 Err(CallError::CallNotFound(call_id.to_string()))
363 }
364 }
365
366 #[must_use]
368 pub fn subscribe_events(&self) -> broadcast::Receiver<CallEvent<I>> {
369 self.event_sender.subscribe()
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::PeerIdentityString;

    /// Builds a manager with the default configuration.
    async fn new_manager() -> CallManager<PeerIdentityString> {
        CallManager::<PeerIdentityString>::new(CallManagerConfig::default())
            .await
            .expect("creating a call manager should not fail")
    }

    /// Builds a manager that already tracks one audio-only call to "callee".
    async fn manager_with_call() -> (CallManager<PeerIdentityString>, CallId) {
        let call_manager = new_manager().await;
        let call_id = call_manager
            .initiate_call(PeerIdentityString::new("callee"), MediaConstraints::audio_only())
            .await
            .expect("initiating an audio-only call should succeed");
        (call_manager, call_id)
    }

    #[tokio::test]
    async fn test_call_manager_initiate_call() {
        let (call_manager, call_id) = manager_with_call().await;

        let state = call_manager.get_call_state(call_id).await;
        assert_eq!(state, Some(CallState::Calling));
    }

    #[tokio::test]
    async fn test_call_manager_accept_call() {
        let (call_manager, call_id) = manager_with_call().await;

        call_manager
            .accept_call(call_id, MediaConstraints::audio_only())
            .await
            .unwrap();

        let state = call_manager.get_call_state(call_id).await;
        assert_eq!(state, Some(CallState::Connected));
    }

    #[tokio::test]
    async fn test_call_manager_reject_call() {
        let (call_manager, call_id) = manager_with_call().await;

        call_manager.reject_call(call_id).await.unwrap();

        let state = call_manager.get_call_state(call_id).await;
        assert_eq!(state, Some(CallState::Failed));
    }

    #[tokio::test]
    async fn test_call_manager_end_call() {
        let (call_manager, call_id) = manager_with_call().await;

        call_manager.end_call(call_id).await.unwrap();

        let state = call_manager.get_call_state(call_id).await;
        assert_eq!(state, None);
    }

    #[tokio::test]
    async fn test_call_manager_create_offer() {
        let (call_manager, call_id) = manager_with_call().await;

        // Whether the WebRTC stack can produce an offer depends on the codecs
        // available in the test environment, so a wrapped WebRTC failure is
        // tolerated here; any other error variant is not.
        match call_manager.create_offer(call_id).await {
            Ok(sdp) => assert!(!sdp.trim().is_empty()),
            Err(err) => assert!(matches!(err, CallError::ConfigError(_))),
        }
    }

    #[tokio::test]
    async fn test_call_manager_add_ice_candidate() {
        let (call_manager, call_id) = manager_with_call().await;

        let candidate = "candidate:1 1 UDP 2122260223 192.168.1.1 12345 typ host".to_string();
        // The peer connection may refuse the candidate (no remote description
        // has been set), which surfaces as a ConfigError.
        let result = call_manager.add_ice_candidate(call_id, candidate).await;
        assert!(result.is_ok() || matches!(result, Err(CallError::ConfigError(_))));
    }

    #[tokio::test]
    async fn test_call_manager_start_ice_gathering() {
        let (call_manager, call_id) = manager_with_call().await;

        // For an existing call the method can only succeed.
        let result = call_manager.start_ice_gathering(call_id).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_call_manager_call_not_found() {
        let call_manager = new_manager().await;

        let fake_call_id = CallId::new();

        let result = call_manager.accept_call(fake_call_id, MediaConstraints::audio_only()).await;
        assert!(matches!(result, Err(CallError::CallNotFound(_))));

        let result = call_manager.reject_call(fake_call_id).await;
        assert!(matches!(result, Err(CallError::CallNotFound(_))));

        let result = call_manager.end_call(fake_call_id).await;
        assert!(matches!(result, Err(CallError::CallNotFound(_))));

        let result = call_manager.create_offer(fake_call_id).await;
        assert!(matches!(result, Err(CallError::CallNotFound(_))));

        let result = call_manager.handle_answer(fake_call_id, "dummy".to_string()).await;
        assert!(matches!(result, Err(CallError::CallNotFound(_))));

        let result = call_manager.add_ice_candidate(fake_call_id, "dummy".to_string()).await;
        assert!(matches!(result, Err(CallError::CallNotFound(_))));

        let result = call_manager.start_ice_gathering(fake_call_id).await;
        assert!(matches!(result, Err(CallError::CallNotFound(_))));
    }

    #[tokio::test]
    async fn test_call_manager_rejects_calls_over_limit() {
        let config = CallManagerConfig { max_concurrent_calls: 0 };
        let call_manager = CallManager::<PeerIdentityString>::new(config).await.unwrap();

        let result = call_manager
            .initiate_call(PeerIdentityString::new("callee"), MediaConstraints::audio_only())
            .await;
        assert!(matches!(result, Err(CallError::ConfigError(_))));
    }

    #[tokio::test]
    async fn test_call_manager_invalid_state_transition() {
        let (call_manager, call_id) = manager_with_call().await;

        call_manager.reject_call(call_id).await.unwrap();

        // A rejected call is in the Failed state and can no longer be accepted
        // or rejected again.
        let result = call_manager.accept_call(call_id, MediaConstraints::audio_only()).await;
        assert!(matches!(result, Err(CallError::InvalidState)));

        let result = call_manager.reject_call(call_id).await;
        assert!(matches!(result, Err(CallError::InvalidState)));
    }

    #[tokio::test]
    async fn test_call_manager_handle_answer_rejects_blank_sdp() {
        let (call_manager, call_id) = manager_with_call().await;

        let result = call_manager.handle_answer(call_id, "   ".to_string()).await;
        assert!(matches!(result, Err(CallError::ConfigError(_))));
    }
}