use crate::identity::PeerIdentity;
use crate::link_transport::PeerConnection;
use crate::media::{GenericTrack, MediaStreamManager, WebRtcTrack};
use crate::quic_media_transport::{MediaTransportError, MediaTransportState, QuicMediaTransport};
use crate::types::{CallEvent, CallId, CallState, MediaCapabilities, MediaConstraints};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{broadcast, RwLock};
use webrtc::peer_connection::RTCPeerConnection;
#[derive(Error, Debug)]
pub enum CallError {
#[error("Call not found: {0}")]
CallNotFound(String),
#[error("Invalid call state")]
InvalidState,
#[error("Configuration error: {0}")]
ConfigError(String),
#[error("Transport error: {0}")]
TransportError(String),
}
impl From<MediaTransportError> for CallError {
fn from(err: MediaTransportError) -> Self {
CallError::TransportError(err.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallManagerConfig {
pub max_concurrent_calls: usize,
}
impl Default for CallManagerConfig {
fn default() -> Self {
Self {
max_concurrent_calls: 10,
}
}
}
pub trait NetworkAdapter: Send + Sync {}
pub struct Call<I: PeerIdentity> {
pub id: CallId,
pub remote_peer: I,
pub peer_connection: Arc<RTCPeerConnection>,
pub media_transport: Option<Arc<QuicMediaTransport>>,
pub state: CallState,
pub constraints: MediaConstraints,
pub tracks: Vec<WebRtcTrack>,
pub quic_tracks: Vec<GenericTrack>,
}
impl<I: PeerIdentity> Call<I> {
#[must_use]
pub fn is_quic_call(&self) -> bool {
self.media_transport.is_some()
}
#[must_use]
pub fn get_quic_tracks(&self) -> &[GenericTrack] {
&self.quic_tracks
}
#[must_use]
pub fn get_quic_track_by_id(&self, id: &str) -> Option<&GenericTrack> {
self.quic_tracks.iter().find(|t| t.id() == id)
}
pub fn add_quic_track(&mut self, track: GenericTrack) {
tracing::info!(
call_id = %self.id,
track_id = %track.id(),
track_type = ?track.media_type(),
"Adding QUIC track to call"
);
self.quic_tracks.push(track);
}
pub fn remove_quic_track(&mut self, track_id: &str) -> bool {
if let Some(pos) = self.quic_tracks.iter().position(|t| t.id() == track_id) {
tracing::info!(
call_id = %self.id,
track_id = %track_id,
"Removing QUIC track from call"
);
self.quic_tracks.remove(pos);
true
} else {
false
}
}
#[must_use]
pub fn transport(&self) -> Option<&Arc<QuicMediaTransport>> {
self.media_transport.as_ref()
}
}
pub struct CallManager<I: PeerIdentity> {
calls: Arc<RwLock<HashMap<CallId, Call<I>>>>,
event_sender: broadcast::Sender<CallEvent<I>>,
#[allow(dead_code)]
config: CallManagerConfig,
media_manager: Arc<RwLock<MediaStreamManager>>,
}
impl<I: PeerIdentity> CallManager<I> {
pub async fn new(config: CallManagerConfig) -> Result<Self, CallError> {
let (event_sender, _) = broadcast::channel(100);
let media_manager = Arc::new(RwLock::new(MediaStreamManager::new()));
Ok(Self {
calls: Arc::new(RwLock::new(HashMap::new())),
event_sender,
config,
media_manager,
})
}
pub async fn start(&self) -> Result<(), CallError> {
Ok(())
}
pub async fn initiate_call(
&self,
callee: I,
constraints: MediaConstraints,
) -> Result<CallId, CallError> {
let calls = self.calls.read().await;
if calls.len() >= self.config.max_concurrent_calls {
return Err(CallError::ConfigError(format!(
"Maximum concurrent calls limit reached: {}",
self.config.max_concurrent_calls
)));
}
drop(calls);
let call_id = CallId::new();
tracing::info!(
"Initiating call {} to peer: {}",
call_id,
callee.to_string_repr()
);
let media_transport = Arc::new(QuicMediaTransport::new());
tracing::debug!("Created QuicMediaTransport for call {}", call_id);
let peer_connection = Arc::new(
webrtc::api::APIBuilder::new()
.build()
.new_peer_connection(
webrtc::peer_connection::configuration::RTCConfiguration::default(),
)
.await
.map_err(|e| {
tracing::error!(
"Failed to create peer connection for call {}: {}",
call_id,
e
);
CallError::ConfigError(format!("Failed to create peer connection: {}", e))
})?,
);
tracing::debug!("Created peer connection for call {}", call_id);
let mut media_manager = self.media_manager.write().await;
let mut tracks = Vec::new();
if constraints.has_audio() {
let audio_track = media_manager.create_audio_track().await.map_err(|e| {
CallError::ConfigError(format!("Failed to create audio track: {:?}", e))
})?;
tracks.push((*audio_track).clone());
let track: Arc<dyn webrtc::track::track_local::TrackLocal + Send + Sync> =
audio_track.track.clone();
peer_connection
.add_track(track)
.await
.map_err(|e| CallError::ConfigError(format!("Failed to add audio track: {}", e)))?;
}
if constraints.has_video() {
let video_track = media_manager.create_video_track().await.map_err(|e| {
CallError::ConfigError(format!("Failed to create video track: {:?}", e))
})?;
tracks.push((*video_track).clone());
let track: Arc<dyn webrtc::track::track_local::TrackLocal + Send + Sync> =
video_track.track.clone();
peer_connection
.add_track(track)
.await
.map_err(|e| CallError::ConfigError(format!("Failed to add video track: {}", e)))?;
}
let call = Call {
id: call_id,
remote_peer: callee.clone(),
peer_connection,
media_transport: Some(media_transport),
state: CallState::Calling,
constraints: constraints.clone(),
tracks,
quic_tracks: Vec::new(),
};
let mut calls = self.calls.write().await;
calls.insert(call_id, call);
let _ = self.event_sender.send(CallEvent::CallInitiated {
call_id,
callee,
constraints,
});
Ok(call_id)
}
pub async fn accept_call(
&self,
call_id: CallId,
_constraints: MediaConstraints,
) -> Result<(), CallError> {
let mut calls = self.calls.write().await;
if let Some(call) = calls.get_mut(&call_id) {
match call.state {
CallState::Calling | CallState::Connecting => {
let old_state = call.state;
call.state = CallState::Connected;
tracing::debug!(
call_id = %call_id,
old_state = ?old_state,
new_state = ?CallState::Connected,
"Call state transition"
);
let _ = self
.event_sender
.send(CallEvent::ConnectionEstablished { call_id });
tracing::info!("Call {} accepted", call_id);
Ok(())
}
_ => {
tracing::warn!(
"Invalid state transition: cannot accept call {} in state {:?}",
call_id,
call.state
);
Err(CallError::InvalidState)
}
}
} else {
tracing::warn!("Attempted to accept non-existent call {}", call_id);
Err(CallError::CallNotFound(call_id.to_string()))
}
}
pub async fn reject_call(&self, call_id: CallId) -> Result<(), CallError> {
let mut calls = self.calls.write().await;
if let Some(call) = calls.get_mut(&call_id) {
match call.state {
CallState::Calling | CallState::Connecting => {
let old_state = call.state;
call.state = CallState::Failed;
tracing::debug!(
call_id = %call_id,
old_state = ?old_state,
new_state = ?CallState::Failed,
"Call state transition"
);
let _ = self.event_sender.send(CallEvent::CallRejected { call_id });
Ok(())
}
_ => {
tracing::warn!(
"Invalid state transition: cannot reject call {} in state {:?}",
call_id,
call.state
);
Err(CallError::InvalidState)
}
}
} else {
Err(CallError::CallNotFound(call_id.to_string()))
}
}
pub async fn end_call(&self, call_id: CallId) -> Result<(), CallError> {
let mut calls = self.calls.write().await;
if let Some(call) = calls.remove(&call_id) {
let mut media_manager = self.media_manager.write().await;
for track in &call.tracks {
media_manager.remove_track(&track.id);
}
drop(media_manager);
if let Some(ref transport) = call.media_transport {
if let Err(e) = transport.disconnect().await {
tracing::warn!(
"Failed to disconnect QuicMediaTransport for call {}: {}",
call_id,
e
);
} else {
tracing::debug!("QuicMediaTransport disconnected for call {}", call_id);
}
}
let _ = call.peer_connection.close().await;
let _ = self.event_sender.send(CallEvent::CallEnded { call_id });
tracing::info!(
"Ended call {} and cleaned up {} tracks",
call_id,
call.tracks.len()
);
Ok(())
} else {
Err(CallError::CallNotFound(call_id.to_string()))
}
}
#[must_use]
pub async fn get_call_state(&self, call_id: CallId) -> Option<CallState> {
let calls = self.calls.read().await;
calls.get(&call_id).map(|call| call.state)
}
#[deprecated(
since = "0.3.0",
note = "Use QUIC-native call flow (exchange_capabilities) instead. SDP is only for legacy WebRTC calls."
)]
#[tracing::instrument(skip(self), fields(call_id = %call_id))]
pub async fn create_offer(&self, call_id: CallId) -> Result<String, CallError> {
let calls = self.calls.read().await;
if let Some(call) = calls.get(&call_id) {
tracing::debug!("Creating SDP offer");
let offer = call.peer_connection.create_offer(None).await.map_err(|e| {
tracing::error!("Failed to create offer: {}", e);
CallError::ConfigError(format!("Failed to create offer: {}", e))
})?;
call.peer_connection
.set_local_description(offer.clone())
.await
.map_err(|e| {
tracing::error!("Failed to set local description: {}", e);
CallError::ConfigError(format!("Failed to set local description: {}", e))
})?;
tracing::debug!("SDP offer created successfully");
Ok(offer.sdp)
} else {
tracing::warn!("Attempted to create offer for non-existent call");
Err(CallError::CallNotFound(call_id.to_string()))
}
}
#[deprecated(
since = "0.3.0",
note = "Use QUIC-native call flow (confirm_connection) instead. SDP is only for legacy WebRTC calls."
)]
#[tracing::instrument(skip(self, sdp), fields(call_id = %call_id, sdp_len = sdp.len()))]
pub async fn handle_answer(&self, call_id: CallId, sdp: String) -> Result<(), CallError> {
tracing::debug!("Processing SDP answer");
let calls = self.calls.read().await;
if let Some(call) = calls.get(&call_id) {
if sdp.trim().is_empty() {
return Err(CallError::ConfigError(
"SDP answer cannot be empty".to_string(),
));
}
let answer =
webrtc::peer_connection::sdp::session_description::RTCSessionDescription::answer(
sdp,
)
.map_err(|e| CallError::ConfigError(format!("Invalid SDP answer: {}", e)))?;
call.peer_connection
.set_remote_description(answer)
.await
.map_err(|e| {
CallError::ConfigError(format!("Failed to set remote description: {}", e))
})?;
tracing::debug!("SDP answer processed successfully");
Ok(())
} else {
Err(CallError::CallNotFound(call_id.to_string()))
}
}
#[deprecated(
since = "0.3.0",
note = "Use QUIC-native call flow (exchange_capabilities/confirm_connection) instead. ICE is only for legacy WebRTC calls."
)]
#[tracing::instrument(skip(self, candidate), fields(call_id = %call_id))]
pub async fn add_ice_candidate(
&self,
call_id: CallId,
candidate: String,
) -> Result<(), CallError> {
tracing::trace!("Adding ICE candidate (legacy WebRTC)");
let calls = self.calls.read().await;
if let Some(call) = calls.get(&call_id) {
let rtc_candidate = webrtc::ice_transport::ice_candidate::RTCIceCandidateInit {
candidate,
..Default::default()
};
call.peer_connection
.add_ice_candidate(rtc_candidate)
.await
.map_err(|e| {
CallError::ConfigError(format!("Failed to add ICE candidate: {}", e))
})?;
tracing::trace!("ICE candidate added successfully");
Ok(())
} else {
Err(CallError::CallNotFound(call_id.to_string()))
}
}
#[deprecated(
since = "0.3.0",
note = "Use QUIC-native call flow (exchange_capabilities/confirm_connection) instead. ICE is only for legacy WebRTC calls."
)]
pub async fn start_ice_gathering(&self, call_id: CallId) -> Result<(), CallError> {
let calls = self.calls.read().await;
if let Some(_call) = calls.get(&call_id) {
Ok(())
} else {
Err(CallError::CallNotFound(call_id.to_string()))
}
}
#[tracing::instrument(skip(self), fields(call_id = %call_id))]
pub async fn exchange_capabilities(
&self,
call_id: CallId,
) -> Result<MediaCapabilities, CallError> {
let mut calls = self.calls.write().await;
let call = calls
.get_mut(&call_id)
.ok_or_else(|| CallError::CallNotFound(call_id.to_string()))?;
match call.state {
CallState::Calling | CallState::Connecting => {
}
_ => {
tracing::warn!(
"Cannot exchange capabilities for call {} in state {:?}",
call_id,
call.state
);
return Err(CallError::InvalidState);
}
}
if call.state == CallState::Calling {
call.state = CallState::Connecting;
tracing::debug!(
call_id = %call_id,
"Call state transition: Calling -> Connecting"
);
}
let capabilities = MediaCapabilities::from_constraints(&call.constraints);
tracing::info!(
call_id = %call_id,
audio = capabilities.audio,
video = capabilities.video,
data_channel = capabilities.data_channel,
max_bandwidth = capabilities.max_bandwidth_kbps,
"Capabilities exchanged"
);
Ok(capabilities)
}
#[tracing::instrument(skip(self, peer_capabilities), fields(call_id = %call_id))]
pub async fn confirm_connection(
&self,
call_id: CallId,
peer_capabilities: MediaCapabilities,
) -> Result<(), CallError> {
let mut calls = self.calls.write().await;
let call = calls
.get_mut(&call_id)
.ok_or_else(|| CallError::CallNotFound(call_id.to_string()))?;
if call.state != CallState::Connecting {
tracing::warn!(
"Cannot confirm connection for call {} in state {:?}",
call_id,
call.state
);
return Err(CallError::InvalidState);
}
if let Err(e) = Self::validate_remote_capabilities(&call.constraints, &peer_capabilities) {
tracing::warn!(
call_id = %call_id,
peer_audio = peer_capabilities.audio,
peer_video = peer_capabilities.video,
required_audio = call.constraints.audio,
required_video = call.constraints.video,
error = %e,
"Peer capabilities do not satisfy call constraints"
);
return Err(e);
}
if let Some(ref transport) = call.media_transport {
let transport_state = transport.state().await;
if transport_state != MediaTransportState::Connected {
tracing::warn!(
call_id = %call_id,
transport_state = ?transport_state,
"Transport is not connected"
);
return Err(CallError::TransportError(
"Transport is not connected".to_string(),
));
}
} else {
return Err(CallError::ConfigError(
"Call has no media transport".to_string(),
));
}
call.state = CallState::Connected;
tracing::debug!(
call_id = %call_id,
"Call state transition: Connecting -> Connected"
);
let _ = self
.event_sender
.send(CallEvent::ConnectionEstablished { call_id });
tracing::info!(
call_id = %call_id,
peer_audio = peer_capabilities.audio,
peer_video = peer_capabilities.video,
"Connection confirmed"
);
Ok(())
}
pub fn validate_remote_capabilities(
constraints: &MediaConstraints,
remote_caps: &MediaCapabilities,
) -> Result<(), CallError> {
if constraints.audio && !remote_caps.audio {
return Err(CallError::ConfigError(
"Remote peer does not support audio".to_string(),
));
}
if (constraints.video || constraints.screen_share) && !remote_caps.video {
return Err(CallError::ConfigError(
"Remote peer does not support video".to_string(),
));
}
Ok(())
}
#[must_use]
pub fn subscribe_events(&self) -> broadcast::Receiver<CallEvent<I>> {
self.event_sender.subscribe()
}
#[must_use]
pub async fn has_media_transport(&self, call_id: CallId) -> bool {
let calls = self.calls.read().await;
calls
.get(&call_id)
.is_some_and(|call| call.media_transport.is_some())
}
pub async fn initiate_quic_call(
&self,
callee: I,
constraints: MediaConstraints,
peer: PeerConnection,
) -> Result<CallId, CallError> {
let calls = self.calls.read().await;
if calls.len() >= self.config.max_concurrent_calls {
return Err(CallError::ConfigError(format!(
"Maximum concurrent calls limit reached: {}",
self.config.max_concurrent_calls
)));
}
drop(calls);
let call_id = CallId::new();
tracing::info!(
"Initiating QUIC call {} to peer: {}",
call_id,
callee.to_string_repr()
);
let media_transport = Arc::new(QuicMediaTransport::new());
media_transport.connect(peer).await?;
tracing::debug!("QuicMediaTransport connected for call {}", call_id);
let peer_connection = Arc::new(
webrtc::api::APIBuilder::new()
.build()
.new_peer_connection(
webrtc::peer_connection::configuration::RTCConfiguration::default(),
)
.await
.map_err(|e| {
tracing::error!(
"Failed to create peer connection for call {}: {}",
call_id,
e
);
CallError::ConfigError(format!("Failed to create peer connection: {}", e))
})?,
);
let call = Call {
id: call_id,
remote_peer: callee.clone(),
peer_connection,
media_transport: Some(media_transport),
state: CallState::Connecting,
constraints: constraints.clone(),
tracks: Vec::new(), quic_tracks: Vec::new(), };
let mut calls = self.calls.write().await;
calls.insert(call_id, call);
let _ = self.event_sender.send(CallEvent::CallInitiated {
call_id,
callee,
constraints,
});
Ok(call_id)
}
pub async fn connect_quic_transport(
&self,
call_id: CallId,
peer: PeerConnection,
) -> Result<(), CallError> {
let calls = self.calls.read().await;
let call = calls
.get(&call_id)
.ok_or_else(|| CallError::CallNotFound(call_id.to_string()))?;
let transport = call
.media_transport
.as_ref()
.ok_or_else(|| CallError::ConfigError("Call has no media transport".to_string()))?;
tracing::debug!(
"Connecting QuicMediaTransport for call {} to peer {}",
call_id,
peer.peer_id
);
transport.connect(peer).await?;
tracing::info!("QuicMediaTransport connected for call {}", call_id);
Ok(())
}
pub async fn update_state_from_transport(
&self,
call_id: CallId,
) -> Result<CallState, CallError> {
let mut calls = self.calls.write().await;
let call = calls
.get_mut(&call_id)
.ok_or_else(|| CallError::CallNotFound(call_id.to_string()))?;
let transport = call
.media_transport
.as_ref()
.ok_or_else(|| CallError::ConfigError("Call has no media transport".to_string()))?;
let transport_state = transport.state().await;
let old_state = call.state;
let new_state = if old_state == CallState::Connected
&& transport_state == MediaTransportState::Disconnected
{
CallState::Ending
} else {
CallState::from_transport_state(transport_state)
};
if old_state != new_state {
call.state = new_state;
tracing::debug!(
call_id = %call_id,
old_state = ?old_state,
new_state = ?new_state,
transport_state = ?transport_state,
"Call state updated from transport"
);
match new_state {
CallState::Connected => {
let _ = self
.event_sender
.send(CallEvent::ConnectionEstablished { call_id });
}
CallState::Failed => {
let _ = self.event_sender.send(CallEvent::ConnectionFailed {
call_id,
error: "Transport failed".to_string(),
});
}
_ => {}
}
}
Ok(new_state)
}
#[must_use]
pub fn is_valid_quic_transition(from: CallState, to: CallState) -> bool {
matches!(
(from, to),
(CallState::Idle, CallState::Calling)
| (CallState::Idle, CallState::Connecting) | (CallState::Calling, CallState::Connecting)
| (CallState::Connecting, CallState::Connected)
| (CallState::Connected, CallState::Ending)
| (CallState::Ending, CallState::Idle)
| (CallState::Calling, CallState::Failed)
| (CallState::Connecting, CallState::Failed)
| (CallState::Connected, CallState::Failed)
| (CallState::Failed, CallState::Idle)
)
}
pub async fn fail_call(&self, call_id: CallId, reason: String) -> Result<(), CallError> {
let mut calls = self.calls.write().await;
let call = calls
.get_mut(&call_id)
.ok_or_else(|| CallError::CallNotFound(call_id.to_string()))?;
if !Self::is_valid_quic_transition(call.state, CallState::Failed) {
tracing::warn!(
call_id = %call_id,
current_state = ?call.state,
"Cannot transition to Failed from current state"
);
return Err(CallError::InvalidState);
}
let old_state = call.state;
call.state = CallState::Failed;
tracing::warn!(
call_id = %call_id,
old_state = ?old_state,
reason = %reason,
"Call failed"
);
let _ = self.event_sender.send(CallEvent::ConnectionFailed {
call_id,
error: reason,
});
Ok(())
}
pub async fn get_call_info(
&self,
call_id: CallId,
) -> Option<(CallState, MediaConstraints, bool)> {
let calls = self.calls.read().await;
calls.get(&call_id).map(|call| {
(
call.state,
call.constraints.clone(),
call.media_transport.is_some(),
)
})
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::identity::PeerIdentityString;
#[tokio::test]
async fn test_call_manager_initiate_call() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, Some(CallState::Calling));
}
#[tokio::test]
async fn test_call_manager_initiate_call_creates_media_transport() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
assert!(
call_manager.has_media_transport(call_id).await,
"New calls should have QuicMediaTransport initialized"
);
}
#[tokio::test]
async fn test_call_manager_accept_call() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints.clone())
.await
.unwrap();
call_manager
.accept_call(call_id, constraints)
.await
.unwrap();
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, Some(CallState::Connected));
}
#[tokio::test]
async fn test_call_manager_reject_call() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
call_manager.reject_call(call_id).await.unwrap();
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, Some(CallState::Failed));
}
#[tokio::test]
async fn test_call_manager_end_call() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
call_manager.end_call(call_id).await.unwrap();
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, None);
}
#[tokio::test]
#[allow(deprecated)]
async fn test_call_manager_create_offer_legacy() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let _call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
}
#[tokio::test]
#[allow(deprecated)]
async fn test_call_manager_add_ice_candidate_legacy() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
let candidate = "candidate:1 1 UDP 2122260223 192.168.1.1 12345 typ host".to_string();
let result = call_manager.add_ice_candidate(call_id, candidate).await;
assert!(result.is_ok() || matches!(result, Err(CallError::ConfigError(_))));
}
#[tokio::test]
#[allow(deprecated)]
async fn test_call_manager_start_ice_gathering_legacy() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
let result = call_manager.start_ice_gathering(call_id).await;
assert!(result.is_ok() || matches!(result, Err(CallError::ConfigError(_))));
}
#[tokio::test]
async fn test_call_manager_call_not_found() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let fake_call_id = CallId::new();
let result = call_manager
.accept_call(fake_call_id, MediaConstraints::audio_only())
.await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
let result = call_manager.reject_call(fake_call_id).await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
let result = call_manager.end_call(fake_call_id).await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
#[allow(deprecated)]
let result = call_manager.create_offer(fake_call_id).await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
#[allow(deprecated)]
let result = call_manager
.handle_answer(fake_call_id, "dummy".to_string())
.await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
#[allow(deprecated)]
let result = call_manager
.add_ice_candidate(fake_call_id, "dummy".to_string())
.await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
#[allow(deprecated)]
let result = call_manager.start_ice_gathering(fake_call_id).await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
}
fn test_peer() -> crate::link_transport::PeerConnection {
crate::link_transport::PeerConnection {
peer_id: "test-peer".to_string(),
remote_addr: "127.0.0.1:9000".parse().unwrap(),
}
}
#[tokio::test]
async fn test_initiate_quic_call() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("quic-callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, Some(CallState::Connecting));
assert!(call_manager.has_media_transport(call_id).await);
}
#[tokio::test]
async fn test_initiate_quic_call_respects_max_concurrent() {
let config = CallManagerConfig {
max_concurrent_calls: 1,
};
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let _ = call_manager
.initiate_quic_call(callee.clone(), constraints.clone(), test_peer())
.await
.unwrap();
let result = call_manager
.initiate_quic_call(callee, constraints, test_peer())
.await;
assert!(matches!(result, Err(CallError::ConfigError(_))));
}
#[tokio::test]
async fn test_connect_quic_transport() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
let peer = test_peer();
let result = call_manager.connect_quic_transport(call_id, peer).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_connect_quic_transport_call_not_found() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let fake_call_id = CallId::new();
let peer = test_peer();
let result = call_manager
.connect_quic_transport(fake_call_id, peer)
.await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
}
#[tokio::test]
async fn test_end_call_with_quic_transport() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("quic-callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
assert!(call_manager.has_media_transport(call_id).await);
let result = call_manager.end_call(call_id).await;
assert!(result.is_ok());
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, None);
}
#[tokio::test]
async fn test_end_call_with_legacy_transport() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("legacy-callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
let result = call_manager.end_call(call_id).await;
assert!(result.is_ok());
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, None);
}
#[tokio::test]
async fn test_update_state_from_transport() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let new_state = call_manager.update_state_from_transport(call_id).await;
assert!(new_state.is_ok());
assert_eq!(new_state.unwrap(), CallState::Connected);
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, Some(CallState::Connected));
}
#[tokio::test]
async fn test_update_state_from_transport_not_found() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let fake_call_id = CallId::new();
let result = call_manager.update_state_from_transport(fake_call_id).await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
}
#[test]
fn test_valid_quic_transitions() {
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Idle,
CallState::Calling
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Idle,
CallState::Connecting
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Calling,
CallState::Connecting
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Connecting,
CallState::Connected
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Connected,
CallState::Ending
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Ending,
CallState::Idle
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Calling,
CallState::Failed
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Connecting,
CallState::Failed
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Connected,
CallState::Failed
));
assert!(CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Failed,
CallState::Idle
));
assert!(
!CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Idle,
CallState::Connected
)
);
assert!(
!CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Connected,
CallState::Calling
)
);
assert!(
!CallManager::<PeerIdentityString>::is_valid_quic_transition(
CallState::Ending,
CallState::Connected
)
);
}
#[tokio::test]
async fn test_exchange_capabilities() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::video_call();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
let capabilities = call_manager.exchange_capabilities(call_id).await.unwrap();
assert!(capabilities.audio);
assert!(capabilities.video);
assert!(!capabilities.data_channel);
assert!(capabilities.max_bandwidth_kbps > 0);
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, Some(CallState::Connecting));
}
#[tokio::test]
async fn test_exchange_capabilities_audio_only() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
let capabilities = call_manager.exchange_capabilities(call_id).await.unwrap();
assert!(capabilities.audio);
assert!(!capabilities.video);
assert_eq!(capabilities.max_bandwidth_kbps, 128);
}
#[tokio::test]
async fn test_exchange_capabilities_not_found() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let fake_call_id = CallId::new();
let result = call_manager.exchange_capabilities(fake_call_id).await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
}
#[tokio::test]
async fn test_exchange_capabilities_invalid_state() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints.clone())
.await
.unwrap();
call_manager
.accept_call(call_id, constraints)
.await
.unwrap();
let result = call_manager.exchange_capabilities(call_id).await;
assert!(matches!(result, Err(CallError::InvalidState)));
}
#[tokio::test]
async fn test_confirm_connection() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let peer_caps = MediaCapabilities::audio_only();
let result = call_manager.confirm_connection(call_id, peer_caps).await;
assert!(result.is_ok());
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, Some(CallState::Connected));
}
#[tokio::test]
async fn test_confirm_connection_incompatible_caps() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::video_call(); let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let peer_caps = MediaCapabilities::audio_only();
let result = call_manager.confirm_connection(call_id, peer_caps).await;
assert!(matches!(result, Err(CallError::ConfigError(_))));
}
#[tokio::test]
async fn test_confirm_connection_not_found() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let fake_call_id = CallId::new();
let peer_caps = MediaCapabilities::audio_only();
let result = call_manager
.confirm_connection(fake_call_id, peer_caps)
.await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
}
#[tokio::test]
async fn test_confirm_connection_invalid_state() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let call_id = call_manager
.initiate_call(callee, constraints)
.await
.unwrap();
let peer_caps = MediaCapabilities::audio_only();
let result = call_manager.confirm_connection(call_id, peer_caps).await;
assert!(matches!(result, Err(CallError::InvalidState)));
}
#[tokio::test]
async fn test_confirm_connection_emits_event() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let mut event_rx = call_manager.subscribe_events();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let _ = event_rx.try_recv();
let peer_caps = MediaCapabilities::audio_only();
call_manager
.confirm_connection(call_id, peer_caps)
.await
.unwrap();
let event = event_rx.try_recv();
assert!(event.is_ok());
match event {
Ok(CallEvent::ConnectionEstablished { call_id: eid }) => {
assert_eq!(eid, call_id);
}
other => {
unreachable!("Expected ConnectionEstablished event, got: {:?}", other);
}
}
}
#[test]
fn test_validate_remote_capabilities_audio_only() {
let constraints = MediaConstraints::audio_only();
let caps = MediaCapabilities::audio_only();
let result =
CallManager::<PeerIdentityString>::validate_remote_capabilities(&constraints, &caps);
assert!(result.is_ok());
}
#[test]
fn test_validate_remote_capabilities_video_call() {
let constraints = MediaConstraints::video_call();
let video_caps = MediaCapabilities::video();
let audio_caps = MediaCapabilities::audio_only();
let result = CallManager::<PeerIdentityString>::validate_remote_capabilities(
&constraints,
&video_caps,
);
assert!(result.is_ok());
let result = CallManager::<PeerIdentityString>::validate_remote_capabilities(
&constraints,
&audio_caps,
);
assert!(matches!(result, Err(CallError::ConfigError(_))));
}
#[test]
fn test_validate_remote_capabilities_missing_audio() {
let constraints = MediaConstraints::video_call(); let caps = MediaCapabilities {
audio: false,
video: true,
data_channel: false,
max_bandwidth_kbps: 2500,
};
let result =
CallManager::<PeerIdentityString>::validate_remote_capabilities(&constraints, &caps);
assert!(matches!(result, Err(CallError::ConfigError(_))));
}
#[test]
fn test_validate_remote_capabilities_screen_share() {
let constraints = MediaConstraints::screen_share();
let caps = MediaCapabilities::video();
let result =
CallManager::<PeerIdentityString>::validate_remote_capabilities(&constraints, &caps);
assert!(result.is_ok());
let audio_caps = MediaCapabilities::audio_only();
let result = CallManager::<PeerIdentityString>::validate_remote_capabilities(
&constraints,
&audio_caps,
);
assert!(matches!(result, Err(CallError::ConfigError(_))));
}
#[tokio::test]
async fn test_peer_identity_type_safety() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let mut event_rx = call_manager.subscribe_events();
let callee = PeerIdentityString::new("typed-callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee.clone(), constraints, peer)
.await
.unwrap();
let event = event_rx.try_recv();
assert!(event.is_ok());
match event {
Ok(CallEvent::CallInitiated {
callee: event_callee,
..
}) => {
assert_eq!(event_callee.to_string_repr(), callee.to_string_repr());
}
other => {
unreachable!("Expected CallInitiated event, got: {:?}", other);
}
}
call_manager.end_call(call_id).await.unwrap();
}
#[tokio::test]
async fn test_fail_call() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let mut event_rx = call_manager.subscribe_events();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let _ = event_rx.try_recv();
let result = call_manager
.fail_call(call_id, "Network error".to_string())
.await;
assert!(result.is_ok());
let state = call_manager.get_call_state(call_id).await;
assert_eq!(state, Some(CallState::Failed));
let event = event_rx.try_recv();
assert!(event.is_ok());
match event {
Ok(CallEvent::ConnectionFailed {
call_id: eid,
error,
}) => {
assert_eq!(eid, call_id);
assert_eq!(error, "Network error");
}
other => {
unreachable!("Expected ConnectionFailed event, got: {:?}", other);
}
}
}
#[tokio::test]
async fn test_fail_call_not_found() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let fake_call_id = CallId::new();
let result = call_manager
.fail_call(fake_call_id, "Error".to_string())
.await;
assert!(matches!(result, Err(CallError::CallNotFound(_))));
}
#[tokio::test]
async fn test_fail_call_from_invalid_state() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints.clone(), peer)
.await
.unwrap();
call_manager
.fail_call(call_id, "Error 1".to_string())
.await
.unwrap();
let result = call_manager.fail_call(call_id, "Error 2".to_string()).await;
assert!(matches!(result, Err(CallError::InvalidState)));
}
#[tokio::test]
async fn test_get_call_info() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::video_call();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints.clone(), peer)
.await
.unwrap();
let info = call_manager.get_call_info(call_id).await;
assert!(info.is_some());
let (state, call_constraints, has_transport) = info.unwrap();
assert_eq!(state, CallState::Connecting);
assert_eq!(call_constraints.video, constraints.video);
assert!(has_transport);
}
#[tokio::test]
async fn test_get_call_info_not_found() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let fake_call_id = CallId::new();
let info = call_manager.get_call_info(fake_call_id).await;
assert!(info.is_none());
}
#[tokio::test]
async fn test_call_quic_tracks_initially_empty() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let calls = call_manager.calls.read().await;
let call = calls.get(&call_id).unwrap();
assert!(call.quic_tracks.is_empty());
assert!(call.is_quic_call());
}
#[tokio::test]
async fn test_call_add_quic_track() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let transport = Arc::new(QuicMediaTransport::new());
let audio = crate::media::AudioTrack::with_quic("test-audio", transport);
let generic = crate::media::GenericTrack::audio(audio);
{
let mut calls = call_manager.calls.write().await;
let call = calls.get_mut(&call_id).unwrap();
call.add_quic_track(generic);
}
let calls = call_manager.calls.read().await;
let call = calls.get(&call_id).unwrap();
assert_eq!(call.quic_tracks.len(), 1);
assert!(call.get_quic_track_by_id("test-audio").is_some());
}
#[tokio::test]
async fn test_call_remove_quic_track() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::video_call();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let transport = Arc::new(QuicMediaTransport::new());
let audio = crate::media::AudioTrack::with_quic("audio-1", Arc::clone(&transport));
let video = crate::media::VideoTrack::with_quic("video-1", transport, 1280, 720);
{
let mut calls = call_manager.calls.write().await;
let call = calls.get_mut(&call_id).unwrap();
call.add_quic_track(crate::media::GenericTrack::audio(audio));
call.add_quic_track(crate::media::GenericTrack::video(video));
}
{
let mut calls = call_manager.calls.write().await;
let call = calls.get_mut(&call_id).unwrap();
let removed = call.remove_quic_track("audio-1");
assert!(removed);
}
let calls = call_manager.calls.read().await;
let call = calls.get(&call_id).unwrap();
assert_eq!(call.quic_tracks.len(), 1);
assert!(call.get_quic_track_by_id("audio-1").is_none());
assert!(call.get_quic_track_by_id("video-1").is_some());
}
#[tokio::test]
async fn test_call_transport_accessor() {
let config = CallManagerConfig::default();
let call_manager = CallManager::<PeerIdentityString>::new(config)
.await
.unwrap();
let callee = PeerIdentityString::new("callee");
let constraints = MediaConstraints::audio_only();
let peer = test_peer();
let call_id = call_manager
.initiate_quic_call(callee, constraints, peer)
.await
.unwrap();
let calls = call_manager.calls.read().await;
let call = calls.get(&call_id).unwrap();
assert!(call.transport().is_some());
}
}