1use crate::media::depacketizer::{Depacketizer, DepacketizerFactory};
2use crate::media::track::{MediaStreamTrack, SampleStreamSource, SampleStreamTrack, sample_track};
3use crate::rtp::{
4 FirRequest, FullIntraRequest, GenericNack, PictureLossIndication, RtcpPacket, RtpPacket,
5};
6use crate::stats::{StatsReport, gather_once};
7use crate::stats_collector::StatsCollector;
8use crate::transports::dtls::{self, DtlsTransport};
9use crate::transports::get_local_ip;
10use crate::transports::ice::stun::random_u32;
11use crate::transports::ice::{IceCandidate, IceGathererState, IceTransport, conn::IceConn};
12use crate::transports::rtp::RtpTransport;
13use crate::transports::sctp::SctpTransport;
14use crate::{
15 Attribute, AudioCapability, Direction, MediaKind, MediaSection, Origin, RtcConfiguration,
16 RtcError, RtcResult, SdpType, SessionDescription, TransportMode, VideoCapability,
17};
18use base64::prelude::*;
19use std::collections::{HashMap, VecDeque};
20use std::net::{IpAddr, Ipv4Addr};
21use std::{
22 sync::{
23 Arc, Mutex,
24 atomic::{AtomicBool, AtomicU8, AtomicU16, AtomicU32, AtomicU64, Ordering},
25 },
26 time::{SystemTime, UNIX_EPOCH},
27};
28use tokio::sync::{broadcast, mpsc, watch};
29use tracing::{debug, trace};
30
31use async_trait::async_trait;
32use futures::stream::{FuturesUnordered, StreamExt};
33use std::future::Future;
34use std::pin::Pin;
35use std::sync::Weak;
36
37#[async_trait]
38pub trait RtpSenderInterceptor: Send + Sync {
39 async fn on_packet_sent(&self, _packet: &RtpPacket) {}
40 async fn on_rtcp_received(&self, _packet: &RtcpPacket, _transport: Arc<RtpTransport>) {}
41 fn as_nack_stats(self: Arc<Self>) -> Option<Arc<dyn NackStats>> {
42 None
43 }
44}
45
46#[async_trait]
47pub trait RtpReceiverInterceptor: Send + Sync {
48 async fn on_packet_received(&self, _packet: &RtpPacket) -> Option<RtcpPacket> {
49 None
50 }
51 async fn on_rtcp_received(&self, _packet: &RtcpPacket, _transport: Arc<RtpTransport>) {}
52 fn as_nack_stats(self: Arc<Self>) -> Option<Arc<dyn NackStats>> {
53 None
54 }
55}
56
57pub trait NackStats: Send + Sync {
58 fn get_nack_count(&self) -> u64;
59 fn get_recovered_count(&self) -> u64 {
60 0
61 }
62}
63
64pub struct DefaultRtpSenderNackHandler {
65 buffer: Mutex<VecDeque<RtpPacket>>,
66 max_size: usize,
67 pub nack_recv_count: AtomicU64,
68}
69
70pub struct DefaultRtpSenderBitrateHandler;
71
72impl DefaultRtpSenderBitrateHandler {
73 pub fn new() -> Self {
74 Self
75 }
76}
77
78#[async_trait]
79impl RtpSenderInterceptor for DefaultRtpSenderBitrateHandler {
80 async fn on_rtcp_received(&self, packet: &RtcpPacket, _transport: Arc<RtpTransport>) {
81 if let RtcpPacket::RemoteBitrateEstimate(remb) = packet {
82 debug!("Received REMB: {} bps", remb.bitrate_bps);
83 }
84 }
85}
86
87impl DefaultRtpSenderNackHandler {
88 pub fn new(max_size: usize) -> Self {
89 Self {
90 buffer: Mutex::new(VecDeque::with_capacity(max_size)),
91 max_size,
92 nack_recv_count: AtomicU64::new(0),
93 }
94 }
95}
96
97#[async_trait]
98impl RtpSenderInterceptor for DefaultRtpSenderNackHandler {
99 async fn on_packet_sent(&self, packet: &RtpPacket) {
100 let mut buffer = self.buffer.lock().unwrap();
101 buffer.push_back(packet.clone());
102 if buffer.len() > self.max_size {
103 buffer.pop_front();
104 }
105 }
106
107 async fn on_rtcp_received(&self, packet: &RtcpPacket, transport: Arc<RtpTransport>) {
108 if let RtcpPacket::GenericNack(nack) = packet {
109 debug!(
110 "NACK: received NACK for {} packets",
111 nack.lost_packets.len()
112 );
113 self.nack_recv_count
114 .fetch_add(nack.lost_packets.len() as u64, Ordering::Relaxed);
115
116 let to_resend = {
117 let buffer = self.buffer.lock().unwrap();
118 let mut packets = Vec::new();
119 for seq in &nack.lost_packets {
120 if let Some(packet) = buffer.iter().find(|p| p.header.sequence_number == *seq) {
121 packets.push(packet.clone());
122 }
123 }
124 packets
125 };
126
127 for packet in to_resend {
128 let seq_num = packet.header.sequence_number;
129 debug!("NACK: retransmitting packet seq={}", seq_num);
130 let _ = transport.send_rtp(&packet).await;
131 }
132 }
133 }
134
135 fn as_nack_stats(self: Arc<Self>) -> Option<Arc<dyn NackStats>> {
136 Some(self)
137 }
138}
139
140impl NackStats for DefaultRtpSenderNackHandler {
141 fn get_nack_count(&self) -> u64 {
142 self.nack_recv_count.load(Ordering::Relaxed)
143 }
144}
145
146pub struct DefaultRtpReceiverNackHandler {
147 last_seq: AtomicU16,
148 last_ssrc: AtomicU32,
149 initialized: std::sync::atomic::AtomicBool,
150 pub nack_sent_count: AtomicU64,
151 pub nack_recovered_count: AtomicU64,
152}
153
154impl DefaultRtpReceiverNackHandler {
155 pub fn new() -> Self {
156 Self {
157 last_seq: AtomicU16::new(0),
158 last_ssrc: AtomicU32::new(0),
159 initialized: std::sync::atomic::AtomicBool::new(false),
160 nack_sent_count: AtomicU64::new(0),
161 nack_recovered_count: AtomicU64::new(0),
162 }
163 }
164}
165
166#[async_trait]
167impl RtpReceiverInterceptor for DefaultRtpReceiverNackHandler {
168 async fn on_packet_received(&self, packet: &RtpPacket) -> Option<RtcpPacket> {
169 let seq = packet.header.sequence_number;
170 let ssrc = packet.header.ssrc;
171
172 let last_ssrc = self.last_ssrc.load(Ordering::SeqCst);
174 if last_ssrc != 0 && last_ssrc != ssrc {
175 debug!(
176 "NACK: SSRC changed from {} to {}, resetting state",
177 last_ssrc, ssrc
178 );
179 self.last_ssrc.store(ssrc, Ordering::SeqCst);
180 self.last_seq.store(seq, Ordering::SeqCst);
181 return None; }
183
184 if !self.initialized.swap(true, Ordering::SeqCst) {
185 self.last_ssrc.store(ssrc, Ordering::SeqCst);
186 self.last_seq.store(seq, Ordering::SeqCst);
187 return None;
188 }
189
190 let last = self.last_seq.load(Ordering::SeqCst);
191 let diff = seq.wrapping_sub(last);
192
193 if diff > 1 && diff < 32768 {
194 let mut lost = Vec::new();
195 let mut s = last.wrapping_add(1);
196 while s != seq {
197 lost.push(s);
198 s = s.wrapping_add(1);
199 }
200 debug!(
201 "NACK: detected gap from {} to {}, lost {} packets",
202 last,
203 seq,
204 lost.len()
205 );
206 self.nack_sent_count
207 .fetch_add(lost.len() as u64, Ordering::Relaxed);
208 self.last_seq.store(seq, Ordering::SeqCst);
209 return Some(RtcpPacket::GenericNack(GenericNack {
210 sender_ssrc: 0, media_ssrc: packet.header.ssrc,
212 lost_packets: lost,
213 }));
214 }
215
216 if diff < 32768 {
217 self.last_seq.store(seq, Ordering::SeqCst);
218 } else if diff > 32768 {
219 debug!("NACK: received old packet seq={}, last={}", seq, last);
220 self.nack_recovered_count.fetch_add(1, Ordering::Relaxed);
221 }
222 None
223 }
224
225 fn as_nack_stats(self: Arc<Self>) -> Option<Arc<dyn NackStats>> {
226 Some(self)
227 }
228}
229
230impl NackStats for DefaultRtpReceiverNackHandler {
231 fn get_nack_count(&self) -> u64 {
232 self.nack_sent_count.load(Ordering::Relaxed)
233 }
234
235 fn get_recovered_count(&self) -> u64 {
236 self.nack_recovered_count.load(Ordering::Relaxed)
237 }
238}
239
240enum ReceiverCommand {
241 AddTrack {
242 rid: Option<String>,
243 packet_rx: mpsc::Receiver<(crate::rtp::RtpPacket, std::net::SocketAddr)>,
244 feedback_rx:
245 std::sync::Arc<tokio::sync::Mutex<mpsc::Receiver<crate::media::track::FeedbackEvent>>>,
246 source: std::sync::Arc<crate::media::track::SampleStreamSource>,
247 simulcast_ssrc: std::sync::Arc<std::sync::Mutex<Option<u32>>>,
248 },
249}
250
251enum LoopEvent {
252 Packet(
253 Option<(crate::rtp::RtpPacket, std::net::SocketAddr)>,
254 Option<String>,
255 mpsc::Receiver<(crate::rtp::RtpPacket, std::net::SocketAddr)>,
256 Box<dyn Depacketizer>,
257 ),
258 Feedback(Option<crate::media::track::FeedbackEvent>, Option<String>),
259}
260
261#[derive(Clone)]
262pub enum PeerConnectionEvent {
263 DataChannel(Arc<crate::transports::sctp::DataChannel>),
264 Track(Arc<RtpTransceiver>),
265}
266
267#[derive(Clone)]
268pub struct PeerConnection {
269 inner: Arc<PeerConnectionInner>,
270}
271
272struct PeerConnectionInner {
273 config: RtcConfiguration,
274 signaling_state: watch::Sender<SignalingState>,
275 _signaling_state_rx: watch::Receiver<SignalingState>,
276 peer_state: watch::Sender<PeerConnectionState>,
277 _peer_state_rx: watch::Receiver<PeerConnectionState>,
278 ice_connection_state: watch::Sender<IceConnectionState>,
279 _ice_connection_state_rx: watch::Receiver<IceConnectionState>,
280 ice_gathering_state: watch::Sender<IceGatheringState>,
281 _ice_gathering_state_rx: watch::Receiver<IceGatheringState>,
282 local_description: Mutex<Option<SessionDescription>>,
283 remote_description: Mutex<Option<SessionDescription>>,
284 transceivers: Mutex<Vec<Arc<RtpTransceiver>>>,
285 next_mid: AtomicU16,
286 ice_transport: IceTransport,
287 certificate: Arc<dtls::Certificate>,
288 dtls_fingerprint: String,
289 dtls_transport: Mutex<Option<Arc<DtlsTransport>>>,
290 rtp_transport: Mutex<Option<Arc<RtpTransport>>>,
291 sctp_transport: Mutex<Option<Arc<SctpTransport>>>,
292 data_channels: Arc<Mutex<Vec<std::sync::Weak<crate::transports::sctp::DataChannel>>>>,
293 event_tx: mpsc::UnboundedSender<PeerConnectionEvent>,
294 event_rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<PeerConnectionEvent>>,
295 dtls_role: watch::Sender<Option<bool>>,
296 _dtls_role_rx: watch::Receiver<Option<bool>>,
297 stats_collector: Arc<StatsCollector>,
298 ssrc_generator: AtomicU32,
299}
300
301fn generate_sdes_key_params() -> String {
302 let mut key_salt = [0u8; 30];
303 rand::fill(&mut key_salt);
304 let encoded = BASE64_STANDARD.encode(&key_salt);
305 format!("inline:{}", encoded)
306}
307
308fn parse_sdes_key_params(params: &str) -> RtcResult<Vec<u8>> {
309 if !params.starts_with("inline:") {
310 return Err(RtcError::Internal("Unsupported key params".into()));
311 }
312 let key_salt_base64 = ¶ms[7..];
313 let key_salt_base64 = key_salt_base64.split('|').next().unwrap();
314 BASE64_STANDARD
315 .decode(key_salt_base64)
316 .map_err(|e| RtcError::Internal(format!("Invalid base64 key: {}", e)))
317}
318
319fn map_crypto_suite(suite: &str) -> RtcResult<crate::srtp::SrtpProfile> {
320 match suite {
321 "AES_CM_128_HMAC_SHA1_80" => Ok(crate::srtp::SrtpProfile::Aes128Sha1_80),
322 "AES_CM_128_HMAC_SHA1_32" => Ok(crate::srtp::SrtpProfile::Aes128Sha1_32),
323 "AEAD_AES_128_GCM" => Ok(crate::srtp::SrtpProfile::AeadAes128Gcm),
324 _ => Err(RtcError::Internal(format!(
325 "Unsupported crypto suite: {}",
326 suite
327 ))),
328 }
329}
330
331impl PeerConnection {
332 pub fn new(config: RtcConfiguration) -> Self {
333 let (ice_transport, ice_runner) = IceTransport::new(config.clone());
334 let certificate =
335 Arc::new(dtls::generate_certificate().expect("failed to generate certificate"));
336 let dtls_fingerprint = dtls::fingerprint(&certificate);
337
338 let (signaling_state_tx, signaling_state_rx) = watch::channel(SignalingState::Stable);
339 let (peer_state_tx, peer_state_rx) = watch::channel(PeerConnectionState::New);
340 let (ice_connection_state_tx, ice_connection_state_rx) =
341 watch::channel(IceConnectionState::New);
342 let (ice_gathering_state_tx, ice_gathering_state_rx) =
343 watch::channel(IceGatheringState::New);
344 let (dtls_role_tx, dtls_role_rx) = watch::channel(None);
345
346 let ssrc_generator = AtomicU32::new(config.ssrc_start);
347
348 let (event_tx, event_rx) = mpsc::unbounded_channel();
349
350 let inner = PeerConnectionInner {
351 config,
352 signaling_state: signaling_state_tx,
353 _signaling_state_rx: signaling_state_rx,
354 peer_state: peer_state_tx,
355 _peer_state_rx: peer_state_rx,
356 ice_connection_state: ice_connection_state_tx,
357 _ice_connection_state_rx: ice_connection_state_rx,
358 ice_gathering_state: ice_gathering_state_tx,
359 _ice_gathering_state_rx: ice_gathering_state_rx,
360 local_description: Mutex::new(None),
361 remote_description: Mutex::new(None),
362 transceivers: Mutex::new(Vec::new()),
363 next_mid: AtomicU16::new(0),
364 ice_transport,
365 certificate,
366 dtls_fingerprint,
367 dtls_transport: Mutex::new(None),
368 rtp_transport: Mutex::new(None),
369 sctp_transport: Mutex::new(None),
370 data_channels: Arc::new(Mutex::new(Vec::new())),
371 event_tx,
372 event_rx: tokio::sync::Mutex::new(event_rx),
373 dtls_role: dtls_role_tx,
374 _dtls_role_rx: dtls_role_rx.clone(),
375 stats_collector: Arc::new(StatsCollector::new()),
376 ssrc_generator,
377 };
378 let pc = Self {
379 inner: Arc::new(inner),
380 };
381
382 let inner_weak = Arc::downgrade(&pc.inner);
383 let ice_transport = pc.inner.ice_transport.clone();
384 let dtls_role_rx = dtls_role_rx;
385 let ice_connection_state_tx = pc.inner.ice_connection_state.clone();
386
387 let ice_transport_gathering = ice_transport.clone();
388 let ice_gathering_state_tx = pc.inner.ice_gathering_state.clone();
389 let inner_weak_gathering = inner_weak.clone();
390 tokio::spawn(async move {
391 let gathering_loop = run_gathering_loop(
392 ice_transport_gathering,
393 ice_gathering_state_tx,
394 inner_weak_gathering,
395 );
396
397 let dtls_loop = run_ice_dtls_loop(
398 ice_transport,
399 ice_connection_state_tx,
400 dtls_role_rx,
401 inner_weak,
402 );
403
404 tokio::join!(gathering_loop, dtls_loop, ice_runner);
405 });
406 pc
407 }
408
409 pub fn config(&self) -> &RtcConfiguration {
410 &self.inner.config
411 }
412
413 pub fn ice_transport(&self) -> IceTransport {
414 self.inner.ice_transport.clone()
415 }
416
417 pub fn add_transceiver(
418 &self,
419 kind: MediaKind,
420 direction: TransceiverDirection,
421 ) -> Arc<RtpTransceiver> {
422 let index = self
423 .inner
424 .transceivers
425 .lock()
426 .map(|list| list.len())
427 .unwrap_or(0);
428 let ssrc = 2000 + index as u32;
429 let mut builder = RtpReceiverBuilder::new(kind, ssrc)
430 .interceptor(self.inner.stats_collector.clone())
431 .depacketizer_factory(self.inner.config.depacketizer_strategy.factory.clone());
432
433 let nack_enabled = if let Some(caps) = &self.inner.config.media_capabilities {
434 match kind {
435 MediaKind::Audio => caps
436 .audio
437 .iter()
438 .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
439 MediaKind::Video => caps
440 .video
441 .iter()
442 .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
443 MediaKind::Application => false,
444 }
445 } else {
446 match kind {
447 MediaKind::Audio => AudioCapability::default()
448 .rtcp_fbs
449 .contains(&"nack".to_string()),
450 MediaKind::Video => VideoCapability::default()
451 .rtcp_fbs
452 .contains(&"nack".to_string()),
453 MediaKind::Application => false,
454 }
455 };
456
457 if nack_enabled {
458 builder = builder.nack();
459 }
460 let receiver = builder.build();
461
462 let transceiver = Arc::new(RtpTransceiver::new(kind, direction));
463 if direction.sends() {
464 let rand_val = random_u32();
465 let ssrc = self
466 .inner
467 .ssrc_generator
468 .fetch_add(1 + rand_val, Ordering::Relaxed);
469 *transceiver.sender_ssrc.lock().unwrap() = Some(ssrc);
470 *transceiver.sender_stream_id.lock().unwrap() = Some("default".to_string());
471 *transceiver.sender_track_id.lock().unwrap() =
472 Some(format!("track-{}", transceiver.id()));
473 }
474 *transceiver.receiver.lock().unwrap() = Some(receiver);
475
476 let mut list = self.inner.transceivers.lock().unwrap();
477 list.push(transceiver.clone());
478 transceiver
479 }
480
481 pub fn add_track(
482 &self,
483 track: Arc<dyn MediaStreamTrack>,
484 params: RtpCodecParameters,
485 ) -> RtcResult<Arc<RtpSender>> {
486 let stream_id = format!("{}", track.id());
487 self.add_track_with_stream_id(track, stream_id, params)
488 }
489
490 pub fn add_track_with_stream_id(
491 &self,
492 track: Arc<dyn MediaStreamTrack>,
493 stream_id: String,
494 params: RtpCodecParameters,
495 ) -> RtcResult<Arc<RtpSender>> {
496 let kind = match track.kind() {
497 crate::media::frame::MediaKind::Audio => MediaKind::Audio,
498 crate::media::frame::MediaKind::Video => MediaKind::Video,
499 };
500 let transceiver = self.add_transceiver(kind, TransceiverDirection::SendRecv);
501 let ssrc = transceiver
502 .sender_ssrc
503 .lock()
504 .unwrap()
505 .unwrap_or_else(|| self.inner.ssrc_generator.fetch_add(1, Ordering::Relaxed));
506
507 let mut builder = RtpSenderBuilder::new(track, ssrc)
508 .stream_id(stream_id)
509 .params(params)
510 .interceptor(self.inner.stats_collector.clone());
511
512 let nack_enabled = if let Some(caps) = &self.inner.config.media_capabilities {
513 match kind {
514 MediaKind::Audio => caps
515 .audio
516 .iter()
517 .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
518 MediaKind::Video => caps
519 .video
520 .iter()
521 .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
522 MediaKind::Application => false,
523 }
524 } else {
525 match kind {
526 MediaKind::Audio => AudioCapability::default()
527 .rtcp_fbs
528 .contains(&"nack".to_string()),
529 MediaKind::Video => VideoCapability::default()
530 .rtcp_fbs
531 .contains(&"nack".to_string()),
532 MediaKind::Application => false,
533 }
534 };
535
536 if nack_enabled {
537 builder = builder
538 .nack(self.inner.config.nack_buffer_size)
539 .bitrate_controller();
540 }
541
542 let sender = builder.build();
543
544 *transceiver.sender_ssrc.lock().unwrap() = Some(sender.ssrc());
546 *transceiver.sender_stream_id.lock().unwrap() = Some(sender.stream_id().to_string());
547 *transceiver.sender_track_id.lock().unwrap() = Some(sender.track_id().to_string());
548
549 if let Some(transport) = self.inner.rtp_transport.lock().unwrap().as_ref() {
551 sender.set_transport(transport.clone());
552 }
553
554 transceiver.set_sender(Some(sender.clone()));
555 Ok(sender)
556 }
557
558 pub fn get_transceivers(&self) -> Vec<Arc<RtpTransceiver>> {
559 self.inner.transceivers.lock().unwrap().clone()
560 }
561
562 pub async fn create_offer(&self) -> RtcResult<SessionDescription> {
563 let state = &self.inner.signaling_state;
564 if *state.borrow() != SignalingState::Stable {
565 return Err(RtcError::InvalidState(format!(
566 "cannot create offer while in state {:?}",
567 *state.borrow()
568 )));
569 }
570 let should_set_controlling = {
571 let local = self.inner.local_description.lock().unwrap();
572 let remote = self.inner.remote_description.lock().unwrap();
573 local.is_none() && remote.is_none()
574 };
575
576 if should_set_controlling {
577 self.inner
578 .ice_transport
579 .set_role(crate::transports::ice::IceRole::Controlling);
580 }
581 self.inner
582 .build_description(SdpType::Offer, |dir| dir)
583 .await
584 }
585
586 pub async fn create_answer(&self) -> RtcResult<SessionDescription> {
587 let state = &self.inner.signaling_state;
588 if *state.borrow() != SignalingState::HaveRemoteOffer {
589 return Err(RtcError::InvalidState(
590 "create_answer requires remote offer".into(),
591 ));
592 }
593 self.inner
594 .ice_transport
595 .set_role(crate::transports::ice::IceRole::Controlled);
596 self.inner
597 .build_description(SdpType::Answer, |dir| dir.answer_direction())
598 .await
599 }
600
601 pub fn set_local_description(&self, desc: SessionDescription) -> RtcResult<()> {
602 self.inner.validate_sdp_type(&desc.sdp_type)?;
603
604 if desc.sdp_type == SdpType::Offer {
608 let is_reinvite = {
609 let local = self.inner.local_description.lock().unwrap();
610 local.is_some()
611 };
612 if is_reinvite {
613 debug!("Offerer: extracting parameters from local reinvite offer");
614 let transceivers = self.inner.transceivers.lock().unwrap().clone();
616 for section in &desc.media_sections {
617 let mut matched_transceiver = transceivers
618 .iter()
619 .find(|t| t.mid().as_ref() == Some(§ion.mid))
620 .map(|t| t.clone());
621
622 if matched_transceiver.is_none() {
624 if let Some(t) = transceivers
625 .iter()
626 .find(|t| t.mid().is_none() && t.kind() == section.kind)
627 {
628 t.set_mid(section.mid.clone());
629 matched_transceiver = Some(t.clone());
630 }
631 }
632
633 if let Some(t) = matched_transceiver {
634 let payload_map = Self::extract_payload_map(section);
635 if !payload_map.is_empty() {
636 let _ = t.update_payload_map(payload_map);
637 }
638 let extmap = Self::extract_extmap(section);
639 let _ = t.update_extmap(extmap);
640 }
641 }
642 } else {
643 let transceivers = self.inner.transceivers.lock().unwrap().clone();
646 for section in &desc.media_sections {
647 if transceivers
648 .iter()
649 .any(|t| t.mid().as_ref() == Some(§ion.mid))
650 {
651 continue;
652 }
653 if let Some(t) = transceivers
655 .iter()
656 .find(|t| t.mid().is_none() && t.kind() == section.kind)
657 {
658 t.set_mid(section.mid.clone());
659 }
660 }
661 }
662 }
663
664 {
665 let state = &self.inner.signaling_state;
666 match desc.sdp_type {
667 SdpType::Offer => {
668 if *state.borrow() != SignalingState::Stable {
669 return Err(RtcError::InvalidState(
670 "set_local_description(offer) requires stable signaling state".into(),
671 ));
672 }
673 let _ = state.send(SignalingState::HaveLocalOffer);
674 }
675 SdpType::Answer => {
676 if *state.borrow() != SignalingState::HaveRemoteOffer {
677 return Err(RtcError::InvalidState(
678 "set_local_description(answer) requires remote offer".into(),
679 ));
680 }
681 let _ = state.send(SignalingState::Stable);
682 }
683 SdpType::Rollback | SdpType::Pranswer => {
684 return Err(RtcError::NotImplemented("pranswer/rollback"));
685 }
686 }
687 }
688 let mut local = self.inner.local_description.lock().unwrap();
689 *local = Some(desc);
690 Ok(())
691 }
692
693 pub async fn set_remote_description(&self, desc: SessionDescription) -> RtcResult<()> {
694 self.inner.validate_sdp_type(&desc.sdp_type)?;
695
696 let is_reinvite = {
698 let remote = self.inner.remote_description.lock().unwrap();
699 remote.is_some()
700 };
701
702 if is_reinvite {
703 let current_state = *self.inner.signaling_state.borrow();
705 match (desc.sdp_type, current_state) {
706 (SdpType::Offer, SignalingState::Stable) => {
708 debug!("Answerer: applying reinvite from offer");
709 self.handle_reinvite(&desc).await?;
710 }
711 (SdpType::Answer, SignalingState::HaveLocalOffer) => {
713 debug!("Offerer: applying reinvite from answer");
714 self.handle_reinvite(&desc).await?;
715 }
716 (SdpType::Offer, _) => {
718 return Err(RtcError::InvalidState(
719 "Cannot handle reinvite offer in non-stable state (glare?)".into(),
720 ));
721 }
722 _ => {}
723 }
724 }
725
726 for section in &desc.media_sections {
728 if let Ok(mid_val) = section.mid.parse::<u16>() {
729 self.inner.next_mid.fetch_max(mid_val + 1, Ordering::SeqCst);
730 }
731 }
732
733 {
734 let state = &self.inner.signaling_state;
735 match desc.sdp_type {
736 SdpType::Offer => {
737 if *state.borrow() != SignalingState::Stable {
738 return Err(RtcError::InvalidState(
739 "set_remote_description(offer) requires stable signaling state".into(),
740 ));
741 }
742 let _ = state.send(SignalingState::HaveRemoteOffer);
743 }
744 SdpType::Answer => {
745 if *state.borrow() != SignalingState::HaveLocalOffer {
746 return Err(RtcError::InvalidState(
747 "set_remote_description(answer) requires local offer".into(),
748 ));
749 }
750 let _ = state.send(SignalingState::Stable);
751 }
752 SdpType::Rollback | SdpType::Pranswer => {
753 return Err(RtcError::NotImplemented("pranswer/rollback"));
754 }
755 }
756 }
757
758 {
759 let current_role = *self.inner.dtls_role.borrow();
760 if current_role.is_none() {
761 let mut new_role = None;
762 if self.config().transport_mode == TransportMode::Rtp
763 || self.config().transport_mode == TransportMode::Srtp
764 {
765 new_role = Some(true);
766 } else {
767 for section in &desc.media_sections {
768 for attr in §ion.attributes {
769 if attr.key == "setup"
770 && let Some(val) = &attr.value
771 {
772 let is_client = match val.as_str() {
773 "active" => false,
774 "passive" => true,
775 "actpass" => false,
776 _ => true,
777 };
778 new_role = Some(is_client);
779 break;
780 }
781 }
782 if new_role.is_some() {
783 break;
784 }
785 }
786 }
787 if let Some(r) = new_role {
788 let _ = self.inner.dtls_role.send(Some(r));
789 }
790 }
791 }
792
793 let mut ufrag = None;
795 let mut pwd = None;
796 let mut candidates = Vec::new();
797 let mut remote_addr = None;
798
799 for attr in &desc.session.attributes {
801 if attr.key == "ice-ufrag" {
802 ufrag = attr.value.clone();
803 } else if attr.key == "ice-pwd" {
804 pwd = attr.value.clone();
805 }
806 }
807
808 for section in &desc.media_sections {
809 if self.config().transport_mode != TransportMode::WebRtc {
810 let conn_opt = section
811 .connection
812 .as_ref()
813 .or(desc.session.connection.as_ref());
814 if let Some(conn) = conn_opt {
815 let parts: Vec<&str> = conn.split_whitespace().collect();
816 if parts.len() >= 3
817 && parts[0] == "IN"
818 && parts[1] == "IP4"
819 && let Ok(ip) = parts[2].parse::<std::net::IpAddr>()
820 {
821 remote_addr = Some(std::net::SocketAddr::new(ip, section.port));
822 }
823 }
824 }
825
826 for attr in §ion.attributes {
827 if attr.key == "ice-ufrag" {
828 ufrag = attr.value.clone();
829 } else if attr.key == "ice-pwd" {
830 pwd = attr.value.clone();
831 } else if attr.key == "candidate"
832 && let Some(val) = &attr.value
833 && let Ok(c) = crate::transports::ice::IceCandidate::from_sdp(val)
834 {
835 candidates.push(c);
836 }
837 }
838 }
839
840 if self.config().transport_mode == TransportMode::WebRtc {
841 if let (Some(u), Some(p)) = (ufrag, pwd) {
842 let params = crate::transports::ice::IceParameters {
843 username_fragment: u,
844 password: p,
845 ice_lite: false,
846 tie_breaker: 0,
847 };
848 self.inner
849 .ice_transport
850 .start(params)
851 .map_err(|e| crate::RtcError::Internal(format!("ICE error: {}", e)))?;
852
853 for candidate in candidates {
854 self.inner.ice_transport.add_remote_candidate(candidate);
855 }
856 }
857 } else if let Some(addr) = remote_addr {
858 self.inner
859 .ice_transport
860 .start_direct(addr)
861 .await
862 .map_err(|e| crate::RtcError::Internal(format!("ICE direct error: {}", e)))?;
863 }
864
865 if desc.sdp_type == SdpType::Offer {
867 let mut transceivers = self.inner.transceivers.lock().unwrap();
868 for section in &desc.media_sections {
869 let mid = §ion.mid;
870 let mut found_transceiver = None;
871 let mut newly_matched = false;
872
873 for t in transceivers.iter() {
874 if let Some(t_mid) = t.mid()
875 && t_mid == *mid
876 {
877 found_transceiver = Some(t.clone());
878 break;
879 }
880 }
881
882 if found_transceiver.is_none() {
883 for t in transceivers.iter() {
885 if t.mid().is_none() && t.kind() == section.kind {
886 t.set_mid(mid.clone());
887 found_transceiver = Some(t.clone());
888 newly_matched = true;
889 break;
890 }
891 }
892 }
893
894 let mut ssrc = None;
895 let mut simulcast = None;
896 let mut rids = Vec::new();
897 let mut rid_ext_id = None;
898 let mut abs_send_time_ext_id = None;
899 let mut fid_group = None;
900 let mut rtx_ssrc = None;
901
902 for attr in §ion.attributes {
904 if attr.key == "ssrc-group"
905 && let Some(val) = &attr.value
906 && val.starts_with("FID")
907 {
908 let parts: Vec<&str> = val.split_whitespace().collect();
910 if parts.len() >= 3 {
911 if let Ok(primary) = parts[1].parse::<u32>() {
912 fid_group = Some(primary);
913 if let Ok(rtx) = parts[2].parse::<u32>() {
914 rtx_ssrc = Some(rtx);
915 }
916 }
917 }
918 }
919 }
920
921 for attr in §ion.attributes {
922 if attr.key == "ssrc" {
923 if let Some(val) = &attr.value
924 && let Some(ssrc_str) = val.split_whitespace().next()
925 && let Ok(parsed) = ssrc_str.parse::<u32>()
926 {
927 if let Some(primary) = fid_group {
929 if parsed == primary {
930 ssrc = Some(parsed);
931 }
932 } else if ssrc.is_none() {
933 ssrc = Some(parsed);
935 }
936 }
937 } else if attr.key == "simulcast"
938 && let Some(val) = &attr.value
939 {
940 simulcast = crate::sdp::Simulcast::parse(val);
941 } else if attr.key == "rid"
942 && let Some(val) = &attr.value
943 {
944 if let Some(rid) = crate::sdp::Rid::parse(val) {
945 rids.push(rid);
946 }
947 } else if attr.key == "extmap"
948 && let Some(val) = &attr.value
949 {
950 if val.contains("urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id") {
951 if let Some(id_str) = val.split_whitespace().next() {
952 if let Ok(id) = id_str.parse::<u8>() {
953 rid_ext_id = Some(id);
954 }
955 }
956 } else if val.contains(crate::sdp::ABS_SEND_TIME_URI) {
957 if let Some(id_str) = val.split_whitespace().next() {
958 if let Ok(id) = id_str.parse::<u8>() {
959 abs_send_time_ext_id = Some(id);
960 }
961 }
962 }
963 }
964 }
965
966 if let Some(id) = rid_ext_id {
967 if let Some(transport) = self.inner.rtp_transport.lock().unwrap().as_ref() {
968 transport.set_rid_extension_id(Some(id));
969 }
970 }
971
972 if let Some(id) = abs_send_time_ext_id {
973 if let Some(transport) = self.inner.rtp_transport.lock().unwrap().as_ref() {
974 transport.set_abs_send_time_extension_id(Some(id));
975 }
976 }
977
978 if let Some(t) = found_transceiver {
979 let payload_map = Self::extract_payload_map(section);
981 if !payload_map.is_empty() {
982 let _ = t.update_payload_map(payload_map);
983 }
984 let extmap = Self::extract_extmap(section);
985 let _ = t.update_extmap(extmap);
986 let direction: TransceiverDirection = section.direction.into();
987 t.set_direction(direction);
988
989 if let Some(ssrc_val) = ssrc {
990 if let Some(rx) = t.receiver.lock().unwrap().as_ref() {
991 rx.set_ssrc(ssrc_val);
992 if let Some(rtx) = rtx_ssrc {
993 rx.set_rtx_ssrc(rtx);
994 }
995
996 if let Some(sim) = &simulcast {
998 for rid_id in &sim.send {
1000 let _ = rx.add_simulcast_track(rid_id.clone());
1001 }
1002 }
1003 }
1004 }
1005
1006 if newly_matched {
1007 if ssrc.is_some() {
1008 if let Some(r) = t.receiver.lock().unwrap().as_ref() {
1009 r.track_event_sent.store(true, Ordering::SeqCst);
1010 }
1011 let _ = self.inner.event_tx.send(PeerConnectionEvent::Track(t));
1012 }
1013 }
1014 } else {
1015 let kind = section.kind;
1016 let direction = if kind == MediaKind::Application {
1017 TransceiverDirection::SendRecv
1018 } else {
1019 TransceiverDirection::RecvOnly
1020 };
1021 let t = Arc::new(RtpTransceiver::new(kind, direction));
1022 t.set_mid(mid.clone());
1023
1024 let receiver_ssrc = ssrc.unwrap_or_else(|| 2000 + transceivers.len() as u32);
1025
1026 let mut builder = RtpReceiverBuilder::new(kind, receiver_ssrc)
1027 .interceptor(self.inner.stats_collector.clone());
1028
1029 let nack_enabled = if let Some(caps) = &self.inner.config.media_capabilities {
1030 match kind {
1031 MediaKind::Audio => caps
1032 .audio
1033 .iter()
1034 .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
1035 MediaKind::Video => caps
1036 .video
1037 .iter()
1038 .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
1039 _ => false,
1040 }
1041 } else {
1042 match kind {
1043 MediaKind::Audio => AudioCapability::default()
1044 .rtcp_fbs
1045 .contains(&"nack".to_string()),
1046 MediaKind::Video => VideoCapability::default()
1047 .rtcp_fbs
1048 .contains(&"nack".to_string()),
1049 _ => false,
1050 }
1051 };
1052
1053 if nack_enabled {
1054 debug!("NACK: enabled for new receiver mid={}", mid);
1055 builder = builder.nack();
1056 } else {
1057 debug!("NACK: disabled for new receiver mid={}", mid);
1058 }
1059 let receiver = builder.build();
1060 if let Some(rtx) = rtx_ssrc {
1061 receiver.set_rtx_ssrc(rtx);
1062 }
1063
1064 {
1066 let transport_guard = self.inner.rtp_transport.lock().unwrap();
1067 if let Some(transport) = &*transport_guard {
1068 receiver.set_transport(
1069 transport.clone(),
1070 Some(self.inner.event_tx.clone()),
1071 Some(Arc::downgrade(&t)),
1072 );
1073 } else {
1074 debug!(
1075 "No existing transport to attach to new receiver mid={}",
1076 mid
1077 );
1078 }
1079 }
1080
1081 if let Some(sim) = &simulcast {
1083 for rid_id in &sim.send {
1084 let _ = receiver.add_simulcast_track(rid_id.clone());
1085 }
1086 }
1087
1088 *t.receiver.lock().unwrap() = Some(receiver);
1089
1090 transceivers.push(t.clone());
1091
1092 if ssrc.is_some() {
1093 if let Some(r) = t.receiver.lock().unwrap().as_ref() {
1094 r.track_event_sent.store(true, Ordering::SeqCst);
1095 }
1096 let _ = self.inner.event_tx.send(PeerConnectionEvent::Track(t));
1097 }
1098 }
1099 }
1100 } else if desc.sdp_type == SdpType::Answer {
1101 let transceivers = self.inner.transceivers.lock().unwrap();
1102 for section in &desc.media_sections {
1103 let mid = §ion.mid;
1104 let mut found_transceiver = None;
1105 for t in transceivers.iter() {
1106 if let Some(t_mid) = t.mid()
1107 && t_mid == *mid
1108 {
1109 found_transceiver = Some(t);
1110 break;
1111 }
1112 }
1113
1114 if let Some(t) = found_transceiver {
1115 let payload_map = Self::extract_payload_map(section);
1117 if !payload_map.is_empty() {
1118 let _ = t.update_payload_map(payload_map);
1119 }
1120 let extmap = Self::extract_extmap(section);
1121 let _ = t.update_extmap(extmap);
1122 let direction: TransceiverDirection = section.direction.into();
1123 t.set_direction(direction);
1124
1125 let mut ssrc = None;
1126 for attr in §ion.attributes {
1127 if attr.key == "ssrc"
1128 && ssrc.is_none()
1129 && let Some(val) = &attr.value
1130 && let Some(ssrc_str) = val.split_whitespace().next()
1131 && let Ok(parsed) = ssrc_str.parse::<u32>()
1132 {
1133 ssrc = Some(parsed);
1134 break;
1135 }
1136 }
1137
1138 if let Some(ssrc_val) = ssrc
1139 && let Some(rx) = t.receiver.lock().unwrap().as_ref()
1140 {
1141 rx.set_ssrc(ssrc_val);
1142 }
1143 }
1144 }
1145 }
1146
1147 let mut remote = self.inner.remote_description.lock().unwrap();
1148 *remote = Some(desc);
1149
1150 Ok(())
1151 }
1152
1153 pub(crate) async fn start_dtls(
1154 &self,
1155 is_client: bool,
1156 ) -> RtcResult<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>> {
1157 debug!("start_dtls: starting with is_client={}", is_client);
1158 let pair = self
1159 .inner
1160 .ice_transport
1161 .get_selected_pair()
1162 .await
1163 .ok_or(RtcError::Internal("No selected pair".into()))?;
1164
1165 let socket_rx = self.inner.ice_transport.subscribe_selected_socket();
1166
1167 let ice_conn = IceConn::new(socket_rx.clone(), pair.remote.address);
1169
1170 let mut pair_rx = self.inner.ice_transport.subscribe_selected_pair();
1172 let ice_conn_monitor = ice_conn.clone();
1173
1174 if self.config().transport_mode != TransportMode::WebRtc {
1175 let rtcp_addr = {
1176 let remote_desc = self.inner.remote_description.lock().unwrap();
1177 if let Some(desc) = &*remote_desc {
1178 if let Some(section) = desc.media_sections.first() {
1183 let has_mux = section.attributes.iter().any(|a| a.key == "rtcp-mux");
1184 if !has_mux {
1185 let mut addr = pair.remote.address;
1186 addr.set_port(addr.port() + 1);
1187 Some(addr)
1188 } else {
1189 None
1190 }
1191 } else {
1192 None
1193 }
1194 } else {
1195 None
1196 }
1197 };
1198
1199 if let Some(addr) = rtcp_addr {
1200 ice_conn.set_remote_rtcp_addr(Some(addr));
1201 debug!("RTCP-MUX not detected, setting RTCP address to {}", addr);
1202 }
1203 }
1204
1205 let srtp_required = self.config().transport_mode != TransportMode::Rtp;
1206 let allow_ssrc_change = self.config().enable_latching;
1207 let rtp_transport = Arc::new(RtpTransport::new_with_ssrc_change(
1208 ice_conn.clone(),
1209 srtp_required,
1210 allow_ssrc_change,
1211 ));
1212 {
1213 let mut rx = ice_conn.rtp_receiver.write().unwrap();
1214 *rx = Some(Arc::downgrade(&rtp_transport)
1215 as std::sync::Weak<dyn crate::transports::PacketReceiver>);
1216 }
1217 *self.inner.rtp_transport.lock().unwrap() = Some(rtp_transport.clone());
1218
1219 {
1220 let transceivers = self.inner.transceivers.lock().unwrap();
1221 for t in transceivers.iter() {
1222 t.set_rtp_transport(Arc::downgrade(&rtp_transport));
1224
1225 let receiver_arc = t.receiver.lock().unwrap().clone();
1226 if let Some(receiver) = &receiver_arc {
1227 receiver.set_transport(
1228 rtp_transport.clone(),
1229 Some(self.inner.event_tx.clone()),
1230 Some(Arc::downgrade(&t)),
1231 );
1232 }
1233 }
1234 }
1235
1236 self.inner
1237 .ice_transport
1238 .set_data_receiver(ice_conn.clone())
1239 .await;
1240
1241 if self.config().transport_mode == TransportMode::Srtp {
1242 self.setup_sdes(&rtp_transport)?;
1243 let rtcp_loop = Self::create_rtcp_loop(
1244 rtp_transport.clone(),
1245 Arc::downgrade(&self.inner),
1246 self.inner.stats_collector.clone(),
1247 );
1248 let pair_monitor = Self::create_pair_monitor(pair_rx.clone(), ice_conn_monitor.clone());
1249 let combined_loop = async move {
1250 tokio::select! {
1251 _ = rtcp_loop => {},
1252 _ = pair_monitor => {},
1253 }
1254 };
1255 return Ok(Box::pin(combined_loop) as Pin<Box<dyn Future<Output = ()> + Send>>);
1256 }
1257
1258 if self.config().transport_mode == TransportMode::Rtp {
1259 let rtcp_loop = Self::create_rtcp_loop(
1260 rtp_transport.clone(),
1261 Arc::downgrade(&self.inner),
1262 self.inner.stats_collector.clone(),
1263 );
1264
1265 let transceivers = self.inner.transceivers.lock().unwrap();
1266 for t in transceivers.iter() {
1267 let sender_arc = t.sender.lock().unwrap().clone();
1268 let receiver_arc = t.receiver.lock().unwrap().clone();
1269
1270 if let Some(sender) = &sender_arc {
1272 let mid_opt = t.mid();
1273 trace!(
1274 "start_dtls: transceiver kind={:?} mid={:?}",
1275 t.kind(),
1276 mid_opt
1277 );
1278 sender.set_transport(rtp_transport.clone());
1279 }
1280
1281 if let Some(receiver) = &receiver_arc {
1283 if let Some(sender) = &sender_arc {
1284 receiver.set_feedback_ssrc(sender.ssrc());
1285 }
1286 }
1287 }
1288 let pair_monitor = Self::create_pair_monitor(pair_rx.clone(), ice_conn_monitor.clone());
1289 let combined_loop = async move {
1290 tokio::select! {
1291 _ = rtcp_loop => {},
1292 _ = pair_monitor => {},
1293 }
1294 };
1295 return Ok(Box::pin(combined_loop) as Pin<Box<dyn Future<Output = ()> + Send>>);
1296 }
1297
1298 let (dtls, incoming_data_rx, dtls_runner) = DtlsTransport::new(
1299 ice_conn,
1300 self.inner.certificate.as_ref().clone(),
1301 is_client,
1302 self.config().dtls_buffer_size,
1303 )
1304 .await
1305 .map_err(|e| RtcError::Internal(format!("DTLS failed: {}", e)))?;
1306
1307 let sctp_port = if let Some(caps) = &self.config().media_capabilities {
1308 if let Some(app) = &caps.application {
1309 app.sctp_port
1310 } else {
1311 5000
1312 }
1313 } else {
1314 5000
1315 };
1316
1317 let sctp_needed = {
1318 let remote = self.inner.remote_description.lock().unwrap();
1319 if let Some(desc) = &*remote {
1320 desc.media_sections
1321 .iter()
1322 .any(|m| m.kind == MediaKind::Application)
1323 } else {
1324 false
1325 }
1326 };
1327
1328 let (dc_tx, mut dc_rx) = mpsc::unbounded_channel();
1329
1330 let mut sctp_runner: Pin<Box<dyn Future<Output = ()> + Send>>;
1331
1332 if sctp_needed {
1333 let (sctp, runner) = SctpTransport::new(
1334 dtls.clone(),
1335 incoming_data_rx,
1336 self.inner.data_channels.clone(),
1337 sctp_port,
1338 sctp_port,
1339 Some(dc_tx),
1340 is_client,
1341 self.config(),
1342 );
1343 *self.inner.sctp_transport.lock().unwrap() = Some(sctp);
1344 sctp_runner = Box::pin(runner);
1345 } else {
1346 drop(incoming_data_rx);
1347 sctp_runner = Box::pin(std::future::pending());
1348 }
1349
1350 *self.inner.dtls_transport.lock().unwrap() = Some(dtls.clone());
1351
1352 let dtls_clone = dtls.clone();
1353 let rtp_transport_clone = rtp_transport.clone();
1354 let inner_weak = Arc::downgrade(&self.inner);
1355 let stats_collector = self.inner.stats_collector.clone();
1356
1357 let mut dtls_runner: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(dtls_runner);
1358
1359 let inner_weak_dc = inner_weak.clone();
1360 let dc_listener = async move {
1361 while let Some(dc) = dc_rx.recv().await {
1362 if let Some(inner) = inner_weak_dc.upgrade() {
1363 let _ = inner.event_tx.send(PeerConnectionEvent::DataChannel(dc));
1364 } else {
1365 break;
1366 }
1367 }
1368 };
1369 let mut dc_listener: Pin<Box<dyn Future<Output = ()> + Send>> = if sctp_needed {
1370 Box::pin(dc_listener)
1371 } else {
1372 Box::pin(std::future::pending())
1373 };
1374
1375 let mut state_rx = dtls_clone.subscribe_state();
1376 loop {
1377 let state = state_rx.borrow().clone();
1378 match state {
1379 crate::transports::dtls::DtlsState::Connected(_, profile_opt) => {
1380 self.setup_srtp(&dtls_clone, is_client, profile_opt, &rtp_transport_clone);
1381
1382 let rtcp_loop = Self::create_rtcp_loop(
1383 rtp_transport_clone.clone(),
1384 inner_weak.clone(),
1385 stats_collector.clone(),
1386 );
1387
1388 let pair_monitor =
1389 Self::create_pair_monitor(pair_rx.clone(), ice_conn_monitor.clone());
1390
1391 let combined: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
1392 tokio::select! {
1393 _ = rtcp_loop => {},
1394 _ = dtls_runner => {},
1395 _ = sctp_runner => {},
1396 _ = dc_listener => {},
1397 _ = pair_monitor => {},
1398 }
1399 });
1400 return Ok(combined);
1401 }
1402 crate::transports::dtls::DtlsState::Failed => {
1403 return Err(RtcError::Internal("DTLS handshake failed".into()));
1404 }
1405 _ => {}
1406 }
1407
1408 tokio::select! {
1409 _ = &mut dtls_runner => {
1410 return Err(RtcError::Internal("DTLS runner stopped unexpectedly".into()));
1411 }
1412 _ = &mut sctp_runner => {
1413 return Err(RtcError::Internal("SCTP runner stopped unexpectedly".into()));
1414 }
1415 _ = &mut dc_listener => {
1416 debug!("DataChannel listener stopped unexpectedly");
1417 return Err(RtcError::Internal("DataChannel listener stopped unexpectedly".into()));
1418 }
1419 res = state_rx.changed() => {
1420 if res.is_err() { break; }
1421 }
1422 res = pair_rx.changed() => {
1423 if res.is_ok() {
1424 if let Some(pair) = pair_rx.borrow().clone() {
1425 if let Ok(mut addr) = ice_conn_monitor.remote_addr.write() {
1426 *addr = pair.remote.address;
1427 }
1428 }
1429 }
1430 }
1431 }
1432 }
1433
1434 Ok(Box::pin(async {}) as Pin<Box<dyn Future<Output = ()> + Send>>)
1435 }
1436
1437 fn setup_sdes(&self, rtp_transport: &Arc<RtpTransport>) -> RtcResult<()> {
1438 let (tx_keying, rx_keying, profile) = {
1439 let remote_desc = self.inner.remote_description.lock().unwrap();
1440 let local_desc = self.inner.local_description.lock().unwrap();
1441
1442 let remote_crypto = remote_desc
1443 .as_ref()
1444 .and_then(|d| d.media_sections.first())
1445 .and_then(|m| m.get_crypto_attributes().into_iter().next());
1446
1447 let local_crypto = local_desc
1448 .as_ref()
1449 .and_then(|d| d.media_sections.first())
1450 .and_then(|m| m.get_crypto_attributes().into_iter().next());
1451
1452 if let (Some(remote), Some(local)) = (remote_crypto, local_crypto) {
1453 let profile = map_crypto_suite(&remote.crypto_suite)?;
1454 if profile != map_crypto_suite(&local.crypto_suite)? {
1455 return Err(RtcError::Internal("Crypto suite mismatch".into()));
1456 }
1457
1458 let rx_key_salt = parse_sdes_key_params(&remote.key_params)?;
1459 let tx_key_salt = parse_sdes_key_params(&local.key_params)?;
1460
1461 let (key_len, salt_len) = match profile {
1462 crate::srtp::SrtpProfile::Aes128Sha1_80
1463 | crate::srtp::SrtpProfile::Aes128Sha1_32 => (16, 14),
1464 crate::srtp::SrtpProfile::AeadAes128Gcm => (16, 12),
1465 _ => (16, 14),
1466 };
1467
1468 if rx_key_salt.len() < key_len + salt_len || tx_key_salt.len() < key_len + salt_len
1469 {
1470 return Err(RtcError::Internal("Invalid key length".into()));
1471 }
1472
1473 let rx_keying = crate::srtp::SrtpKeyingMaterial::new(
1474 rx_key_salt[..key_len].to_vec(),
1475 rx_key_salt[key_len..key_len + salt_len].to_vec(),
1476 );
1477 let tx_keying = crate::srtp::SrtpKeyingMaterial::new(
1478 tx_key_salt[..key_len].to_vec(),
1479 tx_key_salt[key_len..key_len + salt_len].to_vec(),
1480 );
1481
1482 (tx_keying, rx_keying, profile)
1483 } else {
1484 return Err(RtcError::Internal(
1485 "Missing crypto attributes for SDES".into(),
1486 ));
1487 }
1488 };
1489
1490 let session = crate::srtp::SrtpSession::new(profile, tx_keying, rx_keying)
1491 .map_err(|e| RtcError::Internal(format!("SRTP error: {}", e)))?;
1492
1493 rtp_transport.start_srtp(session);
1494
1495 let transceivers = self.inner.transceivers.lock().unwrap();
1496 for t in transceivers.iter() {
1497 let sender_arc = t.sender.lock().unwrap().clone();
1498 let receiver_arc = t.receiver.lock().unwrap().clone();
1499
1500 if let Some(sender) = &sender_arc {
1501 sender.set_transport(rtp_transport.clone());
1502 }
1503
1504 if let Some(receiver) = &receiver_arc {
1505 receiver.set_transport(
1506 rtp_transport.clone(),
1507 Some(self.inner.event_tx.clone()),
1508 Some(Arc::downgrade(&t)),
1509 );
1510 if let Some(sender) = &sender_arc {
1511 receiver.set_feedback_ssrc(sender.ssrc());
1512 }
1513 }
1514 }
1515
1516 *self.inner.rtp_transport.lock().unwrap() = Some(rtp_transport.clone());
1517 Ok(())
1518 }
1519
1520 fn setup_srtp(
1521 &self,
1522 dtls: &DtlsTransport,
1523 is_client: bool,
1524 profile_opt: Option<u16>,
1525 rtp_transport: &Arc<RtpTransport>,
1526 ) {
1527 let profile = match profile_opt {
1529 Some(0x0001) => crate::srtp::SrtpProfile::Aes128Sha1_80,
1530 Some(0x0002) => crate::srtp::SrtpProfile::Aes128Sha1_32,
1531 Some(0x0007) => crate::srtp::SrtpProfile::AeadAes128Gcm,
1532 _ => crate::srtp::SrtpProfile::Aes128Sha1_80,
1533 };
1534
1535 let key_len = match profile {
1536 crate::srtp::SrtpProfile::AeadAes128Gcm => 16,
1537 _ => 16,
1538 };
1539 let salt_len = match profile {
1540 crate::srtp::SrtpProfile::AeadAes128Gcm => 12,
1541 _ => 14,
1542 };
1543
1544 let total_len = 2 * (key_len + salt_len);
1545
1546 if let Ok(mat) = dtls.export_keying_material("EXTRACTOR-dtls_srtp", total_len) {
1547 let client_key = &mat[0..key_len];
1548 let server_key = &mat[key_len..2 * key_len];
1549 let client_salt = &mat[2 * key_len..2 * key_len + salt_len];
1550 let server_salt = &mat[2 * key_len + salt_len..];
1551
1552 let (tx_key, tx_salt, rx_key, rx_salt) = if is_client {
1553 (client_key, client_salt, server_key, server_salt)
1554 } else {
1555 (server_key, server_salt, client_key, client_salt)
1556 };
1557
1558 let tx_keying = crate::srtp::SrtpKeyingMaterial::new(tx_key.to_vec(), tx_salt.to_vec());
1559 let rx_keying = crate::srtp::SrtpKeyingMaterial::new(rx_key.to_vec(), rx_salt.to_vec());
1560
1561 match crate::srtp::SrtpSession::new(profile, tx_keying, rx_keying) {
1562 Ok(session) => {
1563 rtp_transport.start_srtp(session);
1564
1565 let transceivers = self.inner.transceivers.lock().unwrap();
1566 for t in transceivers.iter() {
1567 let sender_arc = t.sender.lock().unwrap().clone();
1568 let receiver_arc = t.receiver.lock().unwrap().clone();
1569
1570 if let Some(sender) = &sender_arc {
1571 let mid_opt = t.mid();
1572 trace!(
1573 "start_dtls: transceiver kind={:?} mid={:?}",
1574 t.kind(),
1575 mid_opt
1576 );
1577 sender.set_transport(rtp_transport.clone());
1578 }
1579
1580 if let Some(receiver) = &receiver_arc {
1581 receiver.set_transport(
1582 rtp_transport.clone(),
1583 Some(self.inner.event_tx.clone()),
1584 Some(Arc::downgrade(&t)),
1585 );
1586 if let Some(sender) = &sender_arc {
1587 receiver.set_feedback_ssrc(sender.ssrc());
1588 }
1589 }
1590 }
1591
1592 *self.inner.rtp_transport.lock().unwrap() = Some(rtp_transport.clone());
1594 }
1595 Err(e) => {
1596 debug!("Failed to create SRTP session: {}", e);
1597 }
1598 }
1599 } else {
1600 debug!(
1601 "Failed to export keying material - DTLS state: {}",
1602 dtls.get_state()
1603 );
1604 }
1605 }
1606
1607 fn create_rtcp_loop(
1608 rtp_transport: Arc<RtpTransport>,
1609 inner_weak: Weak<PeerConnectionInner>,
1610 stats_collector: Arc<StatsCollector>,
1611 ) -> impl Future<Output = ()> + Send {
1612 let (rtcp_tx, mut rtcp_rx) = mpsc::channel(2000);
1613 rtp_transport.register_rtcp_listener(rtcp_tx);
1614
1615 async move {
1616 while let Some(packets) = rtcp_rx.recv().await {
1617 for packet in packets {
1618 match &packet {
1620 RtcpPacket::PictureLossIndication(_) => {}
1621 RtcpPacket::GenericNack(n) => {
1622 trace!("RTCP Loop: Got NACK for SSRC {}", n.media_ssrc)
1623 }
1624 RtcpPacket::ReceiverReport(rr) => trace!(
1625 "RTCP Loop: Got RR for SSRC count {}",
1626 rr.report_blocks.len()
1627 ),
1628 RtcpPacket::SenderReport(sr) => {
1629 trace!("RTCP Loop: Got SR for SSRC {}", sr.sender_ssrc)
1630 }
1631 _ => trace!("RTCP Loop: Got packet {:?}", packet),
1632 }
1633
1634 stats_collector.process_rtcp(&packet);
1635 let Some(inner) = inner_weak.upgrade() else {
1636 return;
1637 };
1638 {
1639 let transceivers = inner.transceivers.lock().unwrap();
1640 for t in transceivers.iter() {
1641 if let Some(sender) = &*t.sender.lock().unwrap() {
1642 let is_for_sender = match &packet {
1643 RtcpPacket::PictureLossIndication(p) => {
1644 if p.media_ssrc == sender.ssrc() {
1645 debug!("Received PLI for SSRC: {}", p.media_ssrc);
1646 true
1647 } else {
1648 false
1649 }
1650 }
1651 RtcpPacket::GenericNack(n) => n.media_ssrc == sender.ssrc(),
1652 _ => false,
1653 };
1654
1655 if is_for_sender {
1656 sender.deliver_rtcp(packet.clone());
1657 }
1658 }
1659 }
1660 }
1661 }
1662 }
1663 }
1664 }
1665
1666 fn create_pair_monitor(
1667 mut pair_rx: watch::Receiver<Option<crate::transports::ice::IceCandidatePair>>,
1668 ice_conn_monitor: Arc<IceConn>,
1669 ) -> impl Future<Output = ()> + Send {
1670 async move {
1671 if let Some(pair) = pair_rx.borrow().clone() {
1672 if let Ok(mut addr) = ice_conn_monitor.remote_addr.write() {
1673 trace!(
1674 "PeerConnection: pair_monitor initial update: {} -> {}",
1675 *addr, pair.remote.address
1676 );
1677 *addr = pair.remote.address;
1678 }
1679 }
1680 while pair_rx.changed().await.is_ok() {
1681 if let Some(pair) = pair_rx.borrow().clone() {
1682 let old_addr = if let Ok(addr) = ice_conn_monitor.remote_addr.read() {
1683 *addr
1684 } else {
1685 "0.0.0.0:0".parse().unwrap()
1686 };
1687 if let Ok(mut addr_guard) = ice_conn_monitor.remote_addr.write() {
1688 trace!(
1689 "PeerConnection: pair_monitor update: {} -> {}",
1690 old_addr, pair.remote.address
1691 );
1692 *addr_guard = pair.remote.address;
1693 }
1694 }
1695 }
1696 }
1697 }
1698
1699 pub fn signaling_state(&self) -> SignalingState {
1700 *self.inner.signaling_state.borrow()
1701 }
1702
1703 pub fn subscribe_signaling_state(&self) -> watch::Receiver<SignalingState> {
1704 self.inner.signaling_state.subscribe()
1705 }
1706
1707 pub fn subscribe_peer_state(&self) -> watch::Receiver<PeerConnectionState> {
1708 self.inner.peer_state.subscribe()
1709 }
1710
1711 pub async fn wait_for_connected(&self) -> RtcResult<()> {
1712 let mut peer_state_rx = self.subscribe_peer_state();
1713 loop {
1714 let state = *peer_state_rx.borrow_and_update();
1715 if state == PeerConnectionState::Connected {
1716 return Ok(());
1717 }
1718 if state == PeerConnectionState::Failed || state == PeerConnectionState::Closed {
1719 return Err(RtcError::Internal(format!(
1720 "Peer connection failed or closed: {:?}",
1721 state
1722 )));
1723 }
1724 if peer_state_rx.changed().await.is_err() {
1725 return Err(RtcError::Internal("Peer state channel closed".into()));
1726 }
1727 }
1728 }
1729
1730 pub fn subscribe_ice_connection_state(&self) -> watch::Receiver<IceConnectionState> {
1731 self.inner.ice_connection_state.subscribe()
1732 }
1733
1734 pub fn subscribe_ice_gathering_state(&self) -> watch::Receiver<IceGatheringState> {
1735 self.inner.ice_gathering_state.subscribe()
1736 }
1737
1738 pub fn local_description(&self) -> Option<SessionDescription> {
1739 self.inner.local_description.lock().unwrap().clone()
1740 }
1741
1742 pub fn remote_description(&self) -> Option<SessionDescription> {
1743 self.inner.remote_description.lock().unwrap().clone()
1744 }
1745
1746 pub fn close(&self) {
1747 self.inner.close();
1748 }
1749
1750 pub async fn recv(&self) -> Option<PeerConnectionEvent> {
1751 let mut rx = self.inner.event_rx.lock().await;
1752 rx.recv().await
1753 }
1754
1755 pub fn create_data_channel(
1756 &self,
1757 label: &str,
1758 config: Option<crate::transports::sctp::DataChannelConfig>,
1759 ) -> RtcResult<Arc<crate::transports::sctp::DataChannel>> {
1760 let has_app_transceiver = {
1762 let transceivers = self.inner.transceivers.lock().unwrap();
1763 transceivers
1764 .iter()
1765 .any(|t| t.kind() == MediaKind::Application)
1766 };
1767
1768 if !has_app_transceiver {
1769 self.add_transceiver(MediaKind::Application, TransceiverDirection::SendRecv);
1770 }
1771
1772 let mut config = config.unwrap_or_default();
1773 config.label = label.to_string();
1774
1775 let id = if let Some(negotiated_id) = config.negotiated {
1776 negotiated_id
1777 } else {
1778 let is_client = self.inner.dtls_role.borrow().unwrap_or(true);
1779 let offset = if is_client { 0 } else { 1 };
1780
1781 let channels = self.inner.data_channels.lock().unwrap();
1782 let mut id = offset;
1783 loop {
1784 let mut used = false;
1785 for weak_dc in channels.iter() {
1786 if let Some(dc) = weak_dc.upgrade() {
1787 if dc.id == id {
1788 used = true;
1789 break;
1790 }
1791 }
1792 }
1793 if !used {
1794 break;
1795 }
1796 id += 2;
1797 }
1798 id
1799 };
1800
1801 let dc = Arc::new(crate::transports::sctp::DataChannel::new(
1802 id,
1803 config.clone(),
1804 ));
1805
1806 self.inner
1807 .data_channels
1808 .lock()
1809 .unwrap()
1810 .push(Arc::downgrade(&dc));
1811
1812 if !dc.negotiated {
1813 let transport = self.inner.sctp_transport.lock().unwrap().clone();
1814 if let Some(transport) = transport {
1815 let dc_clone = dc.clone();
1816 tokio::spawn(async move {
1817 if let Err(e) = transport.send_dcep_open(&dc_clone).await {
1818 debug!("Failed to send DCEP OPEN: {}", e);
1819 }
1820 });
1821 }
1822 }
1823
1824 Ok(dc)
1825 }
1826
1827 pub async fn send_data(&self, channel_id: u16, data: &[u8]) -> RtcResult<()> {
1828 let transport = self.inner.sctp_transport.lock().unwrap().clone();
1829 if let Some(transport) = transport {
1830 transport
1831 .send_data(channel_id, data)
1832 .await
1833 .map_err(|e| RtcError::Internal(format!("SCTP send failed: {}", e)))
1834 } else {
1835 Err(RtcError::InvalidState("SCTP not connected".into()))
1836 }
1837 }
1838
1839 pub async fn send_text(&self, channel_id: u16, data: impl AsRef<str>) -> RtcResult<()> {
1840 let transport = self.inner.sctp_transport.lock().unwrap().clone();
1841 if let Some(transport) = transport {
1842 transport
1843 .send_text(channel_id, data)
1844 .await
1845 .map_err(|e| RtcError::Internal(format!("SCTP send failed: {}", e)))
1846 } else {
1847 Err(RtcError::InvalidState("SCTP not connected".into()))
1848 }
1849 }
1850
1851 pub async fn sctp_buffered_amount(&self) -> usize {
1852 let transport = self.inner.sctp_transport.lock().unwrap().clone();
1853 if let Some(transport) = transport {
1854 transport.buffered_amount()
1855 } else {
1856 0
1857 }
1858 }
1859
1860 pub async fn get_stats(&self) -> RtcResult<StatsReport> {
1861 gather_once(&[self.inner.stats_collector.clone()]).await
1862 }
1863
1864 pub async fn wait_for_gathering_complete(&self) {
1865 let _ = self.inner.ice_transport.start_gathering();
1866 let mut rx = self.subscribe_ice_gathering_state();
1867 loop {
1868 if *rx.borrow_and_update() == IceGatheringState::Complete {
1869 return;
1870 }
1871 if rx.changed().await.is_err() {
1872 return;
1873 }
1874 }
1875 }
1876
1877 pub fn subscribe_ice_candidates(&self) -> broadcast::Receiver<IceCandidate> {
1878 self.inner.ice_transport.subscribe_candidates()
1879 }
1880
1881 pub fn add_ice_candidate(&self, candidate: IceCandidate) -> RtcResult<()> {
1882 self.inner.ice_transport.add_remote_candidate(candidate);
1883 Ok(())
1884 }
1885
1886 async fn handle_reinvite(&self, new_desc: &SessionDescription) -> RtcResult<()> {
1888 debug!("Handling reinvite: updating RTP parameters");
1889
1890 let transceivers = self.inner.transceivers.lock().unwrap().clone();
1891
1892 for section in &new_desc.media_sections {
1894 let transceiver = transceivers
1896 .iter()
1897 .find(|t| t.mid().as_ref() == Some(§ion.mid));
1898
1899 if let Some(t) = transceiver {
1900 if let Some(receiver) = t.receiver() {
1902 let new_ssrc = Self::extract_ssrc_from_section(section);
1903 if let Some(new_ssrc) = new_ssrc {
1904 let old_ssrc = receiver.ssrc();
1905 if old_ssrc != new_ssrc {
1906 if old_ssrc != 0 {
1907 debug!(
1908 "SSRC changed for mid={} ({} -> {}), updating listener",
1909 section.mid, old_ssrc, new_ssrc
1910 );
1911 } else {
1912 debug!(
1913 "SSRC learned for mid={} (-> {}), updating listener",
1914 section.mid, new_ssrc
1915 );
1916 }
1917 receiver.set_ssrc(new_ssrc);
1918 }
1919 } else {
1920 receiver.ensure_provisional_listener();
1923 }
1924 }
1925
1926 let payload_map = Self::extract_payload_map(section);
1928 if !payload_map.is_empty() {
1929 for (pt, params) in &payload_map {
1931 trace!("Validating PT {}: clock_rate={}", pt, params.clock_rate);
1932 }
1934 t.update_payload_map(payload_map)?;
1935 }
1936
1937 let extmap = Self::extract_extmap(section);
1939 t.update_extmap(extmap)?;
1940
1941 let new_direction: TransceiverDirection = section.direction.into();
1943 let old_direction = t.direction();
1944 if new_direction != old_direction {
1945 debug!(
1946 "Direction changed for mid={}: {:?} -> {:?}",
1947 section.mid, old_direction, new_direction
1948 );
1949 t.set_direction(new_direction);
1950 Self::apply_direction_change(t, old_direction, new_direction).await?;
1951 }
1952 }
1953 }
1954
1955 *self.inner.remote_description.lock().unwrap() = Some(new_desc.clone());
1957
1958 debug!("Reinvite completed successfully");
1959 Ok(())
1960 }
1961
1962 fn extract_payload_map(section: &crate::MediaSection) -> HashMap<u8, RtpCodecParameters> {
1964 let mut payload_map = HashMap::new();
1965
1966 for attr in §ion.attributes {
1968 if attr.key == "rtpmap" {
1969 if let Some(val) = &attr.value {
1970 let parts: Vec<&str> = val.split_whitespace().collect();
1971 if parts.len() >= 2 {
1972 if let Ok(pt) = parts[0].parse::<u8>() {
1973 let codec_parts: Vec<&str> = parts[1].split('/').collect();
1975 if codec_parts.len() >= 2 {
1976 let clock_rate = codec_parts[1].parse().unwrap_or(90000);
1977 let channels = if codec_parts.len() >= 3 {
1978 codec_parts[2].parse().unwrap_or(0)
1979 } else {
1980 0
1981 };
1982
1983 payload_map.insert(
1984 pt,
1985 RtpCodecParameters {
1986 payload_type: pt,
1987 clock_rate,
1988 channels,
1989 },
1990 );
1991 }
1992 }
1993 }
1994 }
1995 }
1996 }
1997
1998 payload_map
1999 }
2000
2001 fn extract_extmap(section: &crate::MediaSection) -> HashMap<u8, String> {
2003 let mut extmap = HashMap::new();
2004
2005 for attr in §ion.attributes {
2007 if attr.key == "extmap" {
2008 if let Some(val) = &attr.value {
2009 let parts: Vec<&str> = val.split_whitespace().collect();
2010 if parts.len() >= 2 {
2011 if let Ok(id) = parts[0].parse::<u8>() {
2012 extmap.insert(id, parts[1].to_string());
2013 }
2014 }
2015 }
2016 }
2017 }
2018
2019 extmap
2020 }
2021
2022 fn extract_ssrc_from_section(section: &crate::MediaSection) -> Option<u32> {
2024 for attr in §ion.attributes {
2026 if attr.key == "ssrc" {
2027 if let Some(val) = &attr.value {
2028 if let Some(ssrc_str) = val.split_whitespace().next() {
2029 if let Ok(ssrc) = ssrc_str.parse::<u32>() {
2030 return Some(ssrc);
2031 }
2032 }
2033 }
2034 }
2035 }
2036 None
2037 }
2038
2039 async fn apply_direction_change(
2041 transceiver: &RtpTransceiver,
2042 old_direction: TransceiverDirection,
2043 new_direction: TransceiverDirection,
2044 ) -> RtcResult<()> {
2045 let old_sends = match old_direction {
2046 TransceiverDirection::SendRecv | TransceiverDirection::SendOnly => true,
2047 _ => false,
2048 };
2049 let new_sends = match new_direction {
2050 TransceiverDirection::SendRecv | TransceiverDirection::SendOnly => true,
2051 _ => false,
2052 };
2053
2054 let old_receives = match old_direction {
2055 TransceiverDirection::SendRecv | TransceiverDirection::RecvOnly => true,
2056 _ => false,
2057 };
2058 let new_receives = match new_direction {
2059 TransceiverDirection::SendRecv | TransceiverDirection::RecvOnly => true,
2060 _ => false,
2061 };
2062
2063 if old_sends != new_sends {
2065 if new_sends {
2066 debug!("Transceiver {} starting to send", transceiver.id());
2067 if let Some(sender) = transceiver.sender() {
2069 trace!("Sender {} would resume", sender.ssrc());
2071 }
2072 } else {
2073 debug!("Transceiver {} stopping send", transceiver.id());
2074 if let Some(sender) = transceiver.sender() {
2076 trace!("Sender {} would pause", sender.ssrc());
2078 }
2079 }
2080 }
2081
2082 if old_receives != new_receives {
2084 if new_receives {
2085 debug!("Transceiver {} starting to receive", transceiver.id());
2086 } else {
2088 debug!("Transceiver {} stopping receive", transceiver.id());
2089 }
2091 }
2092
2093 Ok(())
2094 }
2095}
2096
2097async fn run_gathering_loop(
2098 ice_transport: IceTransport,
2099 ice_gathering_state_tx: watch::Sender<IceGatheringState>,
2100 inner_weak: std::sync::Weak<PeerConnectionInner>,
2101) {
2102 let mut rx = ice_transport.subscribe_gathering_state();
2103 let mut ice_state_rx = ice_transport.subscribe_state();
2104 loop {
2105 let state = *rx.borrow_and_update();
2106 if state == crate::transports::ice::IceGathererState::Complete {
2107 if let Some(inner) = inner_weak.upgrade() {
2108 let update_local_description = || {
2109 if inner.config.transport_mode == TransportMode::WebRtc {
2110 let candidates = ice_transport.local_candidates();
2111 let candidate_strs: Vec<String> =
2112 candidates.iter().map(|c| c.to_sdp()).collect();
2113
2114 let mut local_guard = inner.local_description.lock().unwrap();
2115 if let Some(desc) = local_guard.as_mut() {
2116 desc.add_candidates(&candidate_strs);
2117 }
2118 true
2119 } else {
2120 let candidates = ice_transport.local_candidates();
2121 if let Some(candidate) = candidates.first() {
2122 let mut local_guard = inner.local_description.lock().unwrap();
2123 if let Some(desc) = local_guard.as_mut() {
2124 for media in &mut desc.media_sections {
2125 media.port = candidate.address.port();
2126 let ip_str = candidate.address.ip().to_string();
2127 let ip_ver = if candidate.address.is_ipv4() {
2128 "IP4"
2129 } else {
2130 "IP6"
2131 };
2132 media.connection = Some(format!("IN {} {}", ip_ver, ip_str));
2133 }
2134 }
2135 }
2136 true
2137 }
2138 };
2139
2140 if !update_local_description() {
2141 let mut sig_rx = inner.signaling_state.subscribe();
2142 loop {
2143 if update_local_description() {
2144 break;
2145 }
2146 if sig_rx.changed().await.is_err() {
2147 break;
2148 }
2149 }
2150 }
2151 }
2152 }
2153
2154 let pc_state = match state {
2155 crate::transports::ice::IceGathererState::New => IceGatheringState::New,
2156 crate::transports::ice::IceGathererState::Gathering => IceGatheringState::Gathering,
2157 crate::transports::ice::IceGathererState::Complete => IceGatheringState::Complete,
2158 };
2159
2160 if ice_gathering_state_tx.send(pc_state).is_err() {
2161 break;
2162 }
2163 if state == crate::transports::ice::IceGathererState::Complete {
2164 break;
2165 }
2166 tokio::select! {
2167 res = rx.changed() => {
2168 if res.is_err() { break; }
2169 }
2170 res = ice_state_rx.changed() => {
2171 if res.is_err() { break; }
2172 if *ice_state_rx.borrow() == crate::transports::ice::IceTransportState::Closed {
2173 break;
2174 }
2175 }
2176 }
2177 }
2178}
2179
2180async fn run_ice_dtls_loop(
2181 ice_transport: IceTransport,
2182 ice_connection_state_tx: watch::Sender<IceConnectionState>,
2183 mut dtls_role_rx: watch::Receiver<Option<bool>>,
2184 inner_weak: std::sync::Weak<PeerConnectionInner>,
2185) {
2186 let mut ice_state_rx = ice_transport.subscribe_state();
2187 loop {
2188 let ice_state = *ice_state_rx.borrow_and_update();
2189
2190 let pc_ice_state = match ice_state {
2191 crate::transports::ice::IceTransportState::New => IceConnectionState::New,
2192 crate::transports::ice::IceTransportState::Checking => IceConnectionState::Checking,
2193 crate::transports::ice::IceTransportState::Connected => IceConnectionState::Connected,
2194 crate::transports::ice::IceTransportState::Completed => IceConnectionState::Completed,
2195 crate::transports::ice::IceTransportState::Failed => IceConnectionState::Failed,
2196 crate::transports::ice::IceTransportState::Disconnected => {
2197 IceConnectionState::Disconnected
2198 }
2199 crate::transports::ice::IceTransportState::Closed => IceConnectionState::Closed,
2200 };
2201 let _ = ice_connection_state_tx.send(pc_ice_state);
2202 match ice_state {
2203 crate::transports::ice::IceTransportState::Connected
2204 | crate::transports::ice::IceTransportState::Completed => {
2205 let transport_mode = if let Some(inner) = inner_weak.upgrade() {
2207 inner.config.transport_mode.clone()
2208 } else {
2209 return;
2210 };
2211
2212 if transport_mode != TransportMode::WebRtc {
2213 if !handle_connected_state_no_dtls(&inner_weak, &mut ice_state_rx).await {
2214 return;
2215 }
2216 continue;
2217 }
2218
2219 if !handle_connected_state(
2220 &inner_weak,
2221 &ice_connection_state_tx,
2222 &mut dtls_role_rx,
2223 &mut ice_state_rx,
2224 )
2225 .await
2226 {
2227 return;
2228 }
2229 continue;
2230 }
2231 crate::transports::ice::IceTransportState::Failed => {
2232 if let Some(inner) = inner_weak.upgrade() {
2233 let _ = inner.peer_state.send(PeerConnectionState::Failed);
2234 }
2235 return;
2236 }
2237 crate::transports::ice::IceTransportState::Closed => {
2238 if let Some(inner) = inner_weak.upgrade() {
2239 let _ = inner.peer_state.send(PeerConnectionState::Closed);
2240 }
2241 return;
2242 }
2243 _ => {}
2244 }
2245
2246 if ice_state_rx.changed().await.is_err() {
2247 return;
2248 }
2249 }
2250}
2251
2252async fn handle_connected_state_no_dtls(
2253 inner_weak: &std::sync::Weak<PeerConnectionInner>,
2254 ice_state_rx: &mut watch::Receiver<crate::transports::ice::IceTransportState>,
2255) -> bool {
2256 if let Some(inner) = inner_weak.upgrade() {
2257 let pc_temp = PeerConnection {
2258 inner: inner.clone(),
2259 };
2260 match pc_temp.start_dtls(false).await {
2262 Err(e) => {
2263 debug!("Transport start failed: {}", e);
2264 let _ = inner.peer_state.send(PeerConnectionState::Failed);
2265 return false;
2266 }
2267 Ok(mut rtcp_loop) => {
2268 let _ = inner.peer_state.send(PeerConnectionState::Connected);
2269 loop {
2270 tokio::select! {
2271 _ = &mut rtcp_loop => {
2272 break;
2273 }
2274 res = ice_state_rx.changed() => {
2275 if res.is_err() { return false; }
2276 let new_state = *ice_state_rx.borrow();
2277 if is_ice_disconnected(new_state) {
2278 return true;
2279 }
2280 }
2281 }
2282 }
2283 }
2284 }
2285 }
2286 false
2287}
2288
2289async fn handle_connected_state(
2290 inner_weak: &std::sync::Weak<PeerConnectionInner>,
2291 ice_connection_state_tx: &watch::Sender<IceConnectionState>,
2292 dtls_role_rx: &mut watch::Receiver<Option<bool>>,
2293 ice_state_rx: &mut watch::Receiver<crate::transports::ice::IceTransportState>,
2294) -> bool {
2295 loop {
2296 let role = *dtls_role_rx.borrow_and_update();
2297 if let Some(is_client) = role {
2298 if let Some(inner) = inner_weak.upgrade() {
2299 let pc_temp = PeerConnection {
2300 inner: inner.clone(),
2301 };
2302
2303 match pc_temp.start_dtls(is_client).await {
2304 Err(e) => {
2305 debug!("DTLS start failed: {}", e);
2306 let _ = inner.peer_state.send(PeerConnectionState::Failed);
2307 return false;
2308 }
2309 Ok(mut rtcp_loop) => {
2310 let _ = inner.peer_state.send(PeerConnectionState::Connected);
2311
2312 let dtls_state_rx = {
2313 let dtls_guard = inner.dtls_transport.lock().unwrap();
2314 if let Some(dtls) = &*dtls_guard {
2315 Some(dtls.subscribe_state())
2316 } else {
2317 None
2318 }
2319 };
2320
2321 if let Some(mut dtls_rx) = dtls_state_rx {
2322 loop {
2323 tokio::select! {
2324 _ = &mut rtcp_loop => {
2325 break;
2326 }
2327 res = ice_state_rx.changed() => {
2328 if res.is_err() { return false; }
2329 let new_state = *ice_state_rx.borrow();
2330 if is_ice_disconnected(new_state) {
2331 return true;
2332 }
2333 }
2334 res = dtls_rx.changed() => {
2335 if res.is_ok() {
2336 let state = dtls_rx.borrow().clone();
2337 if state == crate::transports::dtls::DtlsState::Closed || state == crate::transports::dtls::DtlsState::Failed {
2338 debug!("DTLS closed/failed, disconnecting PC");
2339 let _ = inner.peer_state.send(PeerConnectionState::Disconnected);
2340 let _ = ice_connection_state_tx.send(IceConnectionState::Disconnected);
2341 return false;
2342 }
2343 } else {
2344 break;
2345 }
2346 }
2347 }
2348 }
2349 } else {
2350 loop {
2351 tokio::select! {
2352 _ = &mut rtcp_loop => {
2353 break;
2354 }
2355 res = ice_state_rx.changed() => {
2356 if res.is_err() { return false; }
2357 let new_state = *ice_state_rx.borrow();
2358 if is_ice_disconnected(new_state) {
2359 return true;
2360 }
2361 }
2362 }
2363 }
2364 }
2365 }
2366 }
2367 }
2368
2369 let state = *ice_state_rx.borrow();
2370 if is_ice_disconnected(state) {
2371 return true;
2372 }
2373 return false;
2374 }
2375
2376 tokio::select! {
2377 res = dtls_role_rx.changed() => {
2378 if res.is_err() { return false; }
2379 }
2380 res = ice_state_rx.changed() => {
2381 if res.is_err() { return false; }
2382 let new_state = *ice_state_rx.borrow();
2383 if is_ice_disconnected(new_state) {
2384 return true;
2385 }
2386 }
2387 }
2388 }
2389}
2390
2391fn is_ice_disconnected(state: crate::transports::ice::IceTransportState) -> bool {
2392 matches!(
2393 state,
2394 crate::transports::ice::IceTransportState::Failed
2395 | crate::transports::ice::IceTransportState::Closed
2396 | crate::transports::ice::IceTransportState::Disconnected
2397 )
2398}
2399
2400impl PeerConnectionInner {
2401 async fn build_description<F>(
2402 &self,
2403 sdp_type: SdpType,
2404 map_direction: F,
2405 ) -> RtcResult<SessionDescription>
2406 where
2407 F: Fn(TransceiverDirection) -> TransceiverDirection,
2408 {
2409 let transceivers = {
2410 let list = self.transceivers.lock().unwrap();
2411 list.iter().cloned().collect::<Vec<_>>()
2412 };
2413 if transceivers.is_empty() {
2414 return Err(RtcError::InvalidState(
2415 "cannot build SDP with no transceivers".into(),
2416 ));
2417 }
2418
2419 let mut remote_offered_bundle = false;
2420
2421 let ordered_transceivers = if sdp_type == SdpType::Answer {
2422 let remote_guard = self.remote_description.lock().unwrap();
2423 let remote = remote_guard.as_ref().ok_or_else(|| {
2424 RtcError::InvalidState("create_answer called without remote description".into())
2425 })?;
2426
2427 for attr in &remote.session.attributes {
2428 if attr.key == "group"
2429 && let Some(val) = &attr.value
2430 && val.starts_with("BUNDLE")
2431 {
2432 remote_offered_bundle = true;
2433 }
2434 }
2435
2436 let mut ordered = Vec::new();
2437 for section in &remote.media_sections {
2438 let mid = §ion.mid;
2439 let mut found = None;
2440 for t in &transceivers {
2441 if let Some(t_mid) = t.mid()
2442 && t_mid == *mid
2443 {
2444 found = Some(t.clone());
2445 break;
2446 }
2447 }
2448 if let Some(t) = found {
2449 ordered.push(t);
2450 } else {
2451 return Err(RtcError::Internal(format!(
2452 "No transceiver found for mid {} in answer generation",
2453 mid
2454 )));
2455 }
2456 }
2457 ordered
2458 } else {
2459 for t in &transceivers {
2463 self.ensure_mid(t);
2464 }
2465
2466 let mut ordered = transceivers.clone();
2467 ordered.sort_by(|a, b| {
2468 let mid_a = a.mid().unwrap_or_default();
2469 let mid_b = b.mid().unwrap_or_default();
2470
2471 match (mid_a.parse::<u64>(), mid_b.parse::<u64>()) {
2474 (Ok(na), Ok(nb)) => na.cmp(&nb),
2475 _ => mid_a.cmp(&mid_b),
2476 }
2477 });
2478 ordered
2479 };
2480
2481 self.ice_transport
2482 .start_gathering()
2483 .map_err(|err| RtcError::InvalidState(format!("ICE gathering failed: {err}")))?;
2484
2485 let mode = self.config.transport_mode.clone();
2486
2487 if mode != TransportMode::WebRtc {
2490 let mut candidates = self.ice_transport.local_candidates();
2491 if candidates.is_empty() {
2492 let mut rx = self.ice_transport.subscribe_candidates();
2493 let start = tokio::time::Instant::now();
2494 let timeout_dur = tokio::time::Duration::from_millis(500);
2495
2496 while candidates.is_empty() && start.elapsed() < timeout_dur {
2497 let _ = tokio::time::timeout(timeout_dur - start.elapsed(), rx.recv()).await;
2498 candidates = self.ice_transport.local_candidates();
2499 }
2500 }
2501 }
2502
2503 let ice_params = self.ice_transport.local_parameters();
2504 let ice_username = ice_params.username_fragment.clone();
2505 let ice_password = ice_params.password.clone();
2506 let candidate_lines: Vec<String> = self
2507 .ice_transport
2508 .local_candidates()
2509 .iter()
2510 .map(IceCandidate::to_sdp)
2511 .collect();
2512 let gather_complete = matches!(
2513 self.ice_transport.gather_state(),
2514 IceGathererState::Complete
2515 );
2516 let mut desc = SessionDescription::new(sdp_type);
2517 desc.session.origin = default_origin();
2518 if let Some(ext_ip) = &self.config.external_ip {
2519 desc.session.origin.unicast_address = ext_ip.clone();
2520 }
2521 desc.session.origin.session_version += 1;
2522 if !desc
2523 .session
2524 .attributes
2525 .iter()
2526 .any(|attr| attr.key == "msid-semantic")
2527 && self.config.transport_mode == TransportMode::WebRtc
2528 {
2529 desc.session
2530 .attributes
2531 .push(Attribute::new("msid-semantic", Some("WMS *".into())));
2532 }
2533
2534 let mode = self.config.transport_mode.clone();
2535
2536 if mode == TransportMode::Rtp || mode == TransportMode::Srtp {
2537 let local_ip = if let Some(ext_ip) = &self.config.external_ip {
2538 ext_ip
2539 .parse()
2540 .unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
2541 } else {
2542 get_local_ip().unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
2543 };
2544 if desc.session.connection.is_none() {
2545 desc.session.connection = Some(format!("IN IP4 {}", local_ip));
2546 }
2547 }
2548
2549 for transceiver in ordered_transceivers.into_iter() {
2550 let mid = self.ensure_mid(&transceiver);
2551 let mut direction = map_direction(transceiver.direction());
2552 let sender_info = if direction.sends() {
2553 transceiver.sender.lock().unwrap().clone()
2554 } else {
2555 None
2556 };
2557
2558 let remote_expects_media = if sdp_type == SdpType::Answer {
2560 let remote_guard = self.remote_description.lock().unwrap();
2561 if let Some(remote) = remote_guard.as_ref() {
2562 remote
2564 .media_sections
2565 .iter()
2566 .find(|section| section.mid == mid)
2567 .map(|section| {
2568 matches!(
2570 section.direction,
2571 crate::sdp::Direction::SendRecv | crate::sdp::Direction::SendOnly
2572 )
2573 })
2574 .unwrap_or(false)
2575 } else {
2576 false
2577 }
2578 } else {
2579 false
2580 };
2581
2582 let has_sender_ssrc = transceiver.sender_ssrc.lock().unwrap().is_some();
2585 if direction.sends()
2586 && sender_info.is_none()
2587 && !has_sender_ssrc
2588 && transceiver.kind() != MediaKind::Application
2589 && !remote_expects_media
2590 {
2591 direction = match direction {
2592 TransceiverDirection::SendRecv => TransceiverDirection::RecvOnly,
2593 TransceiverDirection::SendOnly => TransceiverDirection::Inactive,
2594 _ => direction,
2595 };
2596 }
2597
2598 let mut section = MediaSection::new(transceiver.kind(), mid);
2599 section.direction = direction.into();
2600
2601 if mode == TransportMode::Rtp {
2602 section.protocol = "RTP/AVP".to_string();
2603 }
2604
2605 if mode == TransportMode::WebRtc {
2606 section.connection = Some("IN IP4 0.0.0.0".to_string());
2607 section
2608 .attributes
2609 .push(Attribute::new("ice-ufrag", Some(ice_username.clone())));
2610 section
2611 .attributes
2612 .push(Attribute::new("ice-pwd", Some(ice_password.clone())));
2613 section
2614 .attributes
2615 .push(Attribute::new("ice-options", Some("trickle".into())));
2616 for candidate in &candidate_lines {
2617 section
2618 .attributes
2619 .push(Attribute::new("candidate", Some(candidate.clone())));
2620 }
2621 if gather_complete {
2622 section
2623 .attributes
2624 .push(Attribute::new("end-of-candidates", None));
2625 }
2626 } else {
2627 let candidates = self.ice_transport.local_candidates();
2630 if let Some(cand) = candidates
2631 .iter()
2632 .find(|c| !c.address.ip().is_loopback())
2633 .or(candidates.first())
2634 {
2635 section.port = cand.address.port();
2636 let conn = format!("IN IP4 {}", cand.address.ip());
2637 if Some(&conn) != desc.session.connection.as_ref() {
2638 section.connection = Some(conn);
2639 }
2640 }
2641 }
2642
2643 self.populate_media_capabilities(&mut section, transceiver.kind(), sdp_type);
2644 if let Some(sender) = sender_info {
2645 Self::attach_sender_attributes(
2646 &mut section,
2647 sender.ssrc(),
2648 sender.cname(),
2649 sender.stream_id(),
2650 sender.track_id(),
2651 &mode,
2652 );
2653 } else if direction.sends() {
2654 if let Some(ssrc) = *transceiver.sender_ssrc.lock().unwrap() {
2655 let cname = format!("rustrtc-cname-{ssrc}");
2656 let stream_id = transceiver
2657 .sender_stream_id
2658 .lock()
2659 .unwrap()
2660 .clone()
2661 .unwrap_or_else(|| "default".to_string());
2662 let track_id = transceiver
2663 .sender_track_id
2664 .lock()
2665 .unwrap()
2666 .clone()
2667 .unwrap_or_else(|| format!("track-{}", transceiver.id()));
2668 Self::attach_sender_attributes(
2669 &mut section,
2670 ssrc,
2671 &cname,
2672 &stream_id,
2673 &track_id,
2674 &mode,
2675 );
2676 }
2677 }
2678
2679 if self.config.transport_mode == TransportMode::Srtp {
2680 let mut suite = "AES_CM_128_HMAC_SHA1_80".to_string();
2681 if sdp_type == SdpType::Answer {
2682 let remote_desc = self.remote_description.lock().unwrap();
2683 if let Some(remote) = &*remote_desc {
2684 if let Some(c) = remote
2685 .media_sections
2686 .iter()
2687 .flat_map(|m| m.get_crypto_attributes())
2688 .find(|c| map_crypto_suite(&c.crypto_suite).is_ok())
2689 {
2690 suite = c.crypto_suite.clone();
2691 }
2692 }
2693 }
2694
2695 let key_params = generate_sdes_key_params();
2696 let crypto_val = format!("1 {} {}|2^31|1:1", suite, key_params);
2697 section
2698 .attributes
2699 .push(Attribute::new("crypto", Some(crypto_val)));
2700 }
2701
2702 desc.media_sections.push(section);
2703 }
2704
2705 if !desc.media_sections.is_empty() {
2706 let should_bundle = match sdp_type {
2707 SdpType::Offer => true,
2708 SdpType::Answer => remote_offered_bundle,
2709 _ => false,
2710 };
2711
2712 let should_bundle = should_bundle && desc.media_sections.len() > 1;
2713
2714 if should_bundle {
2715 let mids: Vec<String> = desc.media_sections.iter().map(|m| m.mid.clone()).collect();
2716 let value = format!("BUNDLE {}", mids.join(" "));
2717 desc.session
2718 .attributes
2719 .push(Attribute::new("group", Some(value)));
2720 }
2721 }
2722
2723 Ok(desc)
2724 }
2725
2726 fn attach_sender_attributes(
2727 section: &mut MediaSection,
2728 ssrc: u32,
2729 cname: &str,
2730 stream_id: &str,
2731 track_id: &str,
2732 mode: &TransportMode,
2733 ) {
2734 if *mode == TransportMode::WebRtc {
2735 section.attributes.push(Attribute::new(
2736 "msid",
2737 Some(format!("{} {}", stream_id, track_id)),
2738 ));
2739 }
2740
2741 section.attributes.push(Attribute::new(
2742 "ssrc",
2743 Some(format!("{} cname:{}", ssrc, cname)),
2744 ));
2745
2746 if *mode == TransportMode::WebRtc {
2747 section.attributes.push(Attribute::new(
2748 "ssrc",
2749 Some(format!("{} msid:{} {}", ssrc, stream_id, track_id)),
2750 ));
2751 }
2752 }
2753
2754 fn ensure_mid(&self, transceiver: &Arc<RtpTransceiver>) -> String {
2755 if let Some(mid) = transceiver.mid() {
2756 return mid;
2757 }
2758 let mid_value = self.allocate_mid();
2759 trace!(
2760 "Allocated MID: {} for transceiver kind={:?}",
2761 mid_value,
2762 transceiver.kind()
2763 );
2764 transceiver.set_mid(mid_value.clone());
2765 mid_value
2766 }
2767
2768 fn allocate_mid(&self) -> String {
2769 let mid = self.next_mid.fetch_add(1, Ordering::SeqCst);
2770 mid.to_string()
2771 }
2772
2773 fn validate_sdp_type(&self, sdp_type: &SdpType) -> RtcResult<()> {
2774 match sdp_type {
2775 SdpType::Offer | SdpType::Answer => Ok(()),
2776 _ => Err(RtcError::NotImplemented("pranswer/rollback")),
2777 }
2778 }
2779
2780 fn populate_media_capabilities(
2781 &self,
2782 section: &mut MediaSection,
2783 kind: MediaKind,
2784 sdp_type: SdpType,
2785 ) {
2786 section.apply_config(&self.config);
2787
2788 if kind == MediaKind::Video {
2790 let (mut rid_id, mut repaired_rid_id) = self.get_remote_video_extmap_ids(§ion.mid);
2791
2792 if sdp_type == SdpType::Offer && self.config.transport_mode != TransportMode::Rtp {
2793 if rid_id.is_none() {
2795 rid_id = Some("1".to_string());
2796 }
2797 if repaired_rid_id.is_none() {
2798 repaired_rid_id = Some("2".to_string());
2799 }
2800 }
2801
2802 section.add_video_extmaps(rid_id, repaired_rid_id);
2803 }
2804
2805 let mut abs_send_time_id =
2807 self.get_remote_extmap_id(§ion.mid, crate::sdp::ABS_SEND_TIME_URI);
2808 if sdp_type == SdpType::Offer
2809 && abs_send_time_id.is_none()
2810 && self.config.transport_mode != TransportMode::Rtp
2811 {
2812 abs_send_time_id = Some("3".to_string()); }
2814 if let Some(id) = abs_send_time_id {
2815 section.attributes.push(crate::sdp::Attribute::new(
2816 "extmap",
2817 Some(format!("{} {}", id, crate::sdp::ABS_SEND_TIME_URI)),
2818 ));
2819 }
2820
2821 if self.config.transport_mode != TransportMode::Rtp {
2822 let setup_value = match sdp_type {
2823 SdpType::Offer => "actpass",
2824 SdpType::Answer => {
2825 let role = *self.dtls_role.borrow();
2826 match role {
2827 Some(true) => "active",
2828 Some(false) => "passive",
2829 None => "active",
2830 }
2831 }
2832 _ => "actpass",
2833 };
2834 section.add_dtls_attributes(&self.dtls_fingerprint, setup_value);
2835 }
2836 }
2837
2838 fn get_remote_video_extmap_ids(&self, mid: &str) -> (Option<String>, Option<String>) {
2839 let rid_id =
2840 self.get_remote_extmap_id(mid, "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id");
2841 let repaired_rid_id = self.get_remote_extmap_id(
2842 mid,
2843 "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
2844 );
2845 (rid_id, repaired_rid_id)
2846 }
2847
2848 fn get_remote_extmap_id(&self, mid: &str, uri: &str) -> Option<String> {
2849 let remote = self.remote_description.lock().unwrap();
2850 if let Some(desc) = &*remote {
2851 let remote_section = desc.media_sections.iter().find(|s| s.mid == mid)?;
2852 for attr in &remote_section.attributes {
2853 if attr.key != "extmap" {
2854 continue;
2855 }
2856 let val = attr.value.as_ref()?;
2857 if val.contains(uri) {
2858 if let Some(id_str) = val.split_whitespace().next() {
2859 return Some(id_str.to_string());
2860 }
2861 }
2862 }
2863 }
2864 None
2865 }
2866
2867 fn close(&self) {
2868 if *self.peer_state.borrow() == PeerConnectionState::Closed {
2869 return;
2870 }
2871 let _ = self.signaling_state.send(SignalingState::Closed);
2872 let _ = self.peer_state.send(PeerConnectionState::Closed);
2873 let _ = self.ice_connection_state.send(IceConnectionState::Closed);
2874 let _ = self.ice_gathering_state.send(IceGatheringState::Complete);
2875
2876 {
2878 let transceivers = self.transceivers.lock().unwrap();
2879 for t in transceivers.iter() {
2880 if let Some(receiver) = t.receiver() {
2882 let track = receiver.track();
2883 track.stop();
2884 tracing::debug!(
2885 "PeerConnection.close: marked receiver track {} as ended",
2886 track.id()
2887 );
2888 }
2889 }
2890 }
2891
2892 let rtp_transport = self.rtp_transport.lock().unwrap().clone();
2894 if let Some(transport) = rtp_transport.as_ref() {
2895 let count = transport.clear_listeners();
2896 if count > 0 {
2897 tracing::debug!("PeerConnection.close: cleared {} listeners", count);
2898 }
2899
2900 let transceivers = self.transceivers.lock().unwrap();
2902 let mut ssrcs = Vec::new();
2903 for t in transceivers.iter() {
2904 if let Some(sender) = t.sender() {
2905 ssrcs.push(sender.ssrc());
2906 }
2907 }
2908 if !ssrcs.is_empty() {
2909 let bye = crate::rtp::RtcpPacket::Goodbye(crate::rtp::Goodbye {
2910 sources: ssrcs,
2911 reason: Some("PeerConnection closed".to_string()),
2912 });
2913 let transport_clone = transport.clone();
2914 tokio::spawn(async move {
2915 let _ = transport_clone.send_rtcp(&[bye]).await;
2916 });
2917 }
2918 }
2919
2920 if let Some(dtls) = self.dtls_transport.lock().unwrap().as_ref() {
2921 dtls.close();
2922 }
2923
2924 self.ice_transport.stop();
2925 }
2926}
2927
2928impl Drop for PeerConnectionInner {
2929 fn drop(&mut self) {
2930 debug!("PeerConnectionInner dropped, stopping ICE transport");
2931 self.close();
2932 }
2933}
2934
2935fn default_origin() -> Origin {
2936 let mut origin = Origin::default();
2937 let now = SystemTime::now()
2938 .duration_since(UNIX_EPOCH)
2939 .unwrap_or_default()
2940 .as_secs();
2941 origin.session_id = now;
2942 origin.session_version = now;
2943 if let Ok(ip) = get_local_ip() {
2944 origin.unicast_address = ip.to_string();
2945 }
2946 origin
2947}
2948
2949#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2950pub enum PeerConnectionState {
2951 New,
2952 Connecting,
2953 Connected,
2954 Disconnected,
2955 Failed,
2956 Closed,
2957}
2958
2959#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2960pub enum SignalingState {
2961 Stable,
2962 HaveLocalOffer,
2963 HaveRemoteOffer,
2964 Closed,
2965}
2966
2967#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2968pub enum IceConnectionState {
2969 New,
2970 Checking,
2971 Connected,
2972 Completed,
2973 Failed,
2974 Disconnected,
2975 Closed,
2976}
2977
2978#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2979pub enum IceGatheringState {
2980 New,
2981 Gathering,
2982 Complete,
2983}
2984
2985#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
2986pub enum TransceiverDirection {
2987 #[default]
2988 SendRecv,
2989 SendOnly,
2990 RecvOnly,
2991 Inactive,
2992}
2993
2994impl TransceiverDirection {
2995 pub fn answer_direction(self) -> Self {
2996 match self {
2997 TransceiverDirection::SendRecv => TransceiverDirection::SendRecv,
2998 TransceiverDirection::SendOnly => TransceiverDirection::RecvOnly,
2999 TransceiverDirection::RecvOnly => TransceiverDirection::SendOnly,
3000 TransceiverDirection::Inactive => TransceiverDirection::Inactive,
3001 }
3002 }
3003
3004 pub fn sends(self) -> bool {
3005 matches!(
3006 self,
3007 TransceiverDirection::SendRecv | TransceiverDirection::SendOnly
3008 )
3009 }
3010}
3011
3012impl From<TransceiverDirection> for Direction {
3013 fn from(value: TransceiverDirection) -> Self {
3014 match value {
3015 TransceiverDirection::SendRecv => Direction::SendRecv,
3016 TransceiverDirection::SendOnly => Direction::SendOnly,
3017 TransceiverDirection::RecvOnly => Direction::RecvOnly,
3018 TransceiverDirection::Inactive => Direction::Inactive,
3019 }
3020 }
3021}
3022
3023impl From<Direction> for TransceiverDirection {
3024 fn from(value: Direction) -> Self {
3025 match value {
3026 Direction::SendRecv => TransceiverDirection::SendRecv,
3027 Direction::SendOnly => TransceiverDirection::SendOnly,
3028 Direction::RecvOnly => TransceiverDirection::RecvOnly,
3029 Direction::Inactive => TransceiverDirection::Inactive,
3030 }
3031 }
3032}
3033
3034static TRANSCEIVER_COUNTER: AtomicU64 = AtomicU64::new(1);
3035
3036#[derive(Debug, Clone, PartialEq)]
3037pub struct RtpCodecParameters {
3038 pub payload_type: u8,
3039 pub clock_rate: u32,
3040 pub channels: u8,
3041}
3042
3043impl Default for RtpCodecParameters {
3044 fn default() -> Self {
3045 Self {
3046 payload_type: 96,
3047 clock_rate: 90000,
3048 channels: 0,
3049 }
3050 }
3051}
3052
3053pub struct RtpTransceiver {
3054 id: u64,
3055 kind: MediaKind,
3056 direction: Mutex<TransceiverDirection>,
3057 mid: Mutex<Option<String>>,
3058 sender: Mutex<Option<Arc<RtpSender>>>,
3059 receiver: Mutex<Option<Arc<RtpReceiver>>>,
3060 rtp_transport: Mutex<Option<Weak<RtpTransport>>>,
3061 sender_ssrc: Mutex<Option<u32>>,
3062 sender_stream_id: Mutex<Option<String>>,
3063 sender_track_id: Mutex<Option<String>>,
3064 payload_map: Arc<std::sync::RwLock<HashMap<u8, RtpCodecParameters>>>,
3065 extmap: Arc<std::sync::RwLock<HashMap<u8, String>>>,
3066}
3067
3068impl RtpTransceiver {
3069 fn new(kind: MediaKind, direction: TransceiverDirection) -> Self {
3070 Self {
3071 id: TRANSCEIVER_COUNTER.fetch_add(1, Ordering::Relaxed),
3072 kind,
3073 direction: Mutex::new(direction),
3074 mid: Mutex::new(None),
3075 sender: Mutex::new(None),
3076 receiver: Mutex::new(None),
3077 rtp_transport: Mutex::new(None),
3078 sender_ssrc: Mutex::new(None),
3079 sender_stream_id: Mutex::new(None),
3080 sender_track_id: Mutex::new(None),
3081 payload_map: Arc::new(std::sync::RwLock::new(HashMap::new())),
3082 extmap: Arc::new(std::sync::RwLock::new(HashMap::new())),
3083 }
3084 }
3085
3086 #[doc(hidden)]
3088 pub fn new_for_test(kind: MediaKind, direction: TransceiverDirection) -> Self {
3089 Self::new(kind, direction)
3090 }
3091
3092 pub fn id(&self) -> u64 {
3093 self.id
3094 }
3095
3096 pub fn kind(&self) -> MediaKind {
3097 self.kind
3098 }
3099
3100 pub fn sender_ssrc(&self) -> Option<u32> {
3101 *self.sender_ssrc.lock().unwrap()
3102 }
3103
3104 pub fn sender_stream_id(&self) -> Option<String> {
3105 self.sender_stream_id.lock().unwrap().clone()
3106 }
3107
3108 pub fn sender_track_id(&self) -> Option<String> {
3109 self.sender_track_id.lock().unwrap().clone()
3110 }
3111
3112 pub fn direction(&self) -> TransceiverDirection {
3113 *self.direction.lock().unwrap()
3114 }
3115
3116 pub fn set_direction(&self, direction: TransceiverDirection) {
3117 *self.direction.lock().unwrap() = direction;
3118 }
3119
3120 pub fn mid(&self) -> Option<String> {
3121 self.mid.lock().unwrap().clone()
3122 }
3123
3124 fn set_mid(&self, mid: String) {
3125 *self.mid.lock().unwrap() = Some(mid);
3126 }
3127
3128 pub fn sender(&self) -> Option<Arc<RtpSender>> {
3129 self.sender.lock().unwrap().clone()
3130 }
3131
3132 pub fn set_sender(&self, sender: Option<Arc<RtpSender>>) {
3133 if let Some(ref s) = sender {
3134 if let Some(weak_transport) = self.rtp_transport.lock().unwrap().as_ref() {
3136 if let Some(transport) = weak_transport.upgrade() {
3137 debug!(
3138 "set_sender: connecting late sender ssrc={} to existing transport",
3139 s.ssrc()
3140 );
3141 s.set_transport(transport);
3142 }
3143 }
3144 *self.sender_ssrc.lock().unwrap() = Some(s.ssrc());
3146 *self.sender_stream_id.lock().unwrap() = Some(s.stream_id().to_string());
3147 *self.sender_track_id.lock().unwrap() = Some(s.track_id().to_string());
3148 }
3149 *self.sender.lock().unwrap() = sender;
3150 }
3151
3152 pub fn set_rtp_transport(&self, transport: Weak<RtpTransport>) {
3154 *self.rtp_transport.lock().unwrap() = Some(transport);
3155 }
3156
3157 pub fn receiver(&self) -> Option<Arc<RtpReceiver>> {
3158 self.receiver.lock().unwrap().clone()
3159 }
3160
3161 pub fn set_receiver(&self, receiver: Option<Arc<RtpReceiver>>) {
3162 *self.receiver.lock().unwrap() = receiver;
3163 }
3164
3165 pub fn update_payload_map(&self, new_map: HashMap<u8, RtpCodecParameters>) -> RtcResult<()> {
3167 let mut payload_map = self.payload_map.write().unwrap();
3168
3169 for (pt, codec) in &new_map {
3171 if !payload_map.contains_key(pt) || payload_map.get(pt) != Some(codec) {
3172 trace!(
3173 "Payload type {} remapped: clock_rate={}, channels={}",
3174 pt, codec.clock_rate, codec.channels
3175 );
3176 }
3177 }
3178
3179 *payload_map = new_map.clone();
3180
3181 if let Some(receiver) = self.receiver() {
3183 if let Some(transport_weak) = self.rtp_transport.lock().unwrap().clone() {
3184 if let Some(transport) = transport_weak.upgrade() {
3185 if let Some(tx) = receiver.packet_tx() {
3186 for (&pt, _) in &new_map {
3187 transport.register_pt_listener(pt, tx.clone());
3188 }
3189 }
3190 }
3191 }
3192 }
3193
3194 Ok(())
3195 }
3196
3197 pub fn update_extmap(&self, new_extmap: HashMap<u8, String>) -> RtcResult<()> {
3199 let mut extmap = self.extmap.write().unwrap();
3200
3201 for (id, uri) in &new_extmap {
3203 if !extmap.contains_key(id) || extmap.get(id) != Some(uri) {
3204 trace!("Extmap ID {} remapped to {}", id, uri);
3205 }
3206 }
3207
3208 *extmap = new_extmap;
3209
3210 if let Some(weak_transport) = self.rtp_transport.lock().unwrap().as_ref() {
3212 if let Some(transport) = weak_transport.upgrade() {
3213 let id = extmap
3214 .iter()
3215 .find(|(_, uri)| uri.as_str() == crate::sdp::ABS_SEND_TIME_URI)
3216 .map(|(id, _)| *id);
3217 transport.set_abs_send_time_extension_id(id);
3218
3219 let id = extmap
3220 .iter()
3221 .find(|(_, uri)| uri.contains("rtp-stream-id"))
3222 .map(|(id, _)| *id);
3223 transport.set_rid_extension_id(id);
3224 }
3225 }
3226
3227 Ok(())
3228 }
3229
3230 pub fn get_payload_map(&self) -> HashMap<u8, RtpCodecParameters> {
3232 self.payload_map.read().unwrap().clone()
3233 }
3234
3235 pub fn get_extmap(&self) -> HashMap<u8, String> {
3237 self.extmap.read().unwrap().clone()
3238 }
3239}
3240
3241pub struct RtpSender {
3242 track: Arc<dyn MediaStreamTrack>,
3243 transport: Mutex<Option<Arc<RtpTransport>>>,
3244 ssrc: u32,
3245 params: Arc<Mutex<RtpCodecParameters>>,
3246 track_id: Arc<str>,
3247 stream_id: Arc<str>,
3248 cname: Arc<str>,
3249 rtcp_tx: broadcast::Sender<RtcpPacket>,
3250 stop_tx: Arc<tokio::sync::Notify>,
3251 next_sequence_number: Arc<AtomicU16>,
3252 interceptors: Vec<Arc<dyn RtpSenderInterceptor + Send + Sync>>,
3253}
3254
3255pub struct RtpSenderBuilder {
3256 track: Arc<dyn MediaStreamTrack>,
3257 ssrc: u32,
3258 stream_id: String,
3259 params: RtpCodecParameters,
3260 interceptors: Vec<Arc<dyn RtpSenderInterceptor + Send + Sync>>,
3261}
3262
3263impl RtpSenderBuilder {
3264 pub fn new(track: Arc<dyn MediaStreamTrack>, ssrc: u32) -> Self {
3265 Self {
3266 track,
3267 ssrc,
3268 stream_id: "stream".to_string(),
3269 params: RtpCodecParameters::default(),
3270 interceptors: Vec::new(),
3271 }
3272 }
3273
3274 pub fn stream_id(mut self, id: String) -> Self {
3275 self.stream_id = id;
3276 self
3277 }
3278
3279 pub fn params(mut self, params: RtpCodecParameters) -> Self {
3280 self.params = params;
3281 self
3282 }
3283
3284 pub fn nack(mut self, buffer_size: usize) -> Self {
3285 self.interceptors
3286 .push(Arc::new(DefaultRtpSenderNackHandler::new(buffer_size)));
3287 self
3288 }
3289
3290 pub fn bitrate_controller(mut self) -> Self {
3291 self.interceptors
3292 .push(Arc::new(DefaultRtpSenderBitrateHandler::new()));
3293 self
3294 }
3295
3296 pub fn interceptor(mut self, interceptor: Arc<dyn RtpSenderInterceptor>) -> Self {
3297 self.interceptors.push(interceptor);
3298 self
3299 }
3300
3301 pub fn build(self) -> Arc<RtpSender> {
3302 Arc::new(RtpSender::new_internal(
3303 self.track,
3304 self.ssrc,
3305 self.stream_id,
3306 self.params,
3307 self.interceptors,
3308 ))
3309 }
3310}
3311
3312impl RtpSender {
3313 pub fn builder(track: Arc<dyn MediaStreamTrack>, ssrc: u32) -> RtpSenderBuilder {
3314 RtpSenderBuilder::new(track, ssrc)
3315 }
3316
3317 pub fn new(
3318 track: Arc<dyn MediaStreamTrack>,
3319 ssrc: u32,
3320 stream_id: String,
3321 params: RtpCodecParameters,
3322 interceptors: Vec<Arc<dyn RtpSenderInterceptor + Send + Sync>>,
3323 ) -> Self {
3324 Self::new_internal(track, ssrc, stream_id, params, interceptors)
3325 }
3326
3327 fn new_internal(
3328 track: Arc<dyn MediaStreamTrack>,
3329 ssrc: u32,
3330 stream_id: String,
3331 params: RtpCodecParameters,
3332 interceptors: Vec<Arc<dyn RtpSenderInterceptor + Send + Sync>>,
3333 ) -> Self {
3334 let track_label = track.id().to_string();
3335 let track_id = Arc::<str>::from(track_label.clone());
3336 let stream_id = Arc::<str>::from(stream_id);
3337 let cname = Arc::<str>::from(format!("rustrtc-cname-{ssrc}"));
3338 let (rtcp_tx, _) = broadcast::channel(100);
3339 Self {
3340 track,
3341 transport: Mutex::new(None),
3342 ssrc,
3343 params: Arc::new(Mutex::new(params)),
3344 track_id,
3345 stream_id,
3346 cname,
3347 rtcp_tx,
3348 stop_tx: Arc::new(tokio::sync::Notify::new()),
3349 next_sequence_number: Arc::new(AtomicU16::new(random_u32() as u16)),
3350 interceptors,
3351 }
3352 }
3353
3354 pub fn ssrc(&self) -> u32 {
3355 self.ssrc
3356 }
3357
3358 pub fn cname(&self) -> &str {
3359 &self.cname
3360 }
3361
3362 pub fn track_id(&self) -> &str {
3363 &self.track_id
3364 }
3365
3366 pub fn stream_id(&self) -> &str {
3367 &self.stream_id
3368 }
3369
3370 pub fn subscribe_rtcp(&self) -> broadcast::Receiver<RtcpPacket> {
3371 self.rtcp_tx.subscribe()
3372 }
3373
3374 pub(crate) fn deliver_rtcp(&self, packet: RtcpPacket) {
3375 let _ = self.rtcp_tx.send(packet);
3376 }
3377
3378 pub fn params(&self) -> RtpCodecParameters {
3379 self.params.lock().unwrap().clone()
3380 }
3381
3382 pub fn interceptors(&self) -> &[Arc<dyn RtpSenderInterceptor + Send + Sync>] {
3383 &self.interceptors
3384 }
3385
3386 pub fn nack_handler(&self) -> Option<Arc<dyn NackStats>> {
3387 for i in &self.interceptors {
3388 if let Some(stats) = i.clone().as_nack_stats() {
3389 return Some(stats);
3390 }
3391 }
3392 None
3393 }
3394
3395 pub fn set_transport(&self, transport: Arc<RtpTransport>) {
3396 *self.transport.lock().unwrap() = Some(transport.clone());
3397 let track = self.track.clone();
3398 let ssrc = self.ssrc;
3399 let params_lock = self.params.clone();
3400 let stop_rx = self.stop_tx.clone();
3401 let next_seq = self.next_sequence_number.clone();
3402 let interceptors = self.interceptors.clone();
3403 let mut rtcp_rx = self.rtcp_tx.subscribe();
3404
3405 tokio::spawn(async move {
3406 let mut sequence_number = next_seq.load(Ordering::SeqCst);
3407 let mut last_source_ts: Option<u32> = None;
3408 let mut timestamp_offset = random_u32(); let notified = stop_rx.notified();
3410 tokio::pin!(notified);
3411
3412 loop {
3413 tokio::select! {
3414 _ = &mut notified => break,
3415 rtcp = rtcp_rx.recv() => {
3416 match rtcp {
3417 Ok(packet) => {
3418 for interceptor in &interceptors {
3419 interceptor.on_rtcp_received(&packet, transport.clone()).await;
3420 }
3421 }
3422 _ => {}
3423 }
3424 }
3425 res = track.recv() => {
3426 match res {
3427 Ok(mut sample) => {
3428 let payload_type = {
3429 let p = params_lock.lock().unwrap();
3430 p.payload_type
3431 };
3432
3433 let app_controlled = match &sample {
3435 crate::media::MediaSample::Audio(f) => f.sequence_number.is_some(),
3436 crate::media::MediaSample::Video(f) => f.sequence_number.is_some(),
3437 };
3438
3439 match &mut sample {
3441 crate::media::MediaSample::Audio(f) => f.sequence_number = None,
3442 crate::media::MediaSample::Video(f) => f.sequence_number = None,
3443 }
3444
3445 let mut packet = sample.into_rtp_packet(
3446 ssrc,
3447 payload_type,
3448 &mut sequence_number,
3449 );
3450
3451 next_seq.store(sequence_number, Ordering::SeqCst);
3453
3454 if !app_controlled {
3455 let src_ts = packet.header.timestamp;
3458 if let Some(last_src) = last_source_ts {
3459 let delta = src_ts.wrapping_sub(last_src);
3460 if delta < 0x80000000 {
3462 if delta > 900_000 {
3465 timestamp_offset = last_src.wrapping_add(timestamp_offset).wrapping_add(3000).wrapping_sub(src_ts);
3475 }
3476 last_source_ts = Some(src_ts);
3477 }
3478 } else {
3481 last_source_ts = Some(src_ts);
3486 }
3487
3488 packet.header.timestamp = src_ts.wrapping_add(timestamp_offset);
3489
3490 packet.header.sequence_number = next_seq.fetch_add(1, Ordering::Relaxed);
3492 }
3493
3494 for interceptor in &interceptors {
3495 interceptor.on_packet_sent(&packet).await;
3496 }
3497
3498 if let Err(e) = transport.send_rtp(&packet).await {
3499 debug!("Failed to send RTP: {}", e);
3500 }
3501 }
3502 Err(crate::media::error::MediaError::Lagged) => {
3503 debug!("RtpSender: track lagged, skipping sample");
3504 continue;
3505 }
3506 Err(_) => break,
3507 }
3508 }
3509 }
3510 }
3511 });
3512 }
3513}
3514
3515impl Drop for RtpSender {
3516 fn drop(&mut self) {
3517 self.stop_tx.notify_waiters();
3518 }
3519}
3520
3521pub struct RtpReceiver {
3522 track: Arc<SampleStreamTrack>,
3523 source: Arc<SampleStreamSource>,
3524 ssrc: Mutex<u32>,
3525 params: Mutex<RtpCodecParameters>,
3526 transport: Mutex<Option<Arc<RtpTransport>>>,
3527 packet_tx: Mutex<Option<mpsc::Sender<(crate::rtp::RtpPacket, std::net::SocketAddr)>>>,
3528 rtcp_feedback_ssrc: Mutex<Option<u32>>,
3529 rtx_ssrc: Mutex<Option<u32>>,
3530 fir_seq: AtomicU8,
3531 feedback_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<crate::media::track::FeedbackEvent>>>,
3532 simulcast_tracks: Mutex<
3533 HashMap<
3534 String,
3535 (
3536 Arc<SampleStreamSource>,
3537 Arc<SampleStreamTrack>,
3538 Arc<tokio::sync::Mutex<mpsc::Receiver<crate::media::track::FeedbackEvent>>>,
3539 Arc<Mutex<Option<u32>>>,
3540 ),
3541 >,
3542 >,
3543 runner_tx: Mutex<Option<mpsc::UnboundedSender<ReceiverCommand>>>,
3544 interceptors: Vec<Arc<dyn RtpReceiverInterceptor>>,
3545 track_ready_event_tx: Mutex<Option<mpsc::UnboundedSender<PeerConnectionEvent>>>,
3546 track_ready_transceiver: Mutex<Option<Weak<RtpTransceiver>>>,
3547 track_event_sent: AtomicBool,
3548 pub depacketizer_factory: Arc<dyn DepacketizerFactory>,
3549}
3550
3551pub struct RtpReceiverBuilder {
3552 kind: MediaKind,
3553 ssrc: u32,
3554 interceptors: Vec<Arc<dyn RtpReceiverInterceptor>>,
3555 depacketizer_factory: Option<Arc<dyn DepacketizerFactory>>,
3556}
3557
3558impl RtpReceiverBuilder {
3559 pub fn new(kind: MediaKind, ssrc: u32) -> Self {
3560 Self {
3561 kind,
3562 ssrc,
3563 interceptors: Vec::new(),
3564 depacketizer_factory: None,
3565 }
3566 }
3567
3568 pub fn depacketizer_factory(mut self, factory: Arc<dyn DepacketizerFactory>) -> Self {
3569 self.depacketizer_factory = Some(factory);
3570 self
3571 }
3572
3573 pub fn nack(mut self) -> Self {
3574 self.interceptors
3575 .push(Arc::new(DefaultRtpReceiverNackHandler::new()));
3576 self
3577 }
3578
3579 pub fn interceptor(mut self, interceptor: Arc<dyn RtpReceiverInterceptor>) -> Self {
3580 self.interceptors.push(interceptor);
3581 self
3582 }
3583
3584 pub fn build(self) -> Arc<RtpReceiver> {
3585 let media_kind = match self.kind {
3586 MediaKind::Audio => crate::media::frame::MediaKind::Audio,
3587 MediaKind::Video => crate::media::frame::MediaKind::Video,
3588 _ => crate::media::frame::MediaKind::Audio,
3589 };
3590 let (source, track, feedback_rx) = sample_track(media_kind, 100);
3591
3592 let params = match self.kind {
3593 MediaKind::Audio => RtpCodecParameters {
3594 payload_type: 111,
3595 clock_rate: 48000,
3596 channels: 2,
3597 },
3598 MediaKind::Video => RtpCodecParameters {
3599 payload_type: 96,
3600 clock_rate: 90000,
3601 channels: 0,
3602 },
3603 _ => RtpCodecParameters::default(),
3604 };
3605
3606 Arc::new(RtpReceiver {
3607 track,
3608 source: Arc::new(source),
3609 ssrc: Mutex::new(self.ssrc),
3610 params: Mutex::new(params),
3611 transport: Mutex::new(None),
3612 packet_tx: Mutex::new(None),
3613 rtcp_feedback_ssrc: Mutex::new(None),
3614 rtx_ssrc: Mutex::new(None),
3615 fir_seq: AtomicU8::new(0),
3616 feedback_rx: Arc::new(tokio::sync::Mutex::new(feedback_rx)),
3617 simulcast_tracks: Mutex::new(HashMap::new()),
3618 runner_tx: Mutex::new(None),
3619 interceptors: self.interceptors,
3620 track_ready_event_tx: Mutex::new(None),
3621 track_ready_transceiver: Mutex::new(None),
3622 track_event_sent: AtomicBool::new(false),
3623 depacketizer_factory: self.depacketizer_factory.unwrap_or_else(|| {
3624 Arc::new(crate::media::depacketizer::DefaultDepacketizerFactory)
3625 }),
3626 })
3627 }
3628}
3629
3630impl RtpReceiver {
3631 pub fn new(
3632 kind: MediaKind,
3633 ssrc: u32,
3634 interceptors: Vec<Arc<dyn RtpReceiverInterceptor>>,
3635 ) -> Self {
3636 let media_kind = match kind {
3637 MediaKind::Audio => crate::media::frame::MediaKind::Audio,
3638 MediaKind::Video => crate::media::frame::MediaKind::Video,
3639 _ => crate::media::frame::MediaKind::Audio, };
3641 let (source, track, feedback_rx) = sample_track(media_kind, 100);
3642
3643 let params = match kind {
3644 MediaKind::Audio => RtpCodecParameters {
3645 payload_type: 111,
3646 clock_rate: 48000,
3647 channels: 2,
3648 },
3649 MediaKind::Video => RtpCodecParameters {
3650 payload_type: 96,
3651 clock_rate: 90000,
3652 channels: 0,
3653 },
3654 _ => RtpCodecParameters::default(),
3655 };
3656
3657 Self {
3658 track,
3659 source: Arc::new(source),
3660 ssrc: Mutex::new(ssrc),
3661 params: Mutex::new(params),
3662 transport: Mutex::new(None),
3663 packet_tx: Mutex::new(None),
3664 rtcp_feedback_ssrc: Mutex::new(None),
3665 rtx_ssrc: Mutex::new(None),
3666 fir_seq: AtomicU8::new(0),
3667 feedback_rx: Arc::new(tokio::sync::Mutex::new(feedback_rx)),
3668 simulcast_tracks: Mutex::new(HashMap::new()),
3669 runner_tx: Mutex::new(None),
3670 interceptors,
3671 track_ready_event_tx: Mutex::new(None),
3672 track_ready_transceiver: Mutex::new(None),
3673 track_event_sent: AtomicBool::new(false),
3674 depacketizer_factory: Arc::new(crate::media::depacketizer::DefaultDepacketizerFactory),
3675 }
3676 }
3677
3678 pub fn add_simulcast_track(self: &Arc<Self>, rid: String) -> Arc<SampleStreamTrack> {
3679 let (source, track, feedback_rx) = sample_track(self.track.kind(), 100);
3680 let source = Arc::new(source);
3681 let feedback_rx = Arc::new(tokio::sync::Mutex::new(feedback_rx));
3682 let simulcast_ssrc = Arc::new(Mutex::new(None));
3683
3684 let runner_tx = self.runner_tx.lock().unwrap().clone();
3686 if let Some(tx) = runner_tx {
3687 let transport = self.transport.lock().unwrap().clone();
3688 if let Some(transport) = transport {
3689 let (packet_tx, packet_rx) = mpsc::channel(100);
3690 transport.register_rid_listener(rid.clone(), packet_tx);
3691
3692 let cmd = ReceiverCommand::AddTrack {
3693 rid: Some(rid.clone()),
3694 packet_rx,
3695 feedback_rx: feedback_rx.clone(),
3696 source: source.clone(),
3697 simulcast_ssrc: simulcast_ssrc.clone(),
3698 };
3699 let _ = tx.send(cmd);
3700 }
3701 }
3702
3703 self.simulcast_tracks
3704 .lock()
3705 .unwrap()
3706 .insert(rid, (source, track.clone(), feedback_rx, simulcast_ssrc));
3707
3708 track
3709 }
3710
3711 pub fn track(&self) -> Arc<SampleStreamTrack> {
3712 self.track.clone()
3713 }
3714
3715 pub fn nack_handler(&self) -> Option<Arc<dyn NackStats>> {
3716 for i in &self.interceptors {
3717 if let Some(stats) = i.clone().as_nack_stats() {
3718 return Some(stats);
3719 }
3720 }
3721 None
3722 }
3723
3724 pub fn simulcast_track(&self, rid: &str) -> Option<Arc<SampleStreamTrack>> {
3725 let tracks = self.simulcast_tracks.lock().unwrap();
3726 tracks.get(rid).map(|(_, track, _, _)| track.clone())
3727 }
3728
3729 pub fn get_simulcast_rids(&self) -> Vec<String> {
3730 let tracks = self.simulcast_tracks.lock().unwrap();
3731 tracks.keys().cloned().collect()
3732 }
3733
3734 pub fn set_params(&self, params: RtpCodecParameters) {
3735 *self.params.lock().unwrap() = params;
3736 }
3737
3738 pub fn ssrc(&self) -> u32 {
3739 *self.ssrc.lock().unwrap()
3740 }
3741
3742 pub fn packet_tx(&self) -> Option<mpsc::Sender<(crate::rtp::RtpPacket, std::net::SocketAddr)>> {
3743 self.packet_tx.lock().unwrap().clone()
3744 }
3745
3746 pub fn rtx_ssrc(&self) -> Option<u32> {
3747 *self.rtx_ssrc.lock().unwrap()
3748 }
3749
3750 pub fn set_ssrc(&self, ssrc: u32) {
3751 *self.ssrc.lock().unwrap() = ssrc;
3752 let transport = self.transport.lock().unwrap().clone();
3753 let packet_tx = self.packet_tx.lock().unwrap().clone();
3754
3755 if let Some(transport) = transport
3756 && let Some(tx) = packet_tx
3757 {
3758 transport.register_listener_sync(ssrc, tx);
3759 }
3760 }
3761
3762 pub fn ensure_provisional_listener(&self) {
3763 let transport = self.transport.lock().unwrap().clone();
3764 let packet_tx = self.packet_tx.lock().unwrap().clone();
3765
3766 if let Some(transport) = transport
3767 && let Some(tx) = packet_tx
3768 {
3769 transport.register_provisional_listener(tx);
3770 }
3771 }
3772
3773 pub fn set_rtx_ssrc(&self, ssrc: u32) {
3774 *self.rtx_ssrc.lock().unwrap() = Some(ssrc);
3775 }
3776
3777 pub fn set_transport(
3778 self: &Arc<Self>,
3779 transport: Arc<RtpTransport>,
3780 event_tx: Option<mpsc::UnboundedSender<PeerConnectionEvent>>,
3781 transceiver: Option<Weak<RtpTransceiver>>,
3782 ) {
3783 *self.transport.lock().unwrap() = Some(transport.clone());
3784 *self.track_ready_event_tx.lock().unwrap() = event_tx;
3785 *self.track_ready_transceiver.lock().unwrap() = transceiver;
3786
3787 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
3788 *self.runner_tx.lock().unwrap() = Some(cmd_tx);
3789
3790 let mut initial_tracks = Vec::new();
3791
3792 let (tx, rx) = mpsc::channel(2000);
3794 let ssrc = *self.ssrc.lock().unwrap();
3795 transport.register_listener_sync(ssrc, tx.clone());
3796 transport.register_provisional_listener(tx.clone());
3797
3798 let pt = self.params.lock().unwrap().payload_type;
3800 transport.register_pt_listener(pt, tx.clone());
3801
3802 *self.packet_tx.lock().unwrap() = Some(tx);
3803
3804 initial_tracks.push(ReceiverCommand::AddTrack {
3805 rid: None,
3806 packet_rx: rx,
3807 feedback_rx: self.feedback_rx.clone(),
3808 source: self.source.clone(),
3809 simulcast_ssrc: Arc::new(Mutex::new(None)),
3810 });
3811
3812 let tracks_guard = self.simulcast_tracks.lock().unwrap();
3814 for (rid, (source, _, feedback_rx, simulcast_ssrc)) in tracks_guard.iter() {
3815 let (tx, rx) = mpsc::channel(2000);
3816 transport.register_rid_listener(rid.clone(), tx);
3817 initial_tracks.push(ReceiverCommand::AddTrack {
3818 rid: Some(rid.clone()),
3819 packet_rx: rx,
3820 feedback_rx: feedback_rx.clone(),
3821 source: source.clone(),
3822 simulcast_ssrc: simulcast_ssrc.clone(),
3823 });
3824 }
3825 drop(tracks_guard);
3826
3827 let weak_self = Arc::downgrade(self);
3828 tokio::spawn(async move {
3829 Self::run_loop(weak_self, cmd_rx, initial_tracks).await;
3830 });
3831 }
3832
3833 async fn run_loop(
3834 weak_self: Weak<Self>,
3835 mut cmd_rx: mpsc::UnboundedReceiver<ReceiverCommand>,
3836 initial_tracks: Vec<ReceiverCommand>,
3837 ) {
3838 let depacketizer_factory = if let Some(receiver) = weak_self.upgrade() {
3839 receiver.depacketizer_factory.clone()
3840 } else {
3841 Arc::new(crate::media::depacketizer::DefaultDepacketizerFactory)
3842 };
3843
3844 let mut futures = FuturesUnordered::new();
3845 let mut tracks = HashMap::new();
3846
3847 fn handle_add_track(
3848 cmd: ReceiverCommand,
3849 futures: &mut FuturesUnordered<Pin<Box<dyn Future<Output = LoopEvent> + Send>>>,
3850 tracks: &mut HashMap<
3851 Option<String>,
3852 (
3853 Arc<crate::media::track::SampleStreamSource>,
3854 Arc<Mutex<Option<u32>>>,
3855 Arc<tokio::sync::Mutex<mpsc::Receiver<crate::media::track::FeedbackEvent>>>,
3856 ),
3857 >,
3858 depacketizer_factory: &Arc<dyn DepacketizerFactory>,
3859 ) {
3860 let ReceiverCommand::AddTrack {
3861 rid,
3862 packet_rx,
3863 feedback_rx,
3864 source,
3865 simulcast_ssrc,
3866 } = cmd;
3867
3868 tracks.insert(
3869 rid.clone(),
3870 (source.clone(), simulcast_ssrc, feedback_rx.clone()),
3871 );
3872
3873 let rid_clone = rid.clone();
3874 let depacketizer = depacketizer_factory.create(source.kind());
3876
3877 futures.push(Box::pin(async move {
3878 let mut rx = packet_rx;
3879 let packet = rx.recv().await;
3880 LoopEvent::Packet(packet, rid_clone, rx, depacketizer)
3881 }));
3882
3883 let rid_clone = rid.clone();
3884 futures.push(Box::pin(async move {
3885 let event = {
3886 let mut lock = feedback_rx.lock().await;
3887 lock.recv().await
3888 };
3889 LoopEvent::Feedback(event, rid_clone)
3890 }));
3891 }
3892
3893 for cmd in initial_tracks {
3894 handle_add_track(cmd, &mut futures, &mut tracks, &depacketizer_factory);
3895 }
3896
3897 loop {
3898 tokio::select! {
3899 cmd = cmd_rx.recv() => {
3900 match cmd {
3901 Some(cmd) => handle_add_track(cmd, &mut futures, &mut tracks, &depacketizer_factory),
3902 None => break,
3903 }
3904 }
3905 event = futures.next(), if !futures.is_empty() => {
3906 if let Some(event) = event {
3907 match event {
3908 LoopEvent::Packet(packet_opt, rid, packet_rx, mut depacketizer) => {
3909 if let Some((packet, addr)) = packet_opt {
3910 if let Some((source, simulcast_ssrc, _)) = tracks.get(&rid) {
3911 if rid.is_some() {
3912 let mut s = simulcast_ssrc.lock().unwrap();
3913 if s.is_none() {
3914 *s = Some(packet.header.ssrc);
3915 }
3916 } else {
3917 if let Some(this) = weak_self.upgrade() {
3919 let mut s = this.ssrc.lock().unwrap();
3920 let old_ssrc = *s;
3921 if old_ssrc != packet.header.ssrc {
3922 debug!(
3923 "RTP main track SSRC changed from {} to {}",
3924 old_ssrc, packet.header.ssrc
3925 );
3926 *s = packet.header.ssrc;
3927
3928 if old_ssrc >= 2000 && old_ssrc < 3000 {
3931 if !this.track_event_sent.swap(true, Ordering::SeqCst) {
3933 if let Some(ref event_tx) = *this.track_ready_event_tx.lock().unwrap() {
3934 let transceiver = this.track_ready_transceiver.lock().unwrap();
3935 if let Some(transceiver) = transceiver.as_ref().and_then(|t| t.upgrade()) {
3936 let _ = event_tx.send(PeerConnectionEvent::Track(transceiver.clone()));
3937 debug!("RTP mode: Sent Track event after SSRC latching complete");
3938 }
3939 }
3940 }
3941 }
3942 }
3943 }
3944 }
3945
3946 if let Some(this) = weak_self.upgrade() {
3947 for interceptor in &this.interceptors {
3948 if let Some(mut rtcp_packet) = interceptor.on_packet_received(&packet).await {
3949 if let RtcpPacket::GenericNack(ref mut nack) = rtcp_packet {
3950 let sender_ssrc = this.rtcp_feedback_ssrc.lock().unwrap().unwrap_or(0);
3951 if sender_ssrc != 0 {
3952 nack.sender_ssrc = sender_ssrc;
3953 } else {
3954 debug!("NACK: skipping sender_ssrc update because it is 0");
3955 }
3956 }
3957
3958 let transport = this.transport.lock().unwrap().clone();
3959 if let Some(transport) = transport {
3960 let _ = transport.send_rtcp(&[rtcp_packet]).await;
3961 }
3962 }
3963 }
3964
3965 let params = this.params.lock().unwrap().clone();
3966 let clock_rate = params.clock_rate;
3967
3968 if let Ok(samples) = depacketizer.push(packet.clone(), clock_rate, addr, source.kind()) {
3970 for sample in samples {
3971 if let Err(e) = source.send(sample).await {
3972 tracing::warn!("Failed to send media sample: {}", e);
3973 }
3974 }
3975 }
3976
3977 let rid_clone = rid.clone();
3978 futures.push(Box::pin(async move {
3979 let mut rx = packet_rx;
3980 let packet = rx.recv().await;
3981 LoopEvent::Packet(packet, rid_clone, rx, depacketizer)
3982 }));
3983 } else {
3984 break;
3985 }
3986 }
3987 }
3988 }
3989 LoopEvent::Feedback(event_opt, rid) => {
3990 if let Some(event) = event_opt {
3991 if let Some((_, simulcast_ssrc, feedback_rx)) = tracks.get(&rid) {
3992 if let Some(this) = weak_self.upgrade() {
3993 match event {
3994 crate::media::track::FeedbackEvent::RequestKeyFrame => {
3995 let media_ssrc = if rid.is_some() {
3996 *simulcast_ssrc.lock().unwrap()
3997 } else {
3998 Some(*this.ssrc.lock().unwrap())
3999 };
4000
4001 if let Some(ssrc) = media_ssrc {
4002 let sender_ssrc = *this.rtcp_feedback_ssrc.lock().unwrap();
4003 let pli = crate::rtp::PictureLossIndication {
4004 sender_ssrc: sender_ssrc.unwrap_or(0),
4005 media_ssrc: ssrc,
4006 };
4007 let packet = crate::rtp::RtcpPacket::PictureLossIndication(pli);
4008
4009 let transport = this.transport.lock().unwrap().clone();
4010 if let Some(transport) = transport {
4011 if let Err(e) = transport.send_rtcp(&[packet]).await {
4012 debug!("Failed to send PLI: {}", e);
4013 }
4014 }
4015 }
4016 }
4017 }
4018
4019 let rid_clone = rid.clone();
4020 let feedback_rx = feedback_rx.clone();
4021 futures.push(Box::pin(async move {
4022 let event = {
4023 let mut lock = feedback_rx.lock().await;
4024 lock.recv().await
4025 };
4026 LoopEvent::Feedback(event, rid_clone)
4027 }));
4028 } else {
4029 break;
4030 }
4031 }
4032 }
4033 }
4034 }
4035 }
4036 }
4037 }
4038 }
4039 }
4040
4041 pub fn set_feedback_ssrc(&self, ssrc: u32) {
4042 *self.rtcp_feedback_ssrc.lock().unwrap() = Some(ssrc);
4043 }
4044
4045 pub async fn send_nack(&self, lost_packets: Vec<u16>) -> RtcResult<()> {
4046 let transport = self.transport.lock().unwrap().clone();
4047 if let Some(transport) = transport {
4048 let media_ssrc = *self.ssrc.lock().unwrap();
4049 let sender_ssrc = (*self.rtcp_feedback_ssrc.lock().unwrap()).unwrap_or(media_ssrc);
4050
4051 let nack = crate::rtp::GenericNack {
4052 sender_ssrc,
4053 media_ssrc,
4054 lost_packets,
4055 };
4056 let packet = RtcpPacket::GenericNack(nack);
4057 transport
4058 .send_rtcp(&[packet])
4059 .await
4060 .map_err(|e| RtcError::Internal(format!("Failed to send NACK: {}", e)))?;
4061 Ok(())
4062 } else {
4063 Err(RtcError::InvalidState("Transport not set".into()))
4064 }
4065 }
4066
4067 pub async fn request_key_frame(&self) -> RtcResult<()> {
4068 let transport = self.transport.lock().unwrap().clone();
4069 if let Some(transport) = transport {
4070 let media_ssrc = *self.ssrc.lock().unwrap();
4071 let sender_ssrc = (*self.rtcp_feedback_ssrc.lock().unwrap()).unwrap_or(media_ssrc);
4072
4073 let seq = self.fir_seq.fetch_add(1, Ordering::Relaxed);
4075 let fir = FullIntraRequest {
4076 sender_ssrc,
4077 requests: vec![FirRequest {
4078 ssrc: media_ssrc,
4079 sequence_number: seq,
4080 }],
4081 };
4082 let packet_fir = RtcpPacket::FullIntraRequest(fir);
4083
4084 let pli = PictureLossIndication {
4085 sender_ssrc,
4086 media_ssrc,
4087 };
4088 let packet_pli = RtcpPacket::PictureLossIndication(pli);
4089 transport
4090 .send_rtcp(&[packet_fir, packet_pli])
4091 .await
4092 .map_err(|e| RtcError::Internal(format!("Failed to send PLI: {}", e)))?;
4093 Ok(())
4094 } else {
4095 Err(RtcError::InvalidState("Transport not set".into()))
4096 }
4097 }
4098}
4099
4100#[cfg(test)]
4101mod tests {
4102 use super::*;
4103 use crate::transports::ice::IceTransportState;
4104 use crate::{Direction, MediaKind, RtcConfiguration};
4105
4106 const AUDIO_PAYLOAD_TYPE: u8 = 111;
4107 const VIDEO_PAYLOAD_TYPE: u8 = 96;
4108 const SCTP_FORMAT: &str = "webrtc-datachannel";
4109 const SCTP_PORT: u16 = 5000;
4110
4111 #[tokio::test]
4112 async fn create_offer_contains_transceiver() {
4113 let pc = PeerConnection::new(RtcConfiguration::default());
4114 let transceiver = pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4115
4116 let (_, track, _) = sample_track(crate::media::frame::MediaKind::Audio, 48000);
4118 let params = RtpCodecParameters {
4119 payload_type: 111,
4120 clock_rate: 48000,
4121 channels: 2,
4122 };
4123 let sender = RtpSender::builder(track, 12345)
4124 .stream_id("stream".to_string())
4125 .params(params)
4126 .build();
4127 transceiver.set_sender(Some(sender));
4128
4129 let _ = pc.create_offer().await.unwrap();
4131
4132 pc.wait_for_gathering_complete().await;
4134
4135 let offer = pc.create_offer().await.unwrap();
4137
4138 assert_eq!(offer.media_sections.len(), 1);
4139 let section = &offer.media_sections[0];
4140 assert_eq!(section.kind, MediaKind::Audio);
4141 assert_eq!(section.direction, Direction::SendRecv);
4142 assert_eq!(section.formats, vec![AUDIO_PAYLOAD_TYPE.to_string()]);
4143 let attrs = §ion.attributes;
4144 assert!(attrs.iter().any(|attr| attr.key == "ice-ufrag"));
4145 assert!(attrs.iter().any(|attr| attr.key == "ice-pwd"));
4146
4147 assert!(
4149 offer
4150 .session
4151 .attributes
4152 .iter()
4153 .any(|a| a.key == "msid-semantic")
4154 );
4155
4156 assert!(attrs.iter().any(|a| a.key == "msid"));
4158
4159 assert!(attrs.iter().any(|a| a.key == "ssrc"));
4161 assert!(attrs.iter().any(|attr| attr.key == "ice-options"));
4162 assert!(attrs.iter().any(|attr| attr.key == "end-of-candidates"));
4163 assert!(attrs.iter().filter(|attr| attr.key == "candidate").count() >= 1);
4164 assert!(attrs.iter().any(|attr| {
4165 attr.key == "rtpmap"
4166 && attr
4167 .value
4168 .as_deref()
4169 .map(|v| v.contains("opus"))
4170 .unwrap_or(false)
4171 }));
4172 assert!(attrs.iter().any(|attr| attr.key == "fingerprint"));
4173 assert!(attrs.iter().any(|attr| {
4174 attr.key == "setup"
4175 && attr
4176 .value
4177 .as_deref()
4178 .map(|v| v == "actpass")
4179 .unwrap_or(false)
4180 }));
4181 assert_eq!(pc.signaling_state(), SignalingState::Stable);
4182 }
4183
4184 #[tokio::test]
4185 async fn offer_includes_video_capabilities() {
4186 let pc = PeerConnection::new(RtcConfiguration::default());
4187 pc.add_transceiver(MediaKind::Video, TransceiverDirection::SendRecv);
4188 let offer = pc.create_offer().await.unwrap();
4189 let section = &offer.media_sections[0];
4190 assert_eq!(section.kind, MediaKind::Video);
4191 assert_eq!(section.formats, vec![VIDEO_PAYLOAD_TYPE.to_string()]);
4192 let attrs = §ion.attributes;
4193 assert!(attrs.iter().any(|attr| attr.key == "rtcp-fb"));
4194 assert!(attrs.iter().any(|attr| {
4195 attr.key == "rtpmap"
4196 && attr
4197 .value
4198 .as_deref()
4199 .map(|v| v.contains("VP8"))
4200 .unwrap_or(false)
4201 }));
4202 }
4203
4204 #[tokio::test]
4205 async fn offer_includes_application_capabilities() {
4206 let pc = PeerConnection::new(RtcConfiguration::default());
4207 pc.add_transceiver(MediaKind::Application, TransceiverDirection::SendRecv);
4208 let offer = pc.create_offer().await.unwrap();
4209 let section = &offer.media_sections[0];
4210 assert_eq!(section.kind, MediaKind::Application);
4211 assert_eq!(section.protocol, "UDP/DTLS/SCTP");
4212 assert_eq!(section.formats, vec![SCTP_FORMAT.to_string()]);
4213 let attrs = §ion.attributes;
4214 let expected_port = SCTP_PORT.to_string();
4215 assert!(attrs.iter().any(|attr| {
4216 attr.key == "sctp-port"
4217 && attr
4218 .value
4219 .as_deref()
4220 .map(|v| v == expected_port)
4221 .unwrap_or(false)
4222 }));
4223 }
4224
4225 #[tokio::test]
4226 async fn test_simulcast_setup() {
4227 use crate::{SdpType, SessionDescription};
4228 let pc = PeerConnection::new(RtcConfiguration::default());
4229
4230 let sdp_str = "v=0\r\n\
4233 o=- 123456 0 IN IP4 127.0.0.1\r\n\
4234 s=-\r\n\
4235 t=0 0\r\n\
4236 a=extmap:3 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id\r\n\
4237 c=IN IP4 127.0.0.1\r\n\
4238 m=video 9 RTP/SAVPF 96\r\n\
4239 a=rtpmap:96 VP8/90000\r\n\
4240 a=rid:hi send\r\n\
4241 a=rid:mid send\r\n\
4242 a=rid:lo send\r\n\
4243 a=simulcast:send hi;mid;lo\r\n";
4244
4245 let desc = SessionDescription::parse(SdpType::Offer, sdp_str).unwrap();
4246 pc.set_remote_description(desc).await.unwrap();
4247
4248 let transceivers = pc.inner.transceivers.lock().unwrap();
4249 assert_eq!(transceivers.len(), 1);
4250 let t = &transceivers[0];
4251 let rx = t.receiver.lock().unwrap().as_ref().unwrap().clone();
4252
4253 let simulcast_tracks = rx.simulcast_tracks.lock().unwrap();
4255 assert!(simulcast_tracks.contains_key("hi"));
4256 assert!(simulcast_tracks.contains_key("mid"));
4257 assert!(simulcast_tracks.contains_key("lo"));
4258 assert_eq!(simulcast_tracks.len(), 3);
4259 }
4260
4261 #[tokio::test]
4262 async fn test_rtcp_mux_detection() {
4263 use crate::{SdpType, SessionDescription, TransportMode};
4264 let mut config = RtcConfiguration::default();
4266 config.transport_mode = TransportMode::Rtp;
4267 let pc = PeerConnection::new(config);
4268
4269 let sdp_str = "v=0\r\n\
4271 o=- 123456 0 IN IP4 127.0.0.1\r\n\
4272 s=-\r\n\
4273 t=0 0\r\n\
4274 c=IN IP4 127.0.0.1\r\n\
4275 m=audio 4000 RTP/AVP 111\r\n\
4276 a=rtpmap:111 opus/48000/2\r\n";
4277 let desc = SessionDescription::parse(SdpType::Offer, sdp_str).unwrap();
4278
4279 pc.set_remote_description(desc).await.unwrap();
4280
4281 let mut state_rx = pc.subscribe_peer_state();
4283 loop {
4284 if *state_rx.borrow() == PeerConnectionState::Connected {
4285 break;
4286 }
4287 state_rx.changed().await.unwrap();
4288 }
4289
4290 let rtp_transport = pc.inner.rtp_transport.lock().unwrap().clone().unwrap();
4292 let ice_conn = rtp_transport.ice_conn();
4293 let rtcp_addr = *ice_conn.remote_rtcp_addr.read().unwrap();
4294
4295 assert!(rtcp_addr.is_some());
4296 assert_eq!(rtcp_addr.unwrap().port(), 4001);
4297 }
4298
4299 #[tokio::test]
4300 async fn test_rtcp_mux_enabled() {
4301 use crate::{SdpType, SessionDescription, TransportMode};
4302 let mut config = RtcConfiguration::default();
4304 config.transport_mode = TransportMode::Rtp;
4305 let pc = PeerConnection::new(config);
4306
4307 let sdp_str = "v=0\r\n\
4309 o=- 123456 0 IN IP4 127.0.0.1\r\n\
4310 s=-\r\n\
4311 t=0 0\r\n\
4312 c=IN IP4 127.0.0.1\r\n\
4313 m=audio 4000 RTP/AVP 111\r\n\
4314 a=rtcp-mux\r\n\
4315 a=rtpmap:111 opus/48000/2\r\n";
4316 let desc = SessionDescription::parse(SdpType::Offer, sdp_str).unwrap();
4317
4318 pc.set_remote_description(desc).await.unwrap();
4319
4320 let mut state_rx = pc.subscribe_peer_state();
4321 loop {
4322 if *state_rx.borrow() == PeerConnectionState::Connected {
4323 break;
4324 }
4325 state_rx.changed().await.unwrap();
4326 }
4327
4328 let rtp_transport = pc.inner.rtp_transport.lock().unwrap().clone().unwrap();
4329 let ice_conn = rtp_transport.ice_conn();
4330 let rtcp_addr = *ice_conn.remote_rtcp_addr.read().unwrap();
4331
4332 assert!(rtcp_addr.is_none());
4333 }
4334
4335 #[tokio::test]
4336 async fn set_local_description_transitions_state() {
4337 let pc = PeerConnection::new(RtcConfiguration::default());
4338 pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4339 let offer = pc.create_offer().await.unwrap();
4340 pc.set_local_description(offer.clone()).unwrap();
4341 assert_eq!(pc.signaling_state(), SignalingState::HaveLocalOffer);
4342
4343 let mut answer = offer.clone();
4344 answer.sdp_type = SdpType::Answer;
4345 pc.set_remote_description(answer).await.unwrap();
4346 assert_eq!(pc.signaling_state(), SignalingState::Stable);
4347 }
4348
4349 #[tokio::test]
4350 async fn create_answer_requires_remote_offer() {
4351 let pc = PeerConnection::new(RtcConfiguration::default());
4352 pc.add_transceiver(MediaKind::Video, TransceiverDirection::SendOnly);
4353 let err = pc.create_answer().await.unwrap_err();
4354 assert!(matches!(err, RtcError::InvalidState(_)));
4355
4356 let offer = pc.create_offer().await.unwrap();
4357 pc.set_remote_description(offer.clone()).await.unwrap();
4358 let answer = pc.create_answer().await.unwrap();
4359 assert_eq!(answer.media_sections.len(), 1);
4360 assert_eq!(answer.media_sections[0].direction, Direction::RecvOnly);
4361 pc.set_local_description(answer).unwrap();
4362 assert_eq!(pc.signaling_state(), SignalingState::Stable);
4363 }
4364
4365 #[tokio::test]
4366 async fn remote_answer_without_local_offer_is_error() {
4367 let pc = PeerConnection::new(RtcConfiguration::default());
4368 pc.add_transceiver(MediaKind::Audio, TransceiverDirection::RecvOnly);
4369 let mut fake_answer = pc.create_offer().await.unwrap();
4370 fake_answer.sdp_type = SdpType::Answer;
4371 let err = pc.set_remote_description(fake_answer).await.unwrap_err();
4372 assert!(matches!(err, RtcError::InvalidState(_)));
4373 }
4374
4375 #[tokio::test]
4376 async fn peer_connection_exposes_ice_transport() {
4377 let pc = PeerConnection::new(RtcConfiguration::default());
4378 let ice = pc.ice_transport();
4379 assert_eq!(ice.state(), IceTransportState::New);
4380 assert_eq!(ice.config().ice_servers.len(), 0);
4381 }
4382
4383 #[tokio::test]
4384 async fn create_offer_rtp_mode() {
4385 use crate::TransportMode;
4386 let mut config = RtcConfiguration::default();
4387 config.transport_mode = TransportMode::Rtp;
4388 let pc = PeerConnection::new(config);
4389 let transceiver = pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4390
4391 let (_, track, _) = sample_track(crate::media::frame::MediaKind::Audio, 48000);
4393 let params = RtpCodecParameters {
4394 payload_type: 111,
4395 clock_rate: 48000,
4396 channels: 2,
4397 };
4398 let sender = RtpSender::builder(track, 12345)
4399 .stream_id("stream".to_string())
4400 .params(params)
4401 .build();
4402 transceiver.set_sender(Some(sender));
4403
4404 let offer = pc.create_offer().await.unwrap();
4405 let section = &offer.media_sections[0];
4406
4407 assert!(!section.attributes.iter().any(|a| a.key == "ice-ufrag"));
4409 assert!(!section.attributes.iter().any(|a| a.key == "candidate"));
4410
4411 assert!(!section.attributes.iter().any(|a| a.key == "fingerprint"));
4413
4414 assert!(
4416 !offer
4417 .session
4418 .attributes
4419 .iter()
4420 .any(|a| a.key == "msid-semantic")
4421 );
4422
4423 assert!(!section.attributes.iter().any(|a| a.key == "msid"));
4425
4426 assert!(section.attributes.iter().any(|a| a.key == "ssrc"));
4428
4429 assert_eq!(section.protocol, "RTP/AVP");
4431 }
4432
4433 #[tokio::test]
4434 async fn create_offer_srtp_mode() {
4435 use crate::TransportMode;
4436 let mut config = RtcConfiguration::default();
4437 config.transport_mode = TransportMode::Srtp;
4438 let pc = PeerConnection::new(config);
4439 pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4440
4441 let offer = pc.create_offer().await.unwrap();
4442 let section = &offer.media_sections[0];
4443
4444 assert!(!section.attributes.iter().any(|a| a.key == "ice-ufrag"));
4446 assert!(!section.attributes.iter().any(|a| a.key == "candidate"));
4447
4448 assert!(section.attributes.iter().any(|a| a.key == "fingerprint"));
4450
4451 assert_eq!(section.protocol, "UDP/TLS/RTP/SAVPF");
4453 }
4454
4455 #[tokio::test]
4456 async fn test_ssrc_parsing_with_fid_group() {
4457 let _ = env_logger::builder().is_test(true).try_init();
4458 let pc = PeerConnection::new(RtcConfiguration::default());
4459
4460 let sdp_str = "v=0\r\n\
4462o=- 123456 123456 IN IP4 127.0.0.1\r\n\
4463s=-\r\n\
4464t=0 0\r\n\
4465m=video 9 UDP/TLS/RTP/SAVPF 96\r\n\
4466c=IN IP4 127.0.0.1\r\n\
4467a=mid:0\r\n\
4468a=sendrecv\r\n\
4469a=rtpmap:96 VP8/90000\r\n\
4470a=ssrc:12345 cname:foo\r\n\
4471a=ssrc:67890 cname:foo\r\n\
4472a=ssrc-group:FID 12345 67890\r\n";
4473
4474 let sdp =
4475 crate::sdp::SessionDescription::parse(crate::sdp::SdpType::Offer, sdp_str).unwrap();
4476 pc.set_remote_description(sdp).await.unwrap();
4477
4478 let transceivers = pc.get_transceivers();
4479 assert_eq!(transceivers.len(), 1);
4480 let t = &transceivers[0];
4481 let receiver = t.receiver().unwrap();
4482
4483 assert_eq!(receiver.ssrc(), 12345);
4484 assert_eq!(receiver.rtx_ssrc(), Some(67890));
4485 }
4486
4487 #[tokio::test]
4488 async fn test_ssrc_parsing_with_fid_group_before_ssrc() {
4489 let _ = env_logger::builder().is_test(true).try_init();
4490 let pc = PeerConnection::new(RtcConfiguration::default());
4491
4492 let sdp_str = "v=0\r\n\
4494o=- 123456 123456 IN IP4 127.0.0.1\r\n\
4495s=-\r\n\
4496t=0 0\r\n\
4497m=video 9 UDP/TLS/RTP/SAVPF 96\r\n\
4498c=IN IP4 127.0.0.1\r\n\
4499a=mid:0\r\n\
4500a=sendrecv\r\n\
4501a=rtpmap:96 VP8/90000\r\n\
4502a=ssrc-group:FID 12345 67890\r\n\
4503a=ssrc:12345 cname:foo\r\n\
4504a=ssrc:67890 cname:foo\r\n";
4505
4506 let sdp =
4507 crate::sdp::SessionDescription::parse(crate::sdp::SdpType::Offer, sdp_str).unwrap();
4508 pc.set_remote_description(sdp).await.unwrap();
4509
4510 let transceivers = pc.get_transceivers();
4511 assert_eq!(transceivers.len(), 1);
4512 let t = &transceivers[0];
4513 let receiver = t.receiver().unwrap();
4514
4515 assert_eq!(receiver.ssrc(), 12345);
4516 assert_eq!(receiver.rtx_ssrc(), Some(67890));
4517 }
4518
4519 #[tokio::test]
4520 async fn test_ssrc_parsing_rtx_first_group_last() {
4521 let _ = env_logger::builder().is_test(true).try_init();
4522 let pc = PeerConnection::new(RtcConfiguration::default());
4523
4524 let sdp_str = "v=0\r\n\
4526o=- 123456 123456 IN IP4 127.0.0.1\r\n\
4527s=-\r\n\
4528t=0 0\r\n\
4529m=video 9 UDP/TLS/RTP/SAVPF 96\r\n\
4530c=IN IP4 127.0.0.1\r\n\
4531a=mid:0\r\n\
4532a=sendrecv\r\n\
4533a=rtpmap:96 VP8/90000\r\n\
4534a=ssrc:67890 cname:foo\r\n\
4535a=ssrc:12345 cname:foo\r\n\
4536a=ssrc-group:FID 12345 67890\r\n";
4537
4538 let sdp =
4539 crate::sdp::SessionDescription::parse(crate::sdp::SdpType::Offer, sdp_str).unwrap();
4540 pc.set_remote_description(sdp).await.unwrap();
4541
4542 let transceivers = pc.get_transceivers();
4543 assert_eq!(transceivers.len(), 1);
4544 let t = &transceivers[0];
4545 let receiver = t.receiver().unwrap();
4546
4547 println!("SSRC: {}", receiver.ssrc());
4548 println!("RTX SSRC: {:?}", receiver.rtx_ssrc());
4549 assert_eq!(receiver.ssrc(), 12345); assert_eq!(receiver.rtx_ssrc(), Some(67890));
4551 }
4552
4553 #[test]
4554 fn test_sdes_key_generation_and_parsing() {
4555 let params = generate_sdes_key_params();
4556 assert!(params.starts_with("inline:"));
4557
4558 let key = parse_sdes_key_params(¶ms).expect("Failed to parse generated params");
4559 assert_eq!(key.len(), 30); assert!(parse_sdes_key_params("invalid").is_err());
4563 assert!(parse_sdes_key_params("inline:invalid_base64").is_err());
4564 }
4565
4566 #[tokio::test]
4567 async fn create_offer_srtp_mode_includes_crypto() {
4568 use crate::TransportMode;
4569 let mut config = RtcConfiguration::default();
4570 config.transport_mode = TransportMode::Srtp;
4571 let pc = PeerConnection::new(config);
4572 pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4573
4574 let offer = pc.create_offer().await.unwrap();
4575 let section = &offer.media_sections[0];
4576
4577 let crypto = section.attributes.iter().find(|a| a.key == "crypto");
4579 assert!(crypto.is_some(), "Missing crypto attribute in SRTP mode");
4580
4581 let crypto_val = crypto.unwrap().value.as_ref().unwrap();
4582 assert!(crypto_val.starts_with("1 AES_CM_128_HMAC_SHA1_80 inline:"));
4583 }
4584
4585 #[tokio::test]
4586 async fn test_receiver_nack_handler() {
4587 use crate::rtp::RtpHeader;
4588 let handler = DefaultRtpReceiverNackHandler::new();
4589 let mut header = RtpHeader::new(96, 100, 0, 1234);
4590 let packet1 = RtpPacket::new(header.clone(), vec![1, 2, 3]);
4591
4592 assert!(handler.on_packet_received(&packet1).await.is_none());
4594
4595 header.sequence_number = 101;
4597 let packet2 = RtpPacket::new(header.clone(), vec![4, 5, 6]);
4598 assert!(handler.on_packet_received(&packet2).await.is_none());
4599
4600 header.sequence_number = 103;
4602 let packet3 = RtpPacket::new(header.clone(), vec![7, 8, 9]);
4603 let res = handler
4604 .on_packet_received(&packet3)
4605 .await
4606 .expect("Should generate NACK");
4607 if let RtcpPacket::GenericNack(nack) = res {
4608 assert_eq!(nack.lost_packets, vec![102]);
4609 assert_eq!(nack.media_ssrc, 1234);
4610 } else {
4611 panic!("Expected GenericNack");
4612 }
4613
4614 header.sequence_number = 106;
4616 let packet4 = RtpPacket::new(header.clone(), vec![10]);
4617 let res = handler
4618 .on_packet_received(&packet4)
4619 .await
4620 .expect("Should generate NACK");
4621 if let RtcpPacket::GenericNack(nack) = res {
4622 assert_eq!(nack.lost_packets, vec![104, 105]);
4623 } else {
4624 panic!("Expected GenericNack");
4625 }
4626 }
4627
4628 #[tokio::test]
4629 async fn test_sender_nack_handler() {
4630 use crate::rtp::RtpHeader;
4631 use crate::transports::ice::conn::IceConn;
4632 use crate::transports::rtp::RtpTransport;
4633 use std::net::{Ipv4Addr, SocketAddr};
4634
4635 let handler = DefaultRtpSenderNackHandler::new(10);
4636 let mut header = RtpHeader::new(96, 100, 0, 1234);
4637 let packet1 = RtpPacket::new(header.clone(), vec![1, 2, 3]);
4638
4639 handler.on_packet_sent(&packet1).await;
4640
4641 let (_, socket_rx) = tokio::sync::watch::channel(None);
4643 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
4644 let ice_conn = IceConn::new(socket_rx, addr);
4645 let transport = Arc::new(RtpTransport::new(ice_conn, false));
4646
4647 let nack = GenericNack {
4648 sender_ssrc: 0,
4649 media_ssrc: 1234,
4650 lost_packets: vec![100],
4651 };
4652
4653 handler
4655 .on_rtcp_received(&RtcpPacket::GenericNack(nack), transport)
4656 .await;
4657
4658 for i in 101..115 {
4660 header.sequence_number = i;
4661 handler
4662 .on_packet_sent(&RtpPacket::new(header.clone(), vec![0]))
4663 .await;
4664 }
4665
4666 let nack_old = GenericNack {
4668 sender_ssrc: 0,
4669 media_ssrc: 1234,
4670 lost_packets: vec![100],
4671 };
4672
4673 let (_, socket_rx2) = tokio::sync::watch::channel(None);
4676 let ice_conn2 = IceConn::new(socket_rx2, addr);
4677 let transport2 = Arc::new(RtpTransport::new(ice_conn2, false));
4678 handler
4679 .on_rtcp_received(&RtcpPacket::GenericNack(nack_old), transport2)
4680 .await;
4681 }
4682
4683 #[tokio::test]
4684 async fn test_nack_configuration() {
4685 let mut config = RtcConfiguration::default();
4686 config.nack_buffer_size = 200;
4687
4688 let pc = PeerConnection::new(config);
4689 let transceiver = pc.add_transceiver(MediaKind::Video, TransceiverDirection::SendRecv);
4690
4691 let receiver = transceiver.receiver().unwrap();
4693 assert!(receiver.nack_handler().is_some());
4694
4695 let (_, track, _) = sample_track(crate::media::frame::MediaKind::Video, 90000);
4697 let sender = pc
4698 .add_track_with_stream_id(track, "stream1".to_string(), RtpCodecParameters::default())
4699 .unwrap();
4700 assert!(sender.nack_handler().is_some());
4701 }
4702
4703 #[tokio::test]
4704 async fn rtp_mode_sends_track_event_after_ssrc_latching() {
4705 let mut config = RtcConfiguration::default();
4707 config.transport_mode = TransportMode::Rtp;
4708
4709 let pc = PeerConnection::new(config);
4710
4711 let transceiver = pc.add_transceiver(MediaKind::Audio, TransceiverDirection::RecvOnly);
4713
4714 let remote_sdp = "\
4716v=0
4717o=- 12345 12345 IN IP4 192.168.1.100
4718s=-
4719c=IN IP4 192.168.1.100
4720t=0 0
4721m=audio 9000 RTP/AVP 8
4722a=rtpmap:8 PCMA/8000
4723a=sendonly
4724a=mid:0
4725";
4726
4727 let remote_offer = SessionDescription::parse(SdpType::Offer, remote_sdp).unwrap();
4728 pc.set_remote_description(remote_offer).await.unwrap();
4729
4730 let receiver = transceiver.receiver().unwrap();
4732 let initial_ssrc = receiver.ssrc();
4733
4734 assert!(
4736 initial_ssrc >= 2000 && initial_ssrc < 3000,
4737 "Initial SSRC should be provisional, got {}",
4738 initial_ssrc
4739 );
4740
4741 println!(
4742 "✓ RTP mode test setup complete, initial provisional SSRC: {}",
4743 initial_ssrc
4744 );
4745 println!("✓ When real RTP packets arrive with actual SSRC, Track event will be sent");
4746 println!("✓ Track event sending logic is in place at SSRC latching point");
4747 }
4748
4749 #[tokio::test]
4750 async fn test_custom_depacketizer_strategy() {
4751 use crate::config::DepacketizerStrategy;
4752 use crate::media::depacketizer::{
4753 Depacketizer, DepacketizerFactory, PassThroughDepacketizer,
4754 };
4755 use crate::media::frame::MediaKind as FrameMediaKind;
4756
4757 #[derive(Debug)]
4758 struct MockFactory;
4759
4760 impl DepacketizerFactory for MockFactory {
4761 fn create(&self, _kind: FrameMediaKind) -> Box<dyn Depacketizer> {
4762 Box::new(PassThroughDepacketizer)
4763 }
4764 }
4765
4766 let factory: Arc<dyn DepacketizerFactory> = Arc::new(MockFactory);
4767 let mut config = RtcConfiguration::default();
4768 config.depacketizer_strategy = DepacketizerStrategy {
4769 factory: factory.clone(),
4770 };
4771
4772 let pc = PeerConnection::new(config);
4773
4774 let retrieved_config = pc.config();
4775 assert!(Arc::ptr_eq(
4776 &retrieved_config.depacketizer_strategy.factory,
4777 &factory
4778 ));
4779
4780 let transceiver = pc.add_transceiver(MediaKind::Video, TransceiverDirection::RecvOnly);
4782 assert_eq!(transceiver.kind(), MediaKind::Video);
4783 }
4784}