saorsa_webrtc/
call.rs

1//! Call management for WebRTC
2
3use crate::identity::PeerIdentity;
4use crate::media::{MediaStreamManager, WebRtcTrack};
5use crate::types::{CallEvent, CallId, CallState, MediaConstraints};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use thiserror::Error;
10use tokio::sync::{RwLock, broadcast};
11use webrtc::peer_connection::RTCPeerConnection;
12
/// Errors returned by `CallManager` operations.
///
/// The `#[error(...)]` attributes supply the `Display` text of each variant.
#[derive(Error, Debug)]
pub enum CallError {
    /// No call with the requested identifier is tracked by the manager.
    ///
    /// The payload is the string form of the call id that was looked up.
    #[error("Call not found: {0}")]
    CallNotFound(String),

    /// The operation is not allowed in the call's current state, e.g.
    /// accepting or rejecting a call that is neither `Calling` nor `Connecting`.
    #[error("Invalid call state")]
    InvalidState,

    /// A configuration or WebRTC setup step failed.
    ///
    /// In this module the variant is also used for peer-connection, track,
    /// SDP and ICE failures and for hitting the concurrent-call limit; the
    /// payload is a human-readable description of what went wrong.
    #[error("Configuration error: {0}")]
    ConfigError(String),
}
28
/// Configuration for a `CallManager`.
///
/// Derives `Serialize`/`Deserialize` so it can be read from or written to
/// serde-supported formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallManagerConfig {
    /// Maximum number of calls the manager tracks at the same time.
    ///
    /// `CallManager::initiate_call` fails with `CallError::ConfigError` once
    /// this many calls are tracked. A call keeps counting towards the limit
    /// until it is removed by `CallManager::end_call`.
    pub max_concurrent_calls: usize,
}
35
36impl Default for CallManagerConfig {
37    fn default() -> Self {
38        Self {
39            max_concurrent_calls: 10,
40        }
41    }
42}
43
/// Marker trait for network adapters (placeholder for future implementation).
///
/// It declares no methods yet; implementors only have to be `Send + Sync`.
pub trait NetworkAdapter: Send + Sync {}
46
/// A call tracked by `CallManager`, together with its WebRTC peer connection.
pub struct Call<I: PeerIdentity> {
    /// Identifier under which the call is stored in the manager.
    pub id: CallId,
    /// Identity of the remote peer (the callee for calls created by
    /// `CallManager::initiate_call`).
    pub remote_peer: I,
    /// WebRTC peer connection of this call; `CallManager::end_call` closes it.
    pub peer_connection: Arc<RTCPeerConnection>,
    /// Current state of the call.
    pub state: CallState,
    /// Media constraints the call was created with.
    pub constraints: MediaConstraints,
    /// Local tracks created for this call; `CallManager::end_call` removes
    /// them from the media manager by their `id`.
    pub tracks: Vec<WebRtcTrack>,
}
62
63/// Call manager
64pub struct CallManager<I: PeerIdentity> {
65    calls: Arc<RwLock<HashMap<CallId, Call<I>>>>,
66    event_sender: broadcast::Sender<CallEvent<I>>,
67    #[allow(dead_code)]
68    config: CallManagerConfig,
69    media_manager: Arc<RwLock<MediaStreamManager>>,
70}
71
72impl<I: PeerIdentity> CallManager<I> {
73    /// Create new call manager
74    ///
75    /// # Errors
76    ///
77    /// Returns error if initialization fails
78    pub async fn new(config: CallManagerConfig) -> Result<Self, CallError> {
79        let (event_sender, _) = broadcast::channel(100);
80        let media_manager = Arc::new(RwLock::new(MediaStreamManager::new()));
81        Ok(Self {
82            calls: Arc::new(RwLock::new(HashMap::new())),
83            event_sender,
84            config,
85            media_manager,
86        })
87    }
88
89    /// Start the call manager
90    ///
91    /// # Errors
92    ///
93    /// Returns error if start fails
94    pub async fn start(&self) -> Result<(), CallError> {
95        Ok(())
96    }
97
98    /// Initiate a call
99    ///
100    /// # Errors
101    ///
102    /// Returns error if call cannot be initiated
103    pub async fn initiate_call(
104        &self,
105        callee: I,
106        constraints: MediaConstraints,
107    ) -> Result<CallId, CallError> {
108        // Enforce max_concurrent_calls limit
109        let calls = self.calls.read().await;
110        if calls.len() >= self.config.max_concurrent_calls {
111            return Err(CallError::ConfigError(format!(
112                "Maximum concurrent calls limit reached: {}",
113                self.config.max_concurrent_calls
114            )));
115        }
116        drop(calls);
117
118        let call_id = CallId::new();
119
120        tracing::info!("Initiating call {} to peer: {}", call_id, callee.to_string_repr());
121
122        // Create WebRTC peer connection
123        let peer_connection = Arc::new(
124            webrtc::api::APIBuilder::new().build().new_peer_connection(
125                webrtc::peer_connection::configuration::RTCConfiguration::default(),
126            ).await.map_err(|e| {
127                tracing::error!("Failed to create peer connection for call {}: {}", call_id, e);
128                CallError::ConfigError(format!("Failed to create peer connection: {}", e))
129            })?
130        );
131
132        tracing::debug!("Created peer connection for call {}", call_id);
133
134        // Create media tracks based on constraints
135        let mut media_manager = self.media_manager.write().await;
136        let mut tracks = Vec::new();
137
138        if constraints.has_audio() {
139            let audio_track = media_manager.create_audio_track().await
140                .map_err(|e| CallError::ConfigError(format!("Failed to create audio track: {:?}", e)))?;
141            tracks.push((*audio_track).clone());
142
143            // Add track to peer connection
144            let track: Arc<dyn webrtc::track::track_local::TrackLocal + Send + Sync> = audio_track.track.clone();
145            peer_connection.add_track(track).await
146                .map_err(|e| CallError::ConfigError(format!("Failed to add audio track: {}", e)))?;
147        }
148
149        if constraints.has_video() {
150            let video_track = media_manager.create_video_track().await
151                .map_err(|e| CallError::ConfigError(format!("Failed to create video track: {:?}", e)))?;
152            tracks.push((*video_track).clone());
153
154            // Add track to peer connection
155            let track: Arc<dyn webrtc::track::track_local::TrackLocal + Send + Sync> = video_track.track.clone();
156            peer_connection.add_track(track).await
157                .map_err(|e| CallError::ConfigError(format!("Failed to add video track: {}", e)))?;
158        }
159
160        let call = Call {
161            id: call_id,
162            remote_peer: callee.clone(),
163            peer_connection,
164            state: CallState::Calling,
165            constraints: constraints.clone(),
166            tracks,
167        };
168
169        let mut calls = self.calls.write().await;
170        calls.insert(call_id, call);
171        
172        // Emit call initiated event
173        let _ = self.event_sender.send(CallEvent::CallInitiated {
174            call_id,
175            callee,
176            constraints,
177        });
178        
179        Ok(call_id)
180    }
181
182    /// Accept a call
183    ///
184    /// # Errors
185    ///
186    /// Returns error if call cannot be accepted
187    pub async fn accept_call(
188        &self,
189        call_id: CallId,
190        _constraints: MediaConstraints,
191    ) -> Result<(), CallError> {
192        let mut calls = self.calls.write().await;
193        if let Some(call) = calls.get_mut(&call_id) {
194            // Validate state transition
195            match call.state {
196                CallState::Calling | CallState::Connecting => {
197                    call.state = CallState::Connected;
198                    
199                    // Emit connection established event
200                    let _ = self.event_sender.send(CallEvent::ConnectionEstablished { call_id });
201                    
202                    tracing::info!("Call {} accepted", call_id);
203                    Ok(())
204                }
205                _ => {
206                    tracing::warn!("Invalid state transition: cannot accept call {} in state {:?}", call_id, call.state);
207                    Err(CallError::InvalidState)
208                }
209            }
210        } else {
211            tracing::warn!("Attempted to accept non-existent call {}", call_id);
212            Err(CallError::CallNotFound(call_id.to_string()))
213        }
214    }
215
216    /// Reject a call
217    ///
218    /// # Errors
219    ///
220    /// Returns error if call cannot be rejected
221    pub async fn reject_call(&self, call_id: CallId) -> Result<(), CallError> {
222        let mut calls = self.calls.write().await;
223        if let Some(call) = calls.get_mut(&call_id) {
224            // Validate state transition - can only reject calls that are not yet connected/ended
225            match call.state {
226                CallState::Calling | CallState::Connecting => {
227                    call.state = CallState::Failed;
228                    
229                    // Emit call rejected event
230                    let _ = self.event_sender.send(CallEvent::CallRejected { call_id });
231                    
232                    Ok(())
233                }
234                _ => {
235                    tracing::warn!("Invalid state transition: cannot reject call {} in state {:?}", call_id, call.state);
236                    Err(CallError::InvalidState)
237                }
238            }
239        } else {
240            Err(CallError::CallNotFound(call_id.to_string()))
241        }
242    }
243
244    /// End a call
245    ///
246    /// # Errors
247    ///
248    /// Returns error if call cannot be ended
249    pub async fn end_call(&self, call_id: CallId) -> Result<(), CallError> {
250        let mut calls = self.calls.write().await;
251        if let Some(call) = calls.remove(&call_id) {
252            // Remove all tracks associated with this call from media manager
253            let mut media_manager = self.media_manager.write().await;
254            for track in &call.tracks {
255                media_manager.remove_track(&track.id);
256            }
257            drop(media_manager);
258
259            // Close the peer connection
260            let _ = call.peer_connection.close().await;
261            
262            // Emit call ended event
263            let _ = self.event_sender.send(CallEvent::CallEnded { call_id });
264            
265            tracing::info!("Ended call {} and cleaned up {} tracks", call_id, call.tracks.len());
266            Ok(())
267        } else {
268            Err(CallError::CallNotFound(call_id.to_string()))
269        }
270    }
271
272    /// Get call state
273    #[must_use]
274    pub async fn get_call_state(&self, call_id: CallId) -> Option<CallState> {
275        let calls = self.calls.read().await;
276        calls.get(&call_id).map(|call| call.state)
277    }
278
279    /// Create SDP offer for a call
280    ///
281    /// # Errors
282    ///
283    /// Returns error if offer cannot be created
284    pub async fn create_offer(&self, call_id: CallId) -> Result<String, CallError> {
285        let calls = self.calls.read().await;
286        if let Some(call) = calls.get(&call_id) {
287            tracing::debug!("Creating SDP offer for call {}", call_id);
288            let offer = call.peer_connection.create_offer(None).await
289                .map_err(|e| {
290                    tracing::error!("Failed to create offer for call {}: {}", call_id, e);
291                    CallError::ConfigError(format!("Failed to create offer: {}", e))
292                })?;
293            call.peer_connection.set_local_description(offer.clone()).await
294                .map_err(|e| {
295                    tracing::error!("Failed to set local description for call {}: {}", call_id, e);
296                    CallError::ConfigError(format!("Failed to set local description: {}", e))
297                })?;
298            tracing::debug!("SDP offer created for call {}", call_id);
299            Ok(offer.sdp)
300        } else {
301            tracing::warn!("Attempted to create offer for non-existent call {}", call_id);
302            Err(CallError::CallNotFound(call_id.to_string()))
303        }
304    }
305
306    /// Handle SDP answer for a call
307    ///
308    /// # Errors
309    ///
310    /// Returns error if answer cannot be handled
311    pub async fn handle_answer(&self, call_id: CallId, sdp: String) -> Result<(), CallError> {
312        let calls = self.calls.read().await;
313        if let Some(call) = calls.get(&call_id) {
314            // Validate SDP is not empty
315            if sdp.trim().is_empty() {
316                return Err(CallError::ConfigError("SDP answer cannot be empty".to_string()));
317            }
318            
319            let answer = webrtc::peer_connection::sdp::session_description::RTCSessionDescription::answer(sdp)
320                .map_err(|e| CallError::ConfigError(format!("Invalid SDP answer: {}", e)))?;
321            
322            call.peer_connection.set_remote_description(answer).await
323                .map_err(|e| CallError::ConfigError(format!("Failed to set remote description: {}", e)))?;
324            Ok(())
325        } else {
326            Err(CallError::CallNotFound(call_id.to_string()))
327        }
328    }
329
330    /// Add ICE candidate to a call
331    ///
332    /// # Errors
333    ///
334    /// Returns error if candidate cannot be added
335    pub async fn add_ice_candidate(&self, call_id: CallId, candidate: String) -> Result<(), CallError> {
336        let calls = self.calls.read().await;
337        if let Some(call) = calls.get(&call_id) {
338            let rtc_candidate = webrtc::ice_transport::ice_candidate::RTCIceCandidateInit {
339                candidate,
340                ..Default::default()
341            };
342            call.peer_connection.add_ice_candidate(rtc_candidate).await
343                .map_err(|e| CallError::ConfigError(format!("Failed to add ICE candidate: {}", e)))?;
344            Ok(())
345        } else {
346            Err(CallError::CallNotFound(call_id.to_string()))
347        }
348    }
349
350    /// Start ICE gathering for a call
351    ///
352    /// # Errors
353    ///
354    /// Returns error if gathering cannot be started
355    pub async fn start_ice_gathering(&self, call_id: CallId) -> Result<(), CallError> {
356        let calls = self.calls.read().await;
357        if let Some(_call) = calls.get(&call_id) {
358            // ICE gathering is typically started automatically when creating offer
359            // For now, this is a no-op as gathering happens during offer creation
360            Ok(())
361        } else {
362            Err(CallError::CallNotFound(call_id.to_string()))
363        }
364    }
365
366    /// Subscribe to call events
367    #[must_use]
368    pub fn subscribe_events(&self) -> broadcast::Receiver<CallEvent<I>> {
369        self.event_sender.subscribe()
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::PeerIdentityString;

    /// Call manager type used throughout these tests.
    type TestManager = CallManager<PeerIdentityString>;

    /// Build a call manager with the default configuration.
    async fn new_manager() -> TestManager {
        TestManager::new(CallManagerConfig::default()).await.unwrap()
    }

    /// Initiate an audio-only call to a fixed callee and return its id.
    async fn start_audio_call(manager: &TestManager) -> CallId {
        manager
            .initiate_call(PeerIdentityString::new("callee"), MediaConstraints::audio_only())
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_call_manager_initiate_call() {
        let manager = new_manager().await;
        let call_id = start_audio_call(&manager).await;

        assert_eq!(manager.get_call_state(call_id).await, Some(CallState::Calling));
    }

    #[tokio::test]
    async fn test_call_manager_accept_call() {
        let manager = new_manager().await;
        let call_id = start_audio_call(&manager).await;

        manager
            .accept_call(call_id, MediaConstraints::audio_only())
            .await
            .unwrap();

        assert_eq!(manager.get_call_state(call_id).await, Some(CallState::Connected));
    }

    #[tokio::test]
    async fn test_call_manager_reject_call() {
        let manager = new_manager().await;
        let call_id = start_audio_call(&manager).await;

        manager.reject_call(call_id).await.unwrap();

        assert_eq!(manager.get_call_state(call_id).await, Some(CallState::Failed));
    }

    #[tokio::test]
    async fn test_call_manager_end_call() {
        let manager = new_manager().await;
        let call_id = start_audio_call(&manager).await;

        manager.end_call(call_id).await.unwrap();

        // An ended call is no longer tracked.
        assert_eq!(manager.get_call_state(call_id).await, None);
    }

    #[tokio::test]
    async fn test_call_manager_create_offer() {
        let manager = new_manager().await;
        let _call_id = start_audio_call(&manager).await;

        // Offer creation itself is not exercised here: it requires proper
        // codec setup, i.e. a more complete WebRTC configuration than this
        // test provides. Only call initiation is covered.
    }

    #[tokio::test]
    async fn test_call_manager_add_ice_candidate() {
        let manager = new_manager().await;
        let call_id = start_audio_call(&manager).await;

        // Dummy ICE candidate. Adding it may fail in the test environment,
        // but it must not panic and may only fail with a ConfigError.
        let candidate = "candidate:1 1 UDP 2122260223 192.168.1.1 12345 typ host".to_string();
        let outcome = manager.add_ice_candidate(call_id, candidate).await;

        assert!(outcome.is_ok() || matches!(outcome, Err(CallError::ConfigError(_))));
    }

    #[tokio::test]
    async fn test_call_manager_start_ice_gathering() {
        let manager = new_manager().await;
        let call_id = start_audio_call(&manager).await;

        let outcome = manager.start_ice_gathering(call_id).await;

        // Must not panic; the only tolerated failure is a ConfigError.
        assert!(outcome.is_ok() || matches!(outcome, Err(CallError::ConfigError(_))));
    }

    #[tokio::test]
    async fn test_call_manager_call_not_found() {
        let manager = new_manager().await;
        // Fresh id that was never registered with the manager.
        let unknown = CallId::new();

        let accepted = manager.accept_call(unknown, MediaConstraints::audio_only()).await;
        assert!(matches!(accepted, Err(CallError::CallNotFound(_))));

        let rejected = manager.reject_call(unknown).await;
        assert!(matches!(rejected, Err(CallError::CallNotFound(_))));

        let ended = manager.end_call(unknown).await;
        assert!(matches!(ended, Err(CallError::CallNotFound(_))));

        let offer = manager.create_offer(unknown).await;
        assert!(matches!(offer, Err(CallError::CallNotFound(_))));

        let answered = manager.handle_answer(unknown, "dummy".to_string()).await;
        assert!(matches!(answered, Err(CallError::CallNotFound(_))));

        let candidate_added = manager.add_ice_candidate(unknown, "dummy".to_string()).await;
        assert!(matches!(candidate_added, Err(CallError::CallNotFound(_))));

        let gathering = manager.start_ice_gathering(unknown).await;
        assert!(matches!(gathering, Err(CallError::CallNotFound(_))));
    }
}