Skip to main content

rustrtc/
peer_connection.rs

1use crate::media::depacketizer::{Depacketizer, DepacketizerFactory};
2use crate::media::track::{MediaStreamTrack, SampleStreamSource, SampleStreamTrack, sample_track};
3use crate::rtp::{
4    FirRequest, FullIntraRequest, GenericNack, PictureLossIndication, RtcpPacket, RtpPacket,
5};
6use crate::stats::{StatsReport, gather_once};
7use crate::stats_collector::StatsCollector;
8use crate::transports::dtls::{self, DtlsTransport};
9use crate::transports::get_local_ip;
10use crate::transports::ice::stun::random_u32;
11use crate::transports::ice::{IceCandidate, IceGathererState, IceTransport, conn::IceConn};
12use crate::transports::rtp::RtpTransport;
13use crate::transports::sctp::SctpTransport;
14use crate::{
15    Attribute, AudioCapability, Direction, MediaKind, MediaSection, Origin, RtcConfiguration,
16    RtcError, RtcResult, SdpType, SessionDescription, TransportMode, VideoCapability,
17};
18use base64::prelude::*;
19use std::collections::{HashMap, VecDeque};
20use std::net::{IpAddr, Ipv4Addr};
21use std::{
22    sync::{
23        Arc, Mutex,
24        atomic::{AtomicBool, AtomicU8, AtomicU16, AtomicU32, AtomicU64, Ordering},
25    },
26    time::{SystemTime, UNIX_EPOCH},
27};
28use tokio::sync::{broadcast, mpsc, watch};
29use tracing::{debug, trace};
30
31use async_trait::async_trait;
32use futures::stream::{FuturesUnordered, StreamExt};
33use std::future::Future;
34use std::pin::Pin;
35use std::sync::Weak;
36
/// Callbacks for the sending side of an RTP stream.
///
/// Every method has a default implementation, so an implementor only
/// overrides the hooks it cares about.
#[async_trait]
pub trait RtpSenderInterceptor: Send + Sync {
    /// Called with an outgoing RTP packet. The default does nothing.
    // NOTE(review): whether this fires before or after the packet reaches the
    // transport is decided by the caller, which is not visible here.
    async fn on_packet_sent(&self, _packet: &RtpPacket) {}
    /// Called with a received RTCP packet and the RTP transport. The default
    /// does nothing.
    async fn on_rtcp_received(&self, _packet: &RtcpPacket, _transport: Arc<RtpTransport>) {}
    /// Exposes this interceptor as a [`NackStats`] source.
    ///
    /// Returns `None` by default, i.e. the interceptor tracks no NACK stats.
    fn as_nack_stats(self: Arc<Self>) -> Option<Arc<dyn NackStats>> {
        None
    }
}
45
/// Callbacks for the receiving side of an RTP stream.
///
/// Every method has a default implementation, so an implementor only
/// overrides the hooks it cares about.
#[async_trait]
pub trait RtpReceiverInterceptor: Send + Sync {
    /// Called with a received RTP packet.
    ///
    /// May return an RTCP packet; the default returns `None`.
    // NOTE(review): presumably the returned packet is feedback the receiver
    // sends back to the remote peer — confirm at the call site.
    async fn on_packet_received(&self, _packet: &RtpPacket) -> Option<RtcpPacket> {
        None
    }
    /// Called with a received RTCP packet and the RTP transport. The default
    /// does nothing.
    async fn on_rtcp_received(&self, _packet: &RtcpPacket, _transport: Arc<RtpTransport>) {}
    /// Exposes this interceptor as a [`NackStats`] source.
    ///
    /// Returns `None` by default, i.e. the interceptor tracks no NACK stats.
    fn as_nack_stats(self: Arc<Self>) -> Option<Arc<dyn NackStats>> {
        None
    }
}
56
/// Read-only access to NACK counters kept by an interceptor.
pub trait NackStats: Send + Sync {
    /// Returns the implementor's NACK counter.
    fn get_nack_count(&self) -> u64;
    /// Returns the implementor's recovered-packet counter.
    ///
    /// Defaults to `0` for implementors that do not track recoveries.
    fn get_recovered_count(&self) -> u64 {
        0
    }
}
63
/// Sender-side NACK handler state: a bounded history of sent RTP packets plus
/// a counter of sequence numbers reported lost.
pub struct DefaultRtpSenderNackHandler {
    // History of sent packets, oldest at the front.
    buffer: Mutex<VecDeque<RtpPacket>>,
    // Upper bound on the number of packets retained in `buffer`.
    max_size: usize,
    /// Running total of lost sequence numbers listed in received Generic NACKs.
    pub nack_recv_count: AtomicU64,
}
69
/// Stateless sender interceptor for remote bitrate estimate (REMB) RTCP packets.
pub struct DefaultRtpSenderBitrateHandler;
71
72impl DefaultRtpSenderBitrateHandler {
73    pub fn new() -> Self {
74        Self
75    }
76}
77
78#[async_trait]
79impl RtpSenderInterceptor for DefaultRtpSenderBitrateHandler {
80    async fn on_rtcp_received(&self, packet: &RtcpPacket, _transport: Arc<RtpTransport>) {
81        if let RtcpPacket::RemoteBitrateEstimate(remb) = packet {
82            debug!("Received REMB: {} bps", remb.bitrate_bps);
83        }
84    }
85}
86
87impl DefaultRtpSenderNackHandler {
88    pub fn new(max_size: usize) -> Self {
89        Self {
90            buffer: Mutex::new(VecDeque::with_capacity(max_size)),
91            max_size,
92            nack_recv_count: AtomicU64::new(0),
93        }
94    }
95}
96
97#[async_trait]
98impl RtpSenderInterceptor for DefaultRtpSenderNackHandler {
99    async fn on_packet_sent(&self, packet: &RtpPacket) {
100        let mut buffer = self.buffer.lock().unwrap();
101        buffer.push_back(packet.clone());
102        if buffer.len() > self.max_size {
103            buffer.pop_front();
104        }
105    }
106
107    async fn on_rtcp_received(&self, packet: &RtcpPacket, transport: Arc<RtpTransport>) {
108        if let RtcpPacket::GenericNack(nack) = packet {
109            debug!(
110                "NACK: received NACK for {} packets",
111                nack.lost_packets.len()
112            );
113            self.nack_recv_count
114                .fetch_add(nack.lost_packets.len() as u64, Ordering::Relaxed);
115
116            let to_resend = {
117                let buffer = self.buffer.lock().unwrap();
118                let mut packets = Vec::new();
119                for seq in &nack.lost_packets {
120                    if let Some(packet) = buffer.iter().find(|p| p.header.sequence_number == *seq) {
121                        packets.push(packet.clone());
122                    }
123                }
124                packets
125            };
126
127            for packet in to_resend {
128                let seq_num = packet.header.sequence_number;
129                debug!("NACK: retransmitting packet seq={}", seq_num);
130                let _ = transport.send_rtp(&packet).await;
131            }
132        }
133    }
134
135    fn as_nack_stats(self: Arc<Self>) -> Option<Arc<dyn NackStats>> {
136        Some(self)
137    }
138}
139
140impl NackStats for DefaultRtpSenderNackHandler {
141    fn get_nack_count(&self) -> u64 {
142        self.nack_recv_count.load(Ordering::Relaxed)
143    }
144}
145
/// Receiver-side NACK state: the last sequence number and SSRC seen, plus
/// counters for requested and late-arriving packets.
pub struct DefaultRtpReceiverNackHandler {
    // Most recent in-order sequence number.
    last_seq: AtomicU16,
    // SSRC of the stream currently tracked; 0 until the first packet arrives.
    last_ssrc: AtomicU32,
    // Set once the first packet has been observed.
    initialized: AtomicBool,
    /// Total number of sequence numbers requested through generated NACKs.
    pub nack_sent_count: AtomicU64,
    /// Number of packets that arrived with a sequence number older than `last_seq`.
    pub nack_recovered_count: AtomicU64,
}
153
154impl DefaultRtpReceiverNackHandler {
155    pub fn new() -> Self {
156        Self {
157            last_seq: AtomicU16::new(0),
158            last_ssrc: AtomicU32::new(0),
159            initialized: std::sync::atomic::AtomicBool::new(false),
160            nack_sent_count: AtomicU64::new(0),
161            nack_recovered_count: AtomicU64::new(0),
162        }
163    }
164}
165
166#[async_trait]
167impl RtpReceiverInterceptor for DefaultRtpReceiverNackHandler {
168    async fn on_packet_received(&self, packet: &RtpPacket) -> Option<RtcpPacket> {
169        let seq = packet.header.sequence_number;
170        let ssrc = packet.header.ssrc;
171
172        // Check if SSRC changed - indicates stream switch
173        let last_ssrc = self.last_ssrc.load(Ordering::SeqCst);
174        if last_ssrc != 0 && last_ssrc != ssrc {
175            debug!(
176                "NACK: SSRC changed from {} to {}, resetting state",
177                last_ssrc, ssrc
178            );
179            self.last_ssrc.store(ssrc, Ordering::SeqCst);
180            self.last_seq.store(seq, Ordering::SeqCst);
181            return None; // Don't send NACK on stream switch
182        }
183
184        if !self.initialized.swap(true, Ordering::SeqCst) {
185            self.last_ssrc.store(ssrc, Ordering::SeqCst);
186            self.last_seq.store(seq, Ordering::SeqCst);
187            return None;
188        }
189
190        let last = self.last_seq.load(Ordering::SeqCst);
191        let diff = seq.wrapping_sub(last);
192
193        if diff > 1 && diff < 32768 {
194            let mut lost = Vec::new();
195            let mut s = last.wrapping_add(1);
196            while s != seq {
197                lost.push(s);
198                s = s.wrapping_add(1);
199            }
200            debug!(
201                "NACK: detected gap from {} to {}, lost {} packets",
202                last,
203                seq,
204                lost.len()
205            );
206            self.nack_sent_count
207                .fetch_add(lost.len() as u64, Ordering::Relaxed);
208            self.last_seq.store(seq, Ordering::SeqCst);
209            return Some(RtcpPacket::GenericNack(GenericNack {
210                sender_ssrc: 0, // Will be filled by receiver
211                media_ssrc: packet.header.ssrc,
212                lost_packets: lost,
213            }));
214        }
215
216        if diff < 32768 {
217            self.last_seq.store(seq, Ordering::SeqCst);
218        } else if diff > 32768 {
219            debug!("NACK: received old packet seq={}, last={}", seq, last);
220            self.nack_recovered_count.fetch_add(1, Ordering::Relaxed);
221        }
222        None
223    }
224
225    fn as_nack_stats(self: Arc<Self>) -> Option<Arc<dyn NackStats>> {
226        Some(self)
227    }
228}
229
230impl NackStats for DefaultRtpReceiverNackHandler {
231    fn get_nack_count(&self) -> u64 {
232        self.nack_sent_count.load(Ordering::Relaxed)
233    }
234
235    fn get_recovered_count(&self) -> u64 {
236        self.nack_recovered_count.load(Ordering::Relaxed)
237    }
238}
239
240enum ReceiverCommand {
241    AddTrack {
242        rid: Option<String>,
243        packet_rx: mpsc::Receiver<(crate::rtp::RtpPacket, std::net::SocketAddr)>,
244        feedback_rx:
245            std::sync::Arc<tokio::sync::Mutex<mpsc::Receiver<crate::media::track::FeedbackEvent>>>,
246        source: std::sync::Arc<crate::media::track::SampleStreamSource>,
247        simulcast_ssrc: std::sync::Arc<std::sync::Mutex<Option<u32>>>,
248    },
249}
250
251enum LoopEvent {
252    Packet(
253        Option<(crate::rtp::RtpPacket, std::net::SocketAddr)>,
254        Option<String>,
255        mpsc::Receiver<(crate::rtp::RtpPacket, std::net::SocketAddr)>,
256        Box<dyn Depacketizer>,
257    ),
258    Feedback(Option<crate::media::track::FeedbackEvent>, Option<String>),
259}
260
/// Events delivered through the peer connection's event channel.
#[derive(Clone)]
pub enum PeerConnectionEvent {
    /// Carries a data channel.
    // NOTE(review): presumably emitted when the remote peer opens a channel — confirm at the send site.
    DataChannel(Arc<crate::transports::sctp::DataChannel>),
    /// Carries the transceiver associated with a track.
    // NOTE(review): presumably emitted when remote media arrives — confirm at the send site.
    Track(Arc<RtpTransceiver>),
}
266
/// Handle to a peer connection.
///
/// Cloning is cheap: all clones share the same `Arc`-held inner state.
#[derive(Clone)]
pub struct PeerConnection {
    // Shared state; background tasks spawned in `new` hold only weak references to it.
    inner: Arc<PeerConnectionInner>,
}
271
272struct PeerConnectionInner {
273    config: RtcConfiguration,
274    signaling_state: watch::Sender<SignalingState>,
275    _signaling_state_rx: watch::Receiver<SignalingState>,
276    peer_state: watch::Sender<PeerConnectionState>,
277    _peer_state_rx: watch::Receiver<PeerConnectionState>,
278    ice_connection_state: watch::Sender<IceConnectionState>,
279    _ice_connection_state_rx: watch::Receiver<IceConnectionState>,
280    ice_gathering_state: watch::Sender<IceGatheringState>,
281    _ice_gathering_state_rx: watch::Receiver<IceGatheringState>,
282    local_description: Mutex<Option<SessionDescription>>,
283    remote_description: Mutex<Option<SessionDescription>>,
284    transceivers: Mutex<Vec<Arc<RtpTransceiver>>>,
285    next_mid: AtomicU16,
286    ice_transport: IceTransport,
287    certificate: Arc<dtls::Certificate>,
288    dtls_fingerprint: String,
289    dtls_transport: Mutex<Option<Arc<DtlsTransport>>>,
290    rtp_transport: Mutex<Option<Arc<RtpTransport>>>,
291    sctp_transport: Mutex<Option<Arc<SctpTransport>>>,
292    data_channels: Arc<Mutex<Vec<std::sync::Weak<crate::transports::sctp::DataChannel>>>>,
293    event_tx: mpsc::UnboundedSender<PeerConnectionEvent>,
294    event_rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<PeerConnectionEvent>>,
295    dtls_role: watch::Sender<Option<bool>>,
296    _dtls_role_rx: watch::Receiver<Option<bool>>,
297    stats_collector: Arc<StatsCollector>,
298    ssrc_generator: AtomicU32,
299}
300
301fn generate_sdes_key_params() -> String {
302    let mut key_salt = [0u8; 30];
303    rand::fill(&mut key_salt);
304    let encoded = BASE64_STANDARD.encode(&key_salt);
305    format!("inline:{}", encoded)
306}
307
308fn parse_sdes_key_params(params: &str) -> RtcResult<Vec<u8>> {
309    if !params.starts_with("inline:") {
310        return Err(RtcError::Internal("Unsupported key params".into()));
311    }
312    let key_salt_base64 = &params[7..];
313    let key_salt_base64 = key_salt_base64.split('|').next().unwrap();
314    BASE64_STANDARD
315        .decode(key_salt_base64)
316        .map_err(|e| RtcError::Internal(format!("Invalid base64 key: {}", e)))
317}
318
319fn map_crypto_suite(suite: &str) -> RtcResult<crate::srtp::SrtpProfile> {
320    match suite {
321        "AES_CM_128_HMAC_SHA1_80" => Ok(crate::srtp::SrtpProfile::Aes128Sha1_80),
322        "AES_CM_128_HMAC_SHA1_32" => Ok(crate::srtp::SrtpProfile::Aes128Sha1_32),
323        "AEAD_AES_128_GCM" => Ok(crate::srtp::SrtpProfile::AeadAes128Gcm),
324        _ => Err(RtcError::Internal(format!(
325            "Unsupported crypto suite: {}",
326            suite
327        ))),
328    }
329}
330
331impl PeerConnection {
332    pub fn new(config: RtcConfiguration) -> Self {
333        let (ice_transport, ice_runner) = IceTransport::new(config.clone());
334        let certificate =
335            Arc::new(dtls::generate_certificate().expect("failed to generate certificate"));
336        let dtls_fingerprint = dtls::fingerprint(&certificate);
337
338        let (signaling_state_tx, signaling_state_rx) = watch::channel(SignalingState::Stable);
339        let (peer_state_tx, peer_state_rx) = watch::channel(PeerConnectionState::New);
340        let (ice_connection_state_tx, ice_connection_state_rx) =
341            watch::channel(IceConnectionState::New);
342        let (ice_gathering_state_tx, ice_gathering_state_rx) =
343            watch::channel(IceGatheringState::New);
344        let (dtls_role_tx, dtls_role_rx) = watch::channel(None);
345
346        let ssrc_generator = AtomicU32::new(config.ssrc_start);
347
348        let (event_tx, event_rx) = mpsc::unbounded_channel();
349
350        let inner = PeerConnectionInner {
351            config,
352            signaling_state: signaling_state_tx,
353            _signaling_state_rx: signaling_state_rx,
354            peer_state: peer_state_tx,
355            _peer_state_rx: peer_state_rx,
356            ice_connection_state: ice_connection_state_tx,
357            _ice_connection_state_rx: ice_connection_state_rx,
358            ice_gathering_state: ice_gathering_state_tx,
359            _ice_gathering_state_rx: ice_gathering_state_rx,
360            local_description: Mutex::new(None),
361            remote_description: Mutex::new(None),
362            transceivers: Mutex::new(Vec::new()),
363            next_mid: AtomicU16::new(0),
364            ice_transport,
365            certificate,
366            dtls_fingerprint,
367            dtls_transport: Mutex::new(None),
368            rtp_transport: Mutex::new(None),
369            sctp_transport: Mutex::new(None),
370            data_channels: Arc::new(Mutex::new(Vec::new())),
371            event_tx,
372            event_rx: tokio::sync::Mutex::new(event_rx),
373            dtls_role: dtls_role_tx,
374            _dtls_role_rx: dtls_role_rx.clone(),
375            stats_collector: Arc::new(StatsCollector::new()),
376            ssrc_generator,
377        };
378        let pc = Self {
379            inner: Arc::new(inner),
380        };
381
382        let inner_weak = Arc::downgrade(&pc.inner);
383        let ice_transport = pc.inner.ice_transport.clone();
384        let dtls_role_rx = dtls_role_rx;
385        let ice_connection_state_tx = pc.inner.ice_connection_state.clone();
386
387        let ice_transport_gathering = ice_transport.clone();
388        let ice_gathering_state_tx = pc.inner.ice_gathering_state.clone();
389        let inner_weak_gathering = inner_weak.clone();
390        tokio::spawn(async move {
391            let gathering_loop = run_gathering_loop(
392                ice_transport_gathering,
393                ice_gathering_state_tx,
394                inner_weak_gathering,
395            );
396
397            let dtls_loop = run_ice_dtls_loop(
398                ice_transport,
399                ice_connection_state_tx,
400                dtls_role_rx,
401                inner_weak,
402            );
403
404            tokio::join!(gathering_loop, dtls_loop, ice_runner);
405        });
406        pc
407    }
408
    /// Returns the configuration this peer connection was created with.
    pub fn config(&self) -> &RtcConfiguration {
        &self.inner.config
    }
412
    /// Returns a clone of the ICE transport handle.
    pub fn ice_transport(&self) -> IceTransport {
        self.inner.ice_transport.clone()
    }
416
417    pub fn add_transceiver(
418        &self,
419        kind: MediaKind,
420        direction: TransceiverDirection,
421    ) -> Arc<RtpTransceiver> {
422        let index = self
423            .inner
424            .transceivers
425            .lock()
426            .map(|list| list.len())
427            .unwrap_or(0);
428        let ssrc = 2000 + index as u32;
429        let mut builder = RtpReceiverBuilder::new(kind, ssrc)
430            .interceptor(self.inner.stats_collector.clone())
431            .depacketizer_factory(self.inner.config.depacketizer_strategy.factory.clone());
432
433        let nack_enabled = if let Some(caps) = &self.inner.config.media_capabilities {
434            match kind {
435                MediaKind::Audio => caps
436                    .audio
437                    .iter()
438                    .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
439                MediaKind::Video => caps
440                    .video
441                    .iter()
442                    .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
443                MediaKind::Application => false,
444            }
445        } else {
446            match kind {
447                MediaKind::Audio => AudioCapability::default()
448                    .rtcp_fbs
449                    .contains(&"nack".to_string()),
450                MediaKind::Video => VideoCapability::default()
451                    .rtcp_fbs
452                    .contains(&"nack".to_string()),
453                MediaKind::Application => false,
454            }
455        };
456
457        if nack_enabled {
458            builder = builder.nack();
459        }
460        let receiver = builder.build();
461
462        let transceiver = Arc::new(RtpTransceiver::new(kind, direction));
463        if direction.sends() {
464            let rand_val = random_u32();
465            let ssrc = self
466                .inner
467                .ssrc_generator
468                .fetch_add(1 + rand_val, Ordering::Relaxed);
469            *transceiver.sender_ssrc.lock().unwrap() = Some(ssrc);
470            *transceiver.sender_stream_id.lock().unwrap() = Some("default".to_string());
471            *transceiver.sender_track_id.lock().unwrap() =
472                Some(format!("track-{}", transceiver.id()));
473        }
474        *transceiver.receiver.lock().unwrap() = Some(receiver);
475
476        let mut list = self.inner.transceivers.lock().unwrap();
477        list.push(transceiver.clone());
478        transceiver
479    }
480
481    pub fn add_track(
482        &self,
483        track: Arc<dyn MediaStreamTrack>,
484        params: RtpCodecParameters,
485    ) -> RtcResult<Arc<RtpSender>> {
486        let stream_id = format!("{}", track.id());
487        self.add_track_with_stream_id(track, stream_id, params)
488    }
489
490    pub fn add_track_with_stream_id(
491        &self,
492        track: Arc<dyn MediaStreamTrack>,
493        stream_id: String,
494        params: RtpCodecParameters,
495    ) -> RtcResult<Arc<RtpSender>> {
496        let kind = match track.kind() {
497            crate::media::frame::MediaKind::Audio => MediaKind::Audio,
498            crate::media::frame::MediaKind::Video => MediaKind::Video,
499        };
500        let transceiver = self.add_transceiver(kind, TransceiverDirection::SendRecv);
501        let ssrc = transceiver
502            .sender_ssrc
503            .lock()
504            .unwrap()
505            .unwrap_or_else(|| self.inner.ssrc_generator.fetch_add(1, Ordering::Relaxed));
506
507        let mut builder = RtpSenderBuilder::new(track, ssrc)
508            .stream_id(stream_id)
509            .params(params)
510            .interceptor(self.inner.stats_collector.clone());
511
512        let nack_enabled = if let Some(caps) = &self.inner.config.media_capabilities {
513            match kind {
514                MediaKind::Audio => caps
515                    .audio
516                    .iter()
517                    .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
518                MediaKind::Video => caps
519                    .video
520                    .iter()
521                    .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
522                MediaKind::Application => false,
523            }
524        } else {
525            match kind {
526                MediaKind::Audio => AudioCapability::default()
527                    .rtcp_fbs
528                    .contains(&"nack".to_string()),
529                MediaKind::Video => VideoCapability::default()
530                    .rtcp_fbs
531                    .contains(&"nack".to_string()),
532                MediaKind::Application => false,
533            }
534        };
535
536        if nack_enabled {
537            builder = builder
538                .nack(self.inner.config.nack_buffer_size)
539                .bitrate_controller();
540        }
541
542        let sender = builder.build();
543
544        // Update transceiver's pre-allocated info to match the actual sender
545        *transceiver.sender_ssrc.lock().unwrap() = Some(sender.ssrc());
546        *transceiver.sender_stream_id.lock().unwrap() = Some(sender.stream_id().to_string());
547        *transceiver.sender_track_id.lock().unwrap() = Some(sender.track_id().to_string());
548
549        // If transport is already established, set it on the sender immediately
550        if let Some(transport) = self.inner.rtp_transport.lock().unwrap().as_ref() {
551            sender.set_transport(transport.clone());
552        }
553
554        transceiver.set_sender(Some(sender.clone()));
555        Ok(sender)
556    }
557
558    pub fn get_transceivers(&self) -> Vec<Arc<RtpTransceiver>> {
559        self.inner.transceivers.lock().unwrap().clone()
560    }
561
562    pub async fn create_offer(&self) -> RtcResult<SessionDescription> {
563        let state = &self.inner.signaling_state;
564        if *state.borrow() != SignalingState::Stable {
565            return Err(RtcError::InvalidState(format!(
566                "cannot create offer while in state {:?}",
567                *state.borrow()
568            )));
569        }
570        let should_set_controlling = {
571            let local = self.inner.local_description.lock().unwrap();
572            let remote = self.inner.remote_description.lock().unwrap();
573            local.is_none() && remote.is_none()
574        };
575
576        if should_set_controlling {
577            self.inner
578                .ice_transport
579                .set_role(crate::transports::ice::IceRole::Controlling);
580        }
581        self.inner
582            .build_description(SdpType::Offer, |dir| dir)
583            .await
584    }
585
586    pub async fn create_answer(&self) -> RtcResult<SessionDescription> {
587        let state = &self.inner.signaling_state;
588        if *state.borrow() != SignalingState::HaveRemoteOffer {
589            return Err(RtcError::InvalidState(
590                "create_answer requires remote offer".into(),
591            ));
592        }
593        self.inner
594            .ice_transport
595            .set_role(crate::transports::ice::IceRole::Controlled);
596        self.inner
597            .build_description(SdpType::Answer, |dir| dir.answer_direction())
598            .await
599    }
600
601    pub fn set_local_description(&self, desc: SessionDescription) -> RtcResult<()> {
602        self.inner.validate_sdp_type(&desc.sdp_type)?;
603
604        // For Offerer: extract parameters from local offer (our intended changes)
605        // This allows Offerer to immediately update transceivers with new parameters
606        // that will be confirmed when answer is received
607        if desc.sdp_type == SdpType::Offer {
608            let is_reinvite = {
609                let local = self.inner.local_description.lock().unwrap();
610                local.is_some()
611            };
612            if is_reinvite {
613                debug!("Offerer: extracting parameters from local reinvite offer");
614                // Extract parameters from our offer for transceivers
615                let transceivers = self.inner.transceivers.lock().unwrap().clone();
616                for section in &desc.media_sections {
617                    let mut matched_transceiver = transceivers
618                        .iter()
619                        .find(|t| t.mid().as_ref() == Some(&section.mid))
620                        .map(|t| t.clone());
621
622                    // If not found by MID, try to match with mid-less transceiver (e.g. manual SDP)
623                    if matched_transceiver.is_none() {
624                        if let Some(t) = transceivers
625                            .iter()
626                            .find(|t| t.mid().is_none() && t.kind() == section.kind)
627                        {
628                            t.set_mid(section.mid.clone());
629                            matched_transceiver = Some(t.clone());
630                        }
631                    }
632
633                    if let Some(t) = matched_transceiver {
634                        let payload_map = Self::extract_payload_map(section);
635                        if !payload_map.is_empty() {
636                            let _ = t.update_payload_map(payload_map);
637                        }
638                        let extmap = Self::extract_extmap(section);
639                        let _ = t.update_extmap(extmap);
640                    }
641                }
642            } else {
643                // Initial offer: ensure MIDs are assigned if we match unassigned transceivers
644                // This covers manual SDP creation (skipped create_offer)
645                let transceivers = self.inner.transceivers.lock().unwrap().clone();
646                for section in &desc.media_sections {
647                    if transceivers
648                        .iter()
649                        .any(|t| t.mid().as_ref() == Some(&section.mid))
650                    {
651                        continue;
652                    }
653                    // Assign to first matching unassigned transceiver
654                    if let Some(t) = transceivers
655                        .iter()
656                        .find(|t| t.mid().is_none() && t.kind() == section.kind)
657                    {
658                        t.set_mid(section.mid.clone());
659                    }
660                }
661            }
662        }
663
664        {
665            let state = &self.inner.signaling_state;
666            match desc.sdp_type {
667                SdpType::Offer => {
668                    if *state.borrow() != SignalingState::Stable {
669                        return Err(RtcError::InvalidState(
670                            "set_local_description(offer) requires stable signaling state".into(),
671                        ));
672                    }
673                    let _ = state.send(SignalingState::HaveLocalOffer);
674                }
675                SdpType::Answer => {
676                    if *state.borrow() != SignalingState::HaveRemoteOffer {
677                        return Err(RtcError::InvalidState(
678                            "set_local_description(answer) requires remote offer".into(),
679                        ));
680                    }
681                    let _ = state.send(SignalingState::Stable);
682                }
683                SdpType::Rollback | SdpType::Pranswer => {
684                    return Err(RtcError::NotImplemented("pranswer/rollback"));
685                }
686            }
687        }
688        let mut local = self.inner.local_description.lock().unwrap();
689        *local = Some(desc);
690        Ok(())
691    }
692
693    pub async fn set_remote_description(&self, desc: SessionDescription) -> RtcResult<()> {
694        self.inner.validate_sdp_type(&desc.sdp_type)?;
695
696        // Check if this is a reinvite (not first negotiation)
697        let is_reinvite = {
698            let remote = self.inner.remote_description.lock().unwrap();
699            remote.is_some()
700        };
701
702        if is_reinvite {
703            // Apply reinvite at correct timing based on role
704            let current_state = *self.inner.signaling_state.borrow();
705            match (desc.sdp_type, current_state) {
706                // Answerer receiving offer: apply immediately
707                (SdpType::Offer, SignalingState::Stable) => {
708                    debug!("Answerer: applying reinvite from offer");
709                    self.handle_reinvite(&desc).await?;
710                }
711                // Offerer receiving answer: apply now (was pending since we sent offer)
712                (SdpType::Answer, SignalingState::HaveLocalOffer) => {
713                    debug!("Offerer: applying reinvite from answer");
714                    self.handle_reinvite(&desc).await?;
715                }
716                // Invalid states for reinvite
717                (SdpType::Offer, _) => {
718                    return Err(RtcError::InvalidState(
719                        "Cannot handle reinvite offer in non-stable state (glare?)".into(),
720                    ));
721                }
722                _ => {}
723            }
724        }
725
726        // Update next_mid to avoid collisions with remote MIDs
727        for section in &desc.media_sections {
728            if let Ok(mid_val) = section.mid.parse::<u16>() {
729                self.inner.next_mid.fetch_max(mid_val + 1, Ordering::SeqCst);
730            }
731        }
732
733        {
734            let state = &self.inner.signaling_state;
735            match desc.sdp_type {
736                SdpType::Offer => {
737                    if *state.borrow() != SignalingState::Stable {
738                        return Err(RtcError::InvalidState(
739                            "set_remote_description(offer) requires stable signaling state".into(),
740                        ));
741                    }
742                    let _ = state.send(SignalingState::HaveRemoteOffer);
743                }
744                SdpType::Answer => {
745                    if *state.borrow() != SignalingState::HaveLocalOffer {
746                        return Err(RtcError::InvalidState(
747                            "set_remote_description(answer) requires local offer".into(),
748                        ));
749                    }
750                    let _ = state.send(SignalingState::Stable);
751                }
752                SdpType::Rollback | SdpType::Pranswer => {
753                    return Err(RtcError::NotImplemented("pranswer/rollback"));
754                }
755            }
756        }
757
758        {
759            let current_role = *self.inner.dtls_role.borrow();
760            if current_role.is_none() {
761                let mut new_role = None;
762                if self.config().transport_mode == TransportMode::Rtp
763                    || self.config().transport_mode == TransportMode::Srtp
764                {
765                    new_role = Some(true);
766                } else {
767                    for section in &desc.media_sections {
768                        for attr in &section.attributes {
769                            if attr.key == "setup"
770                                && let Some(val) = &attr.value
771                            {
772                                let is_client = match val.as_str() {
773                                    "active" => false,
774                                    "passive" => true,
775                                    "actpass" => false,
776                                    _ => true,
777                                };
778                                new_role = Some(is_client);
779                                break;
780                            }
781                        }
782                        if new_role.is_some() {
783                            break;
784                        }
785                    }
786                }
787                if let Some(r) = new_role {
788                    let _ = self.inner.dtls_role.send(Some(r));
789                }
790            }
791        }
792
793        // Start ICE
794        let mut ufrag = None;
795        let mut pwd = None;
796        let mut candidates = Vec::new();
797        let mut remote_addr = None;
798
799        // Check session-level attributes for ICE credentials
800        for attr in &desc.session.attributes {
801            if attr.key == "ice-ufrag" {
802                ufrag = attr.value.clone();
803            } else if attr.key == "ice-pwd" {
804                pwd = attr.value.clone();
805            }
806        }
807
808        for section in &desc.media_sections {
809            if self.config().transport_mode != TransportMode::WebRtc {
810                let conn_opt = section
811                    .connection
812                    .as_ref()
813                    .or(desc.session.connection.as_ref());
814                if let Some(conn) = conn_opt {
815                    let parts: Vec<&str> = conn.split_whitespace().collect();
816                    if parts.len() >= 3
817                        && parts[0] == "IN"
818                        && parts[1] == "IP4"
819                        && let Ok(ip) = parts[2].parse::<std::net::IpAddr>()
820                    {
821                        remote_addr = Some(std::net::SocketAddr::new(ip, section.port));
822                    }
823                }
824            }
825
826            for attr in &section.attributes {
827                if attr.key == "ice-ufrag" {
828                    ufrag = attr.value.clone();
829                } else if attr.key == "ice-pwd" {
830                    pwd = attr.value.clone();
831                } else if attr.key == "candidate"
832                    && let Some(val) = &attr.value
833                    && let Ok(c) = crate::transports::ice::IceCandidate::from_sdp(val)
834                {
835                    candidates.push(c);
836                }
837            }
838        }
839
840        if self.config().transport_mode == TransportMode::WebRtc {
841            if let (Some(u), Some(p)) = (ufrag, pwd) {
842                let params = crate::transports::ice::IceParameters {
843                    username_fragment: u,
844                    password: p,
845                    ice_lite: false,
846                    tie_breaker: 0,
847                };
848                self.inner
849                    .ice_transport
850                    .start(params)
851                    .map_err(|e| crate::RtcError::Internal(format!("ICE error: {}", e)))?;
852
853                for candidate in candidates {
854                    self.inner.ice_transport.add_remote_candidate(candidate);
855                }
856            }
857        } else if let Some(addr) = remote_addr {
858            self.inner
859                .ice_transport
860                .start_direct(addr)
861                .await
862                .map_err(|e| crate::RtcError::Internal(format!("ICE direct error: {}", e)))?;
863        }
864
865        // Create transceivers for new media sections in Offer
866        if desc.sdp_type == SdpType::Offer {
867            let mut transceivers = self.inner.transceivers.lock().unwrap();
868            for section in &desc.media_sections {
869                let mid = &section.mid;
870                let mut found_transceiver = None;
871                let mut newly_matched = false;
872
873                for t in transceivers.iter() {
874                    if let Some(t_mid) = t.mid()
875                        && t_mid == *mid
876                    {
877                        found_transceiver = Some(t.clone());
878                        break;
879                    }
880                }
881
882                if found_transceiver.is_none() {
883                    // Try to find a transceiver with no MID and same kind
884                    for t in transceivers.iter() {
885                        if t.mid().is_none() && t.kind() == section.kind {
886                            t.set_mid(mid.clone());
887                            found_transceiver = Some(t.clone());
888                            newly_matched = true;
889                            break;
890                        }
891                    }
892                }
893
894                let mut ssrc = None;
895                let mut simulcast = None;
896                let mut rids = Vec::new();
897                let mut rid_ext_id = None;
898                let mut abs_send_time_ext_id = None;
899                let mut fid_group = None;
900                let mut rtx_ssrc = None;
901
902                // First pass: check for ssrc-group FID
903                for attr in &section.attributes {
904                    if attr.key == "ssrc-group"
905                        && let Some(val) = &attr.value
906                        && val.starts_with("FID")
907                    {
908                        // Format: FID <primary> <rtx>
909                        let parts: Vec<&str> = val.split_whitespace().collect();
910                        if parts.len() >= 3 {
911                            if let Ok(primary) = parts[1].parse::<u32>() {
912                                fid_group = Some(primary);
913                                if let Ok(rtx) = parts[2].parse::<u32>() {
914                                    rtx_ssrc = Some(rtx);
915                                }
916                            }
917                        }
918                    }
919                }
920
921                for attr in &section.attributes {
922                    if attr.key == "ssrc" {
923                        if let Some(val) = &attr.value
924                            && let Some(ssrc_str) = val.split_whitespace().next()
925                            && let Ok(parsed) = ssrc_str.parse::<u32>()
926                        {
927                            // If we found a FID group, only accept the primary SSRC
928                            if let Some(primary) = fid_group {
929                                if parsed == primary {
930                                    ssrc = Some(parsed);
931                                }
932                            } else if ssrc.is_none() {
933                                // No FID group, take the first one
934                                ssrc = Some(parsed);
935                            }
936                        }
937                    } else if attr.key == "simulcast"
938                        && let Some(val) = &attr.value
939                    {
940                        simulcast = crate::sdp::Simulcast::parse(val);
941                    } else if attr.key == "rid"
942                        && let Some(val) = &attr.value
943                    {
944                        if let Some(rid) = crate::sdp::Rid::parse(val) {
945                            rids.push(rid);
946                        }
947                    } else if attr.key == "extmap"
948                        && let Some(val) = &attr.value
949                    {
950                        if val.contains("urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id") {
951                            if let Some(id_str) = val.split_whitespace().next() {
952                                if let Ok(id) = id_str.parse::<u8>() {
953                                    rid_ext_id = Some(id);
954                                }
955                            }
956                        } else if val.contains(crate::sdp::ABS_SEND_TIME_URI) {
957                            if let Some(id_str) = val.split_whitespace().next() {
958                                if let Ok(id) = id_str.parse::<u8>() {
959                                    abs_send_time_ext_id = Some(id);
960                                }
961                            }
962                        }
963                    }
964                }
965
966                if let Some(id) = rid_ext_id {
967                    if let Some(transport) = self.inner.rtp_transport.lock().unwrap().as_ref() {
968                        transport.set_rid_extension_id(Some(id));
969                    }
970                }
971
972                if let Some(id) = abs_send_time_ext_id {
973                    if let Some(transport) = self.inner.rtp_transport.lock().unwrap().as_ref() {
974                        transport.set_abs_send_time_extension_id(Some(id));
975                    }
976                }
977
978                if let Some(t) = found_transceiver {
979                    // Update transceiver parameters
980                    let payload_map = Self::extract_payload_map(section);
981                    if !payload_map.is_empty() {
982                        let _ = t.update_payload_map(payload_map);
983                    }
984                    let extmap = Self::extract_extmap(section);
985                    let _ = t.update_extmap(extmap);
986                    let direction: TransceiverDirection = section.direction.into();
987                    t.set_direction(direction);
988
989                    if let Some(ssrc_val) = ssrc {
990                        if let Some(rx) = t.receiver.lock().unwrap().as_ref() {
991                            rx.set_ssrc(ssrc_val);
992                            if let Some(rtx) = rtx_ssrc {
993                                rx.set_rtx_ssrc(rtx);
994                            }
995
996                            // Handle Simulcast
997                            if let Some(sim) = &simulcast {
998                                // For Offer, we look at 'send' direction (remote sends to us)
999                                for rid_id in &sim.send {
1000                                    let _ = rx.add_simulcast_track(rid_id.clone());
1001                                }
1002                            }
1003                        }
1004                    }
1005
1006                    if newly_matched {
1007                        if ssrc.is_some() {
1008                            if let Some(r) = t.receiver.lock().unwrap().as_ref() {
1009                                r.track_event_sent.store(true, Ordering::SeqCst);
1010                            }
1011                            let _ = self.inner.event_tx.send(PeerConnectionEvent::Track(t));
1012                        }
1013                    }
1014                } else {
1015                    let kind = section.kind;
1016                    let direction = if kind == MediaKind::Application {
1017                        TransceiverDirection::SendRecv
1018                    } else {
1019                        TransceiverDirection::RecvOnly
1020                    };
1021                    let t = Arc::new(RtpTransceiver::new(kind, direction));
1022                    t.set_mid(mid.clone());
1023
1024                    let receiver_ssrc = ssrc.unwrap_or_else(|| 2000 + transceivers.len() as u32);
1025
1026                    let mut builder = RtpReceiverBuilder::new(kind, receiver_ssrc)
1027                        .interceptor(self.inner.stats_collector.clone());
1028
1029                    let nack_enabled = if let Some(caps) = &self.inner.config.media_capabilities {
1030                        match kind {
1031                            MediaKind::Audio => caps
1032                                .audio
1033                                .iter()
1034                                .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
1035                            MediaKind::Video => caps
1036                                .video
1037                                .iter()
1038                                .any(|c| c.rtcp_fbs.contains(&"nack".to_string())),
1039                            _ => false,
1040                        }
1041                    } else {
1042                        match kind {
1043                            MediaKind::Audio => AudioCapability::default()
1044                                .rtcp_fbs
1045                                .contains(&"nack".to_string()),
1046                            MediaKind::Video => VideoCapability::default()
1047                                .rtcp_fbs
1048                                .contains(&"nack".to_string()),
1049                            _ => false,
1050                        }
1051                    };
1052
1053                    if nack_enabled {
1054                        debug!("NACK: enabled for new receiver mid={}", mid);
1055                        builder = builder.nack();
1056                    } else {
1057                        debug!("NACK: disabled for new receiver mid={}", mid);
1058                    }
1059                    let receiver = builder.build();
1060                    if let Some(rtx) = rtx_ssrc {
1061                        receiver.set_rtx_ssrc(rtx);
1062                    }
1063
1064                    // If transport is already active (renegotiation), attach it to the new receiver
1065                    {
1066                        let transport_guard = self.inner.rtp_transport.lock().unwrap();
1067                        if let Some(transport) = &*transport_guard {
1068                            receiver.set_transport(
1069                                transport.clone(),
1070                                Some(self.inner.event_tx.clone()),
1071                                Some(Arc::downgrade(&t)),
1072                            );
1073                        } else {
1074                            debug!(
1075                                "No existing transport to attach to new receiver mid={}",
1076                                mid
1077                            );
1078                        }
1079                    }
1080
1081                    // Handle Simulcast for new transceiver
1082                    if let Some(sim) = &simulcast {
1083                        for rid_id in &sim.send {
1084                            let _ = receiver.add_simulcast_track(rid_id.clone());
1085                        }
1086                    }
1087
1088                    *t.receiver.lock().unwrap() = Some(receiver);
1089
1090                    transceivers.push(t.clone());
1091
1092                    if ssrc.is_some() {
1093                        if let Some(r) = t.receiver.lock().unwrap().as_ref() {
1094                            r.track_event_sent.store(true, Ordering::SeqCst);
1095                        }
1096                        let _ = self.inner.event_tx.send(PeerConnectionEvent::Track(t));
1097                    }
1098                }
1099            }
1100        } else if desc.sdp_type == SdpType::Answer {
1101            let transceivers = self.inner.transceivers.lock().unwrap();
1102            for section in &desc.media_sections {
1103                let mid = &section.mid;
1104                let mut found_transceiver = None;
1105                for t in transceivers.iter() {
1106                    if let Some(t_mid) = t.mid()
1107                        && t_mid == *mid
1108                    {
1109                        found_transceiver = Some(t);
1110                        break;
1111                    }
1112                }
1113
1114                if let Some(t) = found_transceiver {
1115                    // Update transceiver parameters
1116                    let payload_map = Self::extract_payload_map(section);
1117                    if !payload_map.is_empty() {
1118                        let _ = t.update_payload_map(payload_map);
1119                    }
1120                    let extmap = Self::extract_extmap(section);
1121                    let _ = t.update_extmap(extmap);
1122                    let direction: TransceiverDirection = section.direction.into();
1123                    t.set_direction(direction);
1124
1125                    let mut ssrc = None;
1126                    for attr in &section.attributes {
1127                        if attr.key == "ssrc"
1128                            && ssrc.is_none()
1129                            && let Some(val) = &attr.value
1130                            && let Some(ssrc_str) = val.split_whitespace().next()
1131                            && let Ok(parsed) = ssrc_str.parse::<u32>()
1132                        {
1133                            ssrc = Some(parsed);
1134                            break;
1135                        }
1136                    }
1137
1138                    if let Some(ssrc_val) = ssrc
1139                        && let Some(rx) = t.receiver.lock().unwrap().as_ref()
1140                    {
1141                        rx.set_ssrc(ssrc_val);
1142                    }
1143                }
1144            }
1145        }
1146
1147        let mut remote = self.inner.remote_description.lock().unwrap();
1148        *remote = Some(desc);
1149
1150        Ok(())
1151    }
1152
1153    pub(crate) async fn start_dtls(
1154        &self,
1155        is_client: bool,
1156    ) -> RtcResult<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>> {
1157        debug!("start_dtls: starting with is_client={}", is_client);
1158        let pair = self
1159            .inner
1160            .ice_transport
1161            .get_selected_pair()
1162            .await
1163            .ok_or(RtcError::Internal("No selected pair".into()))?;
1164
1165        let socket_rx = self.inner.ice_transport.subscribe_selected_socket();
1166
1167        // Create IceConn and register it immediately to avoid dropping packets
1168        let ice_conn = IceConn::new(socket_rx.clone(), pair.remote.address);
1169
1170        // Monitor selected pair changes to update remote address
1171        let mut pair_rx = self.inner.ice_transport.subscribe_selected_pair();
1172        let ice_conn_monitor = ice_conn.clone();
1173
1174        if self.config().transport_mode != TransportMode::WebRtc {
1175            let rtcp_addr = {
1176                let remote_desc = self.inner.remote_description.lock().unwrap();
1177                if let Some(desc) = &*remote_desc {
1178                    // Check if rtcp-mux is enabled in the first media section
1179                    // If not, set rtcp_addr = remote_addr port + 1
1180                    // Note: This assumes all media sections follow the same mux policy or we only care about the first one for now.
1181                    // In a proper implementation, we might need per-transceiver transport or bundle handling.
1182                    if let Some(section) = desc.media_sections.first() {
1183                        let has_mux = section.attributes.iter().any(|a| a.key == "rtcp-mux");
1184                        if !has_mux {
1185                            let mut addr = pair.remote.address;
1186                            addr.set_port(addr.port() + 1);
1187                            Some(addr)
1188                        } else {
1189                            None
1190                        }
1191                    } else {
1192                        None
1193                    }
1194                } else {
1195                    None
1196                }
1197            };
1198
1199            if let Some(addr) = rtcp_addr {
1200                ice_conn.set_remote_rtcp_addr(Some(addr));
1201                debug!("RTCP-MUX not detected, setting RTCP address to {}", addr);
1202            }
1203        }
1204
1205        let srtp_required = self.config().transport_mode != TransportMode::Rtp;
1206        let allow_ssrc_change = self.config().enable_latching;
1207        let rtp_transport = Arc::new(RtpTransport::new_with_ssrc_change(
1208            ice_conn.clone(),
1209            srtp_required,
1210            allow_ssrc_change,
1211        ));
1212        {
1213            let mut rx = ice_conn.rtp_receiver.write().unwrap();
1214            *rx = Some(Arc::downgrade(&rtp_transport)
1215                as std::sync::Weak<dyn crate::transports::PacketReceiver>);
1216        }
1217        *self.inner.rtp_transport.lock().unwrap() = Some(rtp_transport.clone());
1218
1219        {
1220            let transceivers = self.inner.transceivers.lock().unwrap();
1221            for t in transceivers.iter() {
1222                // Store transport reference for late senders
1223                t.set_rtp_transport(Arc::downgrade(&rtp_transport));
1224
1225                let receiver_arc = t.receiver.lock().unwrap().clone();
1226                if let Some(receiver) = &receiver_arc {
1227                    receiver.set_transport(
1228                        rtp_transport.clone(),
1229                        Some(self.inner.event_tx.clone()),
1230                        Some(Arc::downgrade(&t)),
1231                    );
1232                }
1233            }
1234        }
1235
1236        self.inner
1237            .ice_transport
1238            .set_data_receiver(ice_conn.clone())
1239            .await;
1240
1241        if self.config().transport_mode == TransportMode::Srtp {
1242            self.setup_sdes(&rtp_transport)?;
1243            let rtcp_loop = Self::create_rtcp_loop(
1244                rtp_transport.clone(),
1245                Arc::downgrade(&self.inner),
1246                self.inner.stats_collector.clone(),
1247            );
1248            let pair_monitor = Self::create_pair_monitor(pair_rx.clone(), ice_conn_monitor.clone());
1249            let combined_loop = async move {
1250                tokio::select! {
1251                    _ = rtcp_loop => {},
1252                    _ = pair_monitor => {},
1253                }
1254            };
1255            return Ok(Box::pin(combined_loop) as Pin<Box<dyn Future<Output = ()> + Send>>);
1256        }
1257
1258        if self.config().transport_mode == TransportMode::Rtp {
1259            let rtcp_loop = Self::create_rtcp_loop(
1260                rtp_transport.clone(),
1261                Arc::downgrade(&self.inner),
1262                self.inner.stats_collector.clone(),
1263            );
1264
1265            let transceivers = self.inner.transceivers.lock().unwrap();
1266            for t in transceivers.iter() {
1267                let sender_arc = t.sender.lock().unwrap().clone();
1268                let receiver_arc = t.receiver.lock().unwrap().clone();
1269
1270                // Set sender transport
1271                if let Some(sender) = &sender_arc {
1272                    let mid_opt = t.mid();
1273                    trace!(
1274                        "start_dtls: transceiver kind={:?} mid={:?}",
1275                        t.kind(),
1276                        mid_opt
1277                    );
1278                    sender.set_transport(rtp_transport.clone());
1279                }
1280
1281                // Set feedback SSRC (receiver transport already set above)
1282                if let Some(receiver) = &receiver_arc {
1283                    if let Some(sender) = &sender_arc {
1284                        receiver.set_feedback_ssrc(sender.ssrc());
1285                    }
1286                }
1287            }
1288            let pair_monitor = Self::create_pair_monitor(pair_rx.clone(), ice_conn_monitor.clone());
1289            let combined_loop = async move {
1290                tokio::select! {
1291                    _ = rtcp_loop => {},
1292                    _ = pair_monitor => {},
1293                }
1294            };
1295            return Ok(Box::pin(combined_loop) as Pin<Box<dyn Future<Output = ()> + Send>>);
1296        }
1297
1298        let (dtls, incoming_data_rx, dtls_runner) = DtlsTransport::new(
1299            ice_conn,
1300            self.inner.certificate.as_ref().clone(),
1301            is_client,
1302            self.config().dtls_buffer_size,
1303        )
1304        .await
1305        .map_err(|e| RtcError::Internal(format!("DTLS failed: {}", e)))?;
1306
1307        let sctp_port = if let Some(caps) = &self.config().media_capabilities {
1308            if let Some(app) = &caps.application {
1309                app.sctp_port
1310            } else {
1311                5000
1312            }
1313        } else {
1314            5000
1315        };
1316
1317        let sctp_needed = {
1318            let remote = self.inner.remote_description.lock().unwrap();
1319            if let Some(desc) = &*remote {
1320                desc.media_sections
1321                    .iter()
1322                    .any(|m| m.kind == MediaKind::Application)
1323            } else {
1324                false
1325            }
1326        };
1327
1328        let (dc_tx, mut dc_rx) = mpsc::unbounded_channel();
1329
1330        let mut sctp_runner: Pin<Box<dyn Future<Output = ()> + Send>>;
1331
1332        if sctp_needed {
1333            let (sctp, runner) = SctpTransport::new(
1334                dtls.clone(),
1335                incoming_data_rx,
1336                self.inner.data_channels.clone(),
1337                sctp_port,
1338                sctp_port,
1339                Some(dc_tx),
1340                is_client,
1341                self.config(),
1342            );
1343            *self.inner.sctp_transport.lock().unwrap() = Some(sctp);
1344            sctp_runner = Box::pin(runner);
1345        } else {
1346            drop(incoming_data_rx);
1347            sctp_runner = Box::pin(std::future::pending());
1348        }
1349
1350        *self.inner.dtls_transport.lock().unwrap() = Some(dtls.clone());
1351
1352        let dtls_clone = dtls.clone();
1353        let rtp_transport_clone = rtp_transport.clone();
1354        let inner_weak = Arc::downgrade(&self.inner);
1355        let stats_collector = self.inner.stats_collector.clone();
1356
1357        let mut dtls_runner: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(dtls_runner);
1358
1359        let inner_weak_dc = inner_weak.clone();
1360        let dc_listener = async move {
1361            while let Some(dc) = dc_rx.recv().await {
1362                if let Some(inner) = inner_weak_dc.upgrade() {
1363                    let _ = inner.event_tx.send(PeerConnectionEvent::DataChannel(dc));
1364                } else {
1365                    break;
1366                }
1367            }
1368        };
1369        let mut dc_listener: Pin<Box<dyn Future<Output = ()> + Send>> = if sctp_needed {
1370            Box::pin(dc_listener)
1371        } else {
1372            Box::pin(std::future::pending())
1373        };
1374
1375        let mut state_rx = dtls_clone.subscribe_state();
1376        loop {
1377            let state = state_rx.borrow().clone();
1378            match state {
1379                crate::transports::dtls::DtlsState::Connected(_, profile_opt) => {
1380                    self.setup_srtp(&dtls_clone, is_client, profile_opt, &rtp_transport_clone);
1381
1382                    let rtcp_loop = Self::create_rtcp_loop(
1383                        rtp_transport_clone.clone(),
1384                        inner_weak.clone(),
1385                        stats_collector.clone(),
1386                    );
1387
1388                    let pair_monitor =
1389                        Self::create_pair_monitor(pair_rx.clone(), ice_conn_monitor.clone());
1390
1391                    let combined: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
1392                        tokio::select! {
1393                            _ = rtcp_loop => {},
1394                            _ = dtls_runner => {},
1395                            _ = sctp_runner => {},
1396                            _ = dc_listener => {},
1397                            _ = pair_monitor => {},
1398                        }
1399                    });
1400                    return Ok(combined);
1401                }
1402                crate::transports::dtls::DtlsState::Failed => {
1403                    return Err(RtcError::Internal("DTLS handshake failed".into()));
1404                }
1405                _ => {}
1406            }
1407
1408            tokio::select! {
1409                _ = &mut dtls_runner => {
1410                     return Err(RtcError::Internal("DTLS runner stopped unexpectedly".into()));
1411                }
1412                _ = &mut sctp_runner => {
1413                     return Err(RtcError::Internal("SCTP runner stopped unexpectedly".into()));
1414                }
1415                _ = &mut dc_listener => {
1416                     debug!("DataChannel listener stopped unexpectedly");
1417                     return Err(RtcError::Internal("DataChannel listener stopped unexpectedly".into()));
1418                }
1419                res = state_rx.changed() => {
1420                    if res.is_err() { break; }
1421                }
1422                res = pair_rx.changed() => {
1423                    if res.is_ok() {
1424                        if let Some(pair) = pair_rx.borrow().clone() {
1425                            if let Ok(mut addr) = ice_conn_monitor.remote_addr.write() {
1426                                *addr = pair.remote.address;
1427                            }
1428                        }
1429                    }
1430                }
1431            }
1432        }
1433
1434        Ok(Box::pin(async {}) as Pin<Box<dyn Future<Output = ()> + Send>>)
1435    }
1436
1437    fn setup_sdes(&self, rtp_transport: &Arc<RtpTransport>) -> RtcResult<()> {
1438        let (tx_keying, rx_keying, profile) = {
1439            let remote_desc = self.inner.remote_description.lock().unwrap();
1440            let local_desc = self.inner.local_description.lock().unwrap();
1441
1442            let remote_crypto = remote_desc
1443                .as_ref()
1444                .and_then(|d| d.media_sections.first())
1445                .and_then(|m| m.get_crypto_attributes().into_iter().next());
1446
1447            let local_crypto = local_desc
1448                .as_ref()
1449                .and_then(|d| d.media_sections.first())
1450                .and_then(|m| m.get_crypto_attributes().into_iter().next());
1451
1452            if let (Some(remote), Some(local)) = (remote_crypto, local_crypto) {
1453                let profile = map_crypto_suite(&remote.crypto_suite)?;
1454                if profile != map_crypto_suite(&local.crypto_suite)? {
1455                    return Err(RtcError::Internal("Crypto suite mismatch".into()));
1456                }
1457
1458                let rx_key_salt = parse_sdes_key_params(&remote.key_params)?;
1459                let tx_key_salt = parse_sdes_key_params(&local.key_params)?;
1460
1461                let (key_len, salt_len) = match profile {
1462                    crate::srtp::SrtpProfile::Aes128Sha1_80
1463                    | crate::srtp::SrtpProfile::Aes128Sha1_32 => (16, 14),
1464                    crate::srtp::SrtpProfile::AeadAes128Gcm => (16, 12),
1465                    _ => (16, 14),
1466                };
1467
1468                if rx_key_salt.len() < key_len + salt_len || tx_key_salt.len() < key_len + salt_len
1469                {
1470                    return Err(RtcError::Internal("Invalid key length".into()));
1471                }
1472
1473                let rx_keying = crate::srtp::SrtpKeyingMaterial::new(
1474                    rx_key_salt[..key_len].to_vec(),
1475                    rx_key_salt[key_len..key_len + salt_len].to_vec(),
1476                );
1477                let tx_keying = crate::srtp::SrtpKeyingMaterial::new(
1478                    tx_key_salt[..key_len].to_vec(),
1479                    tx_key_salt[key_len..key_len + salt_len].to_vec(),
1480                );
1481
1482                (tx_keying, rx_keying, profile)
1483            } else {
1484                return Err(RtcError::Internal(
1485                    "Missing crypto attributes for SDES".into(),
1486                ));
1487            }
1488        };
1489
1490        let session = crate::srtp::SrtpSession::new(profile, tx_keying, rx_keying)
1491            .map_err(|e| RtcError::Internal(format!("SRTP error: {}", e)))?;
1492
1493        rtp_transport.start_srtp(session);
1494
1495        let transceivers = self.inner.transceivers.lock().unwrap();
1496        for t in transceivers.iter() {
1497            let sender_arc = t.sender.lock().unwrap().clone();
1498            let receiver_arc = t.receiver.lock().unwrap().clone();
1499
1500            if let Some(sender) = &sender_arc {
1501                sender.set_transport(rtp_transport.clone());
1502            }
1503
1504            if let Some(receiver) = &receiver_arc {
1505                receiver.set_transport(
1506                    rtp_transport.clone(),
1507                    Some(self.inner.event_tx.clone()),
1508                    Some(Arc::downgrade(&t)),
1509                );
1510                if let Some(sender) = &sender_arc {
1511                    receiver.set_feedback_ssrc(sender.ssrc());
1512                }
1513            }
1514        }
1515
1516        *self.inner.rtp_transport.lock().unwrap() = Some(rtp_transport.clone());
1517        Ok(())
1518    }
1519
1520    fn setup_srtp(
1521        &self,
1522        dtls: &DtlsTransport,
1523        is_client: bool,
1524        profile_opt: Option<u16>,
1525        rtp_transport: &Arc<RtpTransport>,
1526    ) {
1527        // Default to Aes128Sha1_80 if not specified or unknown
1528        let profile = match profile_opt {
1529            Some(0x0001) => crate::srtp::SrtpProfile::Aes128Sha1_80,
1530            Some(0x0002) => crate::srtp::SrtpProfile::Aes128Sha1_32,
1531            Some(0x0007) => crate::srtp::SrtpProfile::AeadAes128Gcm,
1532            _ => crate::srtp::SrtpProfile::Aes128Sha1_80,
1533        };
1534
1535        let key_len = match profile {
1536            crate::srtp::SrtpProfile::AeadAes128Gcm => 16,
1537            _ => 16,
1538        };
1539        let salt_len = match profile {
1540            crate::srtp::SrtpProfile::AeadAes128Gcm => 12,
1541            _ => 14,
1542        };
1543
1544        let total_len = 2 * (key_len + salt_len);
1545
1546        if let Ok(mat) = dtls.export_keying_material("EXTRACTOR-dtls_srtp", total_len) {
1547            let client_key = &mat[0..key_len];
1548            let server_key = &mat[key_len..2 * key_len];
1549            let client_salt = &mat[2 * key_len..2 * key_len + salt_len];
1550            let server_salt = &mat[2 * key_len + salt_len..];
1551
1552            let (tx_key, tx_salt, rx_key, rx_salt) = if is_client {
1553                (client_key, client_salt, server_key, server_salt)
1554            } else {
1555                (server_key, server_salt, client_key, client_salt)
1556            };
1557
1558            let tx_keying = crate::srtp::SrtpKeyingMaterial::new(tx_key.to_vec(), tx_salt.to_vec());
1559            let rx_keying = crate::srtp::SrtpKeyingMaterial::new(rx_key.to_vec(), rx_salt.to_vec());
1560
1561            match crate::srtp::SrtpSession::new(profile, tx_keying, rx_keying) {
1562                Ok(session) => {
1563                    rtp_transport.start_srtp(session);
1564
1565                    let transceivers = self.inner.transceivers.lock().unwrap();
1566                    for t in transceivers.iter() {
1567                        let sender_arc = t.sender.lock().unwrap().clone();
1568                        let receiver_arc = t.receiver.lock().unwrap().clone();
1569
1570                        if let Some(sender) = &sender_arc {
1571                            let mid_opt = t.mid();
1572                            trace!(
1573                                "start_dtls: transceiver kind={:?} mid={:?}",
1574                                t.kind(),
1575                                mid_opt
1576                            );
1577                            sender.set_transport(rtp_transport.clone());
1578                        }
1579
1580                        if let Some(receiver) = &receiver_arc {
1581                            receiver.set_transport(
1582                                rtp_transport.clone(),
1583                                Some(self.inner.event_tx.clone()),
1584                                Some(Arc::downgrade(&t)),
1585                            );
1586                            if let Some(sender) = &sender_arc {
1587                                receiver.set_feedback_ssrc(sender.ssrc());
1588                            }
1589                        }
1590                    }
1591
1592                    // Update the inner transport to ensure future transceivers get the correct one
1593                    *self.inner.rtp_transport.lock().unwrap() = Some(rtp_transport.clone());
1594                }
1595                Err(e) => {
1596                    debug!("Failed to create SRTP session: {}", e);
1597                }
1598            }
1599        } else {
1600            debug!(
1601                "Failed to export keying material - DTLS state: {}",
1602                dtls.get_state()
1603            );
1604        }
1605    }
1606
1607    fn create_rtcp_loop(
1608        rtp_transport: Arc<RtpTransport>,
1609        inner_weak: Weak<PeerConnectionInner>,
1610        stats_collector: Arc<StatsCollector>,
1611    ) -> impl Future<Output = ()> + Send {
1612        let (rtcp_tx, mut rtcp_rx) = mpsc::channel(2000);
1613        rtp_transport.register_rtcp_listener(rtcp_tx);
1614
1615        async move {
1616            while let Some(packets) = rtcp_rx.recv().await {
1617                for packet in packets {
1618                    // Log every RTCP packet to debug
1619                    match &packet {
1620                        RtcpPacket::PictureLossIndication(_) => {}
1621                        RtcpPacket::GenericNack(n) => {
1622                            trace!("RTCP Loop: Got NACK for SSRC {}", n.media_ssrc)
1623                        }
1624                        RtcpPacket::ReceiverReport(rr) => trace!(
1625                            "RTCP Loop: Got RR for SSRC count {}",
1626                            rr.report_blocks.len()
1627                        ),
1628                        RtcpPacket::SenderReport(sr) => {
1629                            trace!("RTCP Loop: Got SR for SSRC {}", sr.sender_ssrc)
1630                        }
1631                        _ => trace!("RTCP Loop: Got packet {:?}", packet),
1632                    }
1633
1634                    stats_collector.process_rtcp(&packet);
1635                    let Some(inner) = inner_weak.upgrade() else {
1636                        return;
1637                    };
1638                    {
1639                        let transceivers = inner.transceivers.lock().unwrap();
1640                        for t in transceivers.iter() {
1641                            if let Some(sender) = &*t.sender.lock().unwrap() {
1642                                let is_for_sender = match &packet {
1643                                    RtcpPacket::PictureLossIndication(p) => {
1644                                        if p.media_ssrc == sender.ssrc() {
1645                                            debug!("Received PLI for SSRC: {}", p.media_ssrc);
1646                                            true
1647                                        } else {
1648                                            false
1649                                        }
1650                                    }
1651                                    RtcpPacket::GenericNack(n) => n.media_ssrc == sender.ssrc(),
1652                                    _ => false,
1653                                };
1654
1655                                if is_for_sender {
1656                                    sender.deliver_rtcp(packet.clone());
1657                                }
1658                            }
1659                        }
1660                    }
1661                }
1662            }
1663        }
1664    }
1665
1666    fn create_pair_monitor(
1667        mut pair_rx: watch::Receiver<Option<crate::transports::ice::IceCandidatePair>>,
1668        ice_conn_monitor: Arc<IceConn>,
1669    ) -> impl Future<Output = ()> + Send {
1670        async move {
1671            if let Some(pair) = pair_rx.borrow().clone() {
1672                if let Ok(mut addr) = ice_conn_monitor.remote_addr.write() {
1673                    trace!(
1674                        "PeerConnection: pair_monitor initial update: {} -> {}",
1675                        *addr, pair.remote.address
1676                    );
1677                    *addr = pair.remote.address;
1678                }
1679            }
1680            while pair_rx.changed().await.is_ok() {
1681                if let Some(pair) = pair_rx.borrow().clone() {
1682                    let old_addr = if let Ok(addr) = ice_conn_monitor.remote_addr.read() {
1683                        *addr
1684                    } else {
1685                        "0.0.0.0:0".parse().unwrap()
1686                    };
1687                    if let Ok(mut addr_guard) = ice_conn_monitor.remote_addr.write() {
1688                        trace!(
1689                            "PeerConnection: pair_monitor update: {} -> {}",
1690                            old_addr, pair.remote.address
1691                        );
1692                        *addr_guard = pair.remote.address;
1693                    }
1694                }
1695            }
1696        }
1697    }
1698
1699    pub fn signaling_state(&self) -> SignalingState {
1700        *self.inner.signaling_state.borrow()
1701    }
1702
1703    pub fn subscribe_signaling_state(&self) -> watch::Receiver<SignalingState> {
1704        self.inner.signaling_state.subscribe()
1705    }
1706
1707    pub fn subscribe_peer_state(&self) -> watch::Receiver<PeerConnectionState> {
1708        self.inner.peer_state.subscribe()
1709    }
1710
1711    pub async fn wait_for_connected(&self) -> RtcResult<()> {
1712        let mut peer_state_rx = self.subscribe_peer_state();
1713        loop {
1714            let state = *peer_state_rx.borrow_and_update();
1715            if state == PeerConnectionState::Connected {
1716                return Ok(());
1717            }
1718            if state == PeerConnectionState::Failed || state == PeerConnectionState::Closed {
1719                return Err(RtcError::Internal(format!(
1720                    "Peer connection failed or closed: {:?}",
1721                    state
1722                )));
1723            }
1724            if peer_state_rx.changed().await.is_err() {
1725                return Err(RtcError::Internal("Peer state channel closed".into()));
1726            }
1727        }
1728    }
1729
1730    pub fn subscribe_ice_connection_state(&self) -> watch::Receiver<IceConnectionState> {
1731        self.inner.ice_connection_state.subscribe()
1732    }
1733
1734    pub fn subscribe_ice_gathering_state(&self) -> watch::Receiver<IceGatheringState> {
1735        self.inner.ice_gathering_state.subscribe()
1736    }
1737
1738    pub fn local_description(&self) -> Option<SessionDescription> {
1739        self.inner.local_description.lock().unwrap().clone()
1740    }
1741
1742    pub fn remote_description(&self) -> Option<SessionDescription> {
1743        self.inner.remote_description.lock().unwrap().clone()
1744    }
1745
1746    pub fn close(&self) {
1747        self.inner.close();
1748    }
1749
1750    pub async fn recv(&self) -> Option<PeerConnectionEvent> {
1751        let mut rx = self.inner.event_rx.lock().await;
1752        rx.recv().await
1753    }
1754
1755    pub fn create_data_channel(
1756        &self,
1757        label: &str,
1758        config: Option<crate::transports::sctp::DataChannelConfig>,
1759    ) -> RtcResult<Arc<crate::transports::sctp::DataChannel>> {
1760        // Ensure we have an application transceiver for negotiation
1761        let has_app_transceiver = {
1762            let transceivers = self.inner.transceivers.lock().unwrap();
1763            transceivers
1764                .iter()
1765                .any(|t| t.kind() == MediaKind::Application)
1766        };
1767
1768        if !has_app_transceiver {
1769            self.add_transceiver(MediaKind::Application, TransceiverDirection::SendRecv);
1770        }
1771
1772        let mut config = config.unwrap_or_default();
1773        config.label = label.to_string();
1774
1775        let id = if let Some(negotiated_id) = config.negotiated {
1776            negotiated_id
1777        } else {
1778            let is_client = self.inner.dtls_role.borrow().unwrap_or(true);
1779            let offset = if is_client { 0 } else { 1 };
1780
1781            let channels = self.inner.data_channels.lock().unwrap();
1782            let mut id = offset;
1783            loop {
1784                let mut used = false;
1785                for weak_dc in channels.iter() {
1786                    if let Some(dc) = weak_dc.upgrade() {
1787                        if dc.id == id {
1788                            used = true;
1789                            break;
1790                        }
1791                    }
1792                }
1793                if !used {
1794                    break;
1795                }
1796                id += 2;
1797            }
1798            id
1799        };
1800
1801        let dc = Arc::new(crate::transports::sctp::DataChannel::new(
1802            id,
1803            config.clone(),
1804        ));
1805
1806        self.inner
1807            .data_channels
1808            .lock()
1809            .unwrap()
1810            .push(Arc::downgrade(&dc));
1811
1812        if !dc.negotiated {
1813            let transport = self.inner.sctp_transport.lock().unwrap().clone();
1814            if let Some(transport) = transport {
1815                let dc_clone = dc.clone();
1816                tokio::spawn(async move {
1817                    if let Err(e) = transport.send_dcep_open(&dc_clone).await {
1818                        debug!("Failed to send DCEP OPEN: {}", e);
1819                    }
1820                });
1821            }
1822        }
1823
1824        Ok(dc)
1825    }
1826
1827    pub async fn send_data(&self, channel_id: u16, data: &[u8]) -> RtcResult<()> {
1828        let transport = self.inner.sctp_transport.lock().unwrap().clone();
1829        if let Some(transport) = transport {
1830            transport
1831                .send_data(channel_id, data)
1832                .await
1833                .map_err(|e| RtcError::Internal(format!("SCTP send failed: {}", e)))
1834        } else {
1835            Err(RtcError::InvalidState("SCTP not connected".into()))
1836        }
1837    }
1838
1839    pub async fn send_text(&self, channel_id: u16, data: impl AsRef<str>) -> RtcResult<()> {
1840        let transport = self.inner.sctp_transport.lock().unwrap().clone();
1841        if let Some(transport) = transport {
1842            transport
1843                .send_text(channel_id, data)
1844                .await
1845                .map_err(|e| RtcError::Internal(format!("SCTP send failed: {}", e)))
1846        } else {
1847            Err(RtcError::InvalidState("SCTP not connected".into()))
1848        }
1849    }
1850
1851    pub async fn sctp_buffered_amount(&self) -> usize {
1852        let transport = self.inner.sctp_transport.lock().unwrap().clone();
1853        if let Some(transport) = transport {
1854            transport.buffered_amount()
1855        } else {
1856            0
1857        }
1858    }
1859
1860    pub async fn get_stats(&self) -> RtcResult<StatsReport> {
1861        gather_once(&[self.inner.stats_collector.clone()]).await
1862    }
1863
1864    pub async fn wait_for_gathering_complete(&self) {
1865        let _ = self.inner.ice_transport.start_gathering();
1866        let mut rx = self.subscribe_ice_gathering_state();
1867        loop {
1868            if *rx.borrow_and_update() == IceGatheringState::Complete {
1869                return;
1870            }
1871            if rx.changed().await.is_err() {
1872                return;
1873            }
1874        }
1875    }
1876
1877    pub fn subscribe_ice_candidates(&self) -> broadcast::Receiver<IceCandidate> {
1878        self.inner.ice_transport.subscribe_candidates()
1879    }
1880
1881    pub fn add_ice_candidate(&self, candidate: IceCandidate) -> RtcResult<()> {
1882        self.inner.ice_transport.add_remote_candidate(candidate);
1883        Ok(())
1884    }
1885
1886    /// Handle reinvite - update RTP parameters without recreating tracks
1887    async fn handle_reinvite(&self, new_desc: &SessionDescription) -> RtcResult<()> {
1888        debug!("Handling reinvite: updating RTP parameters");
1889
1890        let transceivers = self.inner.transceivers.lock().unwrap().clone();
1891
1892        // Extract RTP parameter changes for each media section
1893        for section in &new_desc.media_sections {
1894            // Find matching transceiver by mid
1895            let transceiver = transceivers
1896                .iter()
1897                .find(|t| t.mid().as_ref() == Some(&section.mid));
1898
1899            if let Some(t) = transceiver {
1900                // Check SSRC change (indicates new track, not reinvite)
1901                if let Some(receiver) = t.receiver() {
1902                    let new_ssrc = Self::extract_ssrc_from_section(section);
1903                    if let Some(new_ssrc) = new_ssrc {
1904                        let old_ssrc = receiver.ssrc();
1905                        if old_ssrc != new_ssrc {
1906                            if old_ssrc != 0 {
1907                                debug!(
1908                                    "SSRC changed for mid={} ({} -> {}), updating listener",
1909                                    section.mid, old_ssrc, new_ssrc
1910                                );
1911                            } else {
1912                                debug!(
1913                                    "SSRC learned for mid={} (-> {}), updating listener",
1914                                    section.mid, new_ssrc
1915                                );
1916                            }
1917                            receiver.set_ssrc(new_ssrc);
1918                        }
1919                    } else {
1920                        // If no SSRC in SDP, re-enable provisional listener
1921                        // to handle potential SSRC changes during reinvite
1922                        receiver.ensure_provisional_listener();
1923                    }
1924                }
1925
1926                // Extract and validate payload type mapping
1927                let payload_map = Self::extract_payload_map(section);
1928                if !payload_map.is_empty() {
1929                    // Basic validation: check if we support these codecs
1930                    for (pt, params) in &payload_map {
1931                        trace!("Validating PT {}: clock_rate={}", pt, params.clock_rate);
1932                        // TODO: Add full codec capability check against local capabilities
1933                    }
1934                    t.update_payload_map(payload_map)?;
1935                }
1936
1937                // Extract and update extension mapping
1938                let extmap = Self::extract_extmap(section);
1939                t.update_extmap(extmap)?;
1940
1941                // Handle direction changes
1942                let new_direction: TransceiverDirection = section.direction.into();
1943                let old_direction = t.direction();
1944                if new_direction != old_direction {
1945                    debug!(
1946                        "Direction changed for mid={}: {:?} -> {:?}",
1947                        section.mid, old_direction, new_direction
1948                    );
1949                    t.set_direction(new_direction);
1950                    Self::apply_direction_change(t, old_direction, new_direction).await?;
1951                }
1952            }
1953        }
1954
1955        // Update remote description
1956        *self.inner.remote_description.lock().unwrap() = Some(new_desc.clone());
1957
1958        debug!("Reinvite completed successfully");
1959        Ok(())
1960    }
1961
1962    /// Extract payload type to codec parameters mapping from media section
1963    fn extract_payload_map(section: &crate::MediaSection) -> HashMap<u8, RtpCodecParameters> {
1964        let mut payload_map = HashMap::new();
1965
1966        // Parse rtpmap attributes: "96 opus/48000/2"
1967        for attr in &section.attributes {
1968            if attr.key == "rtpmap" {
1969                if let Some(val) = &attr.value {
1970                    let parts: Vec<&str> = val.split_whitespace().collect();
1971                    if parts.len() >= 2 {
1972                        if let Ok(pt) = parts[0].parse::<u8>() {
1973                            // Parse codec/rate/channels
1974                            let codec_parts: Vec<&str> = parts[1].split('/').collect();
1975                            if codec_parts.len() >= 2 {
1976                                let clock_rate = codec_parts[1].parse().unwrap_or(90000);
1977                                let channels = if codec_parts.len() >= 3 {
1978                                    codec_parts[2].parse().unwrap_or(0)
1979                                } else {
1980                                    0
1981                                };
1982
1983                                payload_map.insert(
1984                                    pt,
1985                                    RtpCodecParameters {
1986                                        payload_type: pt,
1987                                        clock_rate,
1988                                        channels,
1989                                    },
1990                                );
1991                            }
1992                        }
1993                    }
1994                }
1995            }
1996        }
1997
1998        payload_map
1999    }
2000
2001    /// Extract extension header mapping from media section
2002    fn extract_extmap(section: &crate::MediaSection) -> HashMap<u8, String> {
2003        let mut extmap = HashMap::new();
2004
2005        // Parse extmap attributes: "1 urn:ietf:params:rtp-hdrext:ssrc-audio-level"
2006        for attr in &section.attributes {
2007            if attr.key == "extmap" {
2008                if let Some(val) = &attr.value {
2009                    let parts: Vec<&str> = val.split_whitespace().collect();
2010                    if parts.len() >= 2 {
2011                        if let Ok(id) = parts[0].parse::<u8>() {
2012                            extmap.insert(id, parts[1].to_string());
2013                        }
2014                    }
2015                }
2016            }
2017        }
2018
2019        extmap
2020    }
2021
2022    /// Extract SSRC from media section
2023    fn extract_ssrc_from_section(section: &crate::MediaSection) -> Option<u32> {
2024        // Parse a=ssrc:<ssrc> <attribute>:<value>
2025        for attr in &section.attributes {
2026            if attr.key == "ssrc" {
2027                if let Some(val) = &attr.value {
2028                    if let Some(ssrc_str) = val.split_whitespace().next() {
2029                        if let Ok(ssrc) = ssrc_str.parse::<u32>() {
2030                            return Some(ssrc);
2031                        }
2032                    }
2033                }
2034            }
2035        }
2036        None
2037    }
2038
2039    /// Apply direction change side effects
2040    async fn apply_direction_change(
2041        transceiver: &RtpTransceiver,
2042        old_direction: TransceiverDirection,
2043        new_direction: TransceiverDirection,
2044    ) -> RtcResult<()> {
2045        let old_sends = match old_direction {
2046            TransceiverDirection::SendRecv | TransceiverDirection::SendOnly => true,
2047            _ => false,
2048        };
2049        let new_sends = match new_direction {
2050            TransceiverDirection::SendRecv | TransceiverDirection::SendOnly => true,
2051            _ => false,
2052        };
2053
2054        let old_receives = match old_direction {
2055            TransceiverDirection::SendRecv | TransceiverDirection::RecvOnly => true,
2056            _ => false,
2057        };
2058        let new_receives = match new_direction {
2059            TransceiverDirection::SendRecv | TransceiverDirection::RecvOnly => true,
2060            _ => false,
2061        };
2062
2063        // Handle send direction changes
2064        if old_sends != new_sends {
2065            if new_sends {
2066                debug!("Transceiver {} starting to send", transceiver.id());
2067                // Resume sender if available
2068                if let Some(sender) = transceiver.sender() {
2069                    // In full implementation: sender.resume()
2070                    trace!("Sender {} would resume", sender.ssrc());
2071                }
2072            } else {
2073                debug!("Transceiver {} stopping send", transceiver.id());
2074                // Pause sender if available
2075                if let Some(sender) = transceiver.sender() {
2076                    // In full implementation: sender.pause()
2077                    trace!("Sender {} would pause", sender.ssrc());
2078                }
2079            }
2080        }
2081
2082        // Handle receive direction changes
2083        if old_receives != new_receives {
2084            if new_receives {
2085                debug!("Transceiver {} starting to receive", transceiver.id());
2086                // In full implementation: activate receiver
2087            } else {
2088                debug!("Transceiver {} stopping receive", transceiver.id());
2089                // In full implementation: deactivate receiver or discard packets
2090            }
2091        }
2092
2093        Ok(())
2094    }
2095}
2096
2097async fn run_gathering_loop(
2098    ice_transport: IceTransport,
2099    ice_gathering_state_tx: watch::Sender<IceGatheringState>,
2100    inner_weak: std::sync::Weak<PeerConnectionInner>,
2101) {
2102    let mut rx = ice_transport.subscribe_gathering_state();
2103    let mut ice_state_rx = ice_transport.subscribe_state();
2104    loop {
2105        let state = *rx.borrow_and_update();
2106        if state == crate::transports::ice::IceGathererState::Complete {
2107            if let Some(inner) = inner_weak.upgrade() {
2108                let update_local_description = || {
2109                    if inner.config.transport_mode == TransportMode::WebRtc {
2110                        let candidates = ice_transport.local_candidates();
2111                        let candidate_strs: Vec<String> =
2112                            candidates.iter().map(|c| c.to_sdp()).collect();
2113
2114                        let mut local_guard = inner.local_description.lock().unwrap();
2115                        if let Some(desc) = local_guard.as_mut() {
2116                            desc.add_candidates(&candidate_strs);
2117                        }
2118                        true
2119                    } else {
2120                        let candidates = ice_transport.local_candidates();
2121                        if let Some(candidate) = candidates.first() {
2122                            let mut local_guard = inner.local_description.lock().unwrap();
2123                            if let Some(desc) = local_guard.as_mut() {
2124                                for media in &mut desc.media_sections {
2125                                    media.port = candidate.address.port();
2126                                    let ip_str = candidate.address.ip().to_string();
2127                                    let ip_ver = if candidate.address.is_ipv4() {
2128                                        "IP4"
2129                                    } else {
2130                                        "IP6"
2131                                    };
2132                                    media.connection = Some(format!("IN {} {}", ip_ver, ip_str));
2133                                }
2134                            }
2135                        }
2136                        true
2137                    }
2138                };
2139
2140                if !update_local_description() {
2141                    let mut sig_rx = inner.signaling_state.subscribe();
2142                    loop {
2143                        if update_local_description() {
2144                            break;
2145                        }
2146                        if sig_rx.changed().await.is_err() {
2147                            break;
2148                        }
2149                    }
2150                }
2151            }
2152        }
2153
2154        let pc_state = match state {
2155            crate::transports::ice::IceGathererState::New => IceGatheringState::New,
2156            crate::transports::ice::IceGathererState::Gathering => IceGatheringState::Gathering,
2157            crate::transports::ice::IceGathererState::Complete => IceGatheringState::Complete,
2158        };
2159
2160        if ice_gathering_state_tx.send(pc_state).is_err() {
2161            break;
2162        }
2163        if state == crate::transports::ice::IceGathererState::Complete {
2164            break;
2165        }
2166        tokio::select! {
2167            res = rx.changed() => {
2168                if res.is_err() { break; }
2169            }
2170            res = ice_state_rx.changed() => {
2171                if res.is_err() { break; }
2172                if *ice_state_rx.borrow() == crate::transports::ice::IceTransportState::Closed {
2173                    break;
2174                }
2175            }
2176        }
2177    }
2178}
2179
2180async fn run_ice_dtls_loop(
2181    ice_transport: IceTransport,
2182    ice_connection_state_tx: watch::Sender<IceConnectionState>,
2183    mut dtls_role_rx: watch::Receiver<Option<bool>>,
2184    inner_weak: std::sync::Weak<PeerConnectionInner>,
2185) {
2186    let mut ice_state_rx = ice_transport.subscribe_state();
2187    loop {
2188        let ice_state = *ice_state_rx.borrow_and_update();
2189
2190        let pc_ice_state = match ice_state {
2191            crate::transports::ice::IceTransportState::New => IceConnectionState::New,
2192            crate::transports::ice::IceTransportState::Checking => IceConnectionState::Checking,
2193            crate::transports::ice::IceTransportState::Connected => IceConnectionState::Connected,
2194            crate::transports::ice::IceTransportState::Completed => IceConnectionState::Completed,
2195            crate::transports::ice::IceTransportState::Failed => IceConnectionState::Failed,
2196            crate::transports::ice::IceTransportState::Disconnected => {
2197                IceConnectionState::Disconnected
2198            }
2199            crate::transports::ice::IceTransportState::Closed => IceConnectionState::Closed,
2200        };
2201        let _ = ice_connection_state_tx.send(pc_ice_state);
2202        match ice_state {
2203            crate::transports::ice::IceTransportState::Connected
2204            | crate::transports::ice::IceTransportState::Completed => {
2205                // For RTP/SRTP mode, we don't need DTLS role to start
2206                let transport_mode = if let Some(inner) = inner_weak.upgrade() {
2207                    inner.config.transport_mode.clone()
2208                } else {
2209                    return;
2210                };
2211
2212                if transport_mode != TransportMode::WebRtc {
2213                    if !handle_connected_state_no_dtls(&inner_weak, &mut ice_state_rx).await {
2214                        return;
2215                    }
2216                    continue;
2217                }
2218
2219                if !handle_connected_state(
2220                    &inner_weak,
2221                    &ice_connection_state_tx,
2222                    &mut dtls_role_rx,
2223                    &mut ice_state_rx,
2224                )
2225                .await
2226                {
2227                    return;
2228                }
2229                continue;
2230            }
2231            crate::transports::ice::IceTransportState::Failed => {
2232                if let Some(inner) = inner_weak.upgrade() {
2233                    let _ = inner.peer_state.send(PeerConnectionState::Failed);
2234                }
2235                return;
2236            }
2237            crate::transports::ice::IceTransportState::Closed => {
2238                if let Some(inner) = inner_weak.upgrade() {
2239                    let _ = inner.peer_state.send(PeerConnectionState::Closed);
2240                }
2241                return;
2242            }
2243            _ => {}
2244        }
2245
2246        if ice_state_rx.changed().await.is_err() {
2247            return;
2248        }
2249    }
2250}
2251
2252async fn handle_connected_state_no_dtls(
2253    inner_weak: &std::sync::Weak<PeerConnectionInner>,
2254    ice_state_rx: &mut watch::Receiver<crate::transports::ice::IceTransportState>,
2255) -> bool {
2256    if let Some(inner) = inner_weak.upgrade() {
2257        let pc_temp = PeerConnection {
2258            inner: inner.clone(),
2259        };
2260        // For RTP/SRTP, we pass false as is_client, but it doesn't matter as start_dtls handles it
2261        match pc_temp.start_dtls(false).await {
2262            Err(e) => {
2263                debug!("Transport start failed: {}", e);
2264                let _ = inner.peer_state.send(PeerConnectionState::Failed);
2265                return false;
2266            }
2267            Ok(mut rtcp_loop) => {
2268                let _ = inner.peer_state.send(PeerConnectionState::Connected);
2269                loop {
2270                    tokio::select! {
2271                        _ = &mut rtcp_loop => {
2272                            break;
2273                        }
2274                        res = ice_state_rx.changed() => {
2275                            if res.is_err() { return false; }
2276                            let new_state = *ice_state_rx.borrow();
2277                            if is_ice_disconnected(new_state) {
2278                                return true;
2279                            }
2280                        }
2281                    }
2282                }
2283            }
2284        }
2285    }
2286    false
2287}
2288
2289async fn handle_connected_state(
2290    inner_weak: &std::sync::Weak<PeerConnectionInner>,
2291    ice_connection_state_tx: &watch::Sender<IceConnectionState>,
2292    dtls_role_rx: &mut watch::Receiver<Option<bool>>,
2293    ice_state_rx: &mut watch::Receiver<crate::transports::ice::IceTransportState>,
2294) -> bool {
2295    loop {
2296        let role = *dtls_role_rx.borrow_and_update();
2297        if let Some(is_client) = role {
2298            if let Some(inner) = inner_weak.upgrade() {
2299                let pc_temp = PeerConnection {
2300                    inner: inner.clone(),
2301                };
2302
2303                match pc_temp.start_dtls(is_client).await {
2304                    Err(e) => {
2305                        debug!("DTLS start failed: {}", e);
2306                        let _ = inner.peer_state.send(PeerConnectionState::Failed);
2307                        return false;
2308                    }
2309                    Ok(mut rtcp_loop) => {
2310                        let _ = inner.peer_state.send(PeerConnectionState::Connected);
2311
2312                        let dtls_state_rx = {
2313                            let dtls_guard = inner.dtls_transport.lock().unwrap();
2314                            if let Some(dtls) = &*dtls_guard {
2315                                Some(dtls.subscribe_state())
2316                            } else {
2317                                None
2318                            }
2319                        };
2320
2321                        if let Some(mut dtls_rx) = dtls_state_rx {
2322                            loop {
2323                                tokio::select! {
2324                                    _ = &mut rtcp_loop => {
2325                                        break;
2326                                    }
2327                                    res = ice_state_rx.changed() => {
2328                                        if res.is_err() { return false; }
2329                                        let new_state = *ice_state_rx.borrow();
2330                                        if is_ice_disconnected(new_state) {
2331                                            return true;
2332                                        }
2333                                    }
2334                                    res = dtls_rx.changed() => {
2335                                        if res.is_ok() {
2336                                            let state = dtls_rx.borrow().clone();
2337                                            if state == crate::transports::dtls::DtlsState::Closed || state == crate::transports::dtls::DtlsState::Failed {
2338                                                debug!("DTLS closed/failed, disconnecting PC");
2339                                                let _ = inner.peer_state.send(PeerConnectionState::Disconnected);
2340                                                let _ = ice_connection_state_tx.send(IceConnectionState::Disconnected);
2341                                                return false;
2342                                            }
2343                                        } else {
2344                                            break;
2345                                        }
2346                                    }
2347                                }
2348                            }
2349                        } else {
2350                            loop {
2351                                tokio::select! {
2352                                    _ = &mut rtcp_loop => {
2353                                        break;
2354                                    }
2355                                    res = ice_state_rx.changed() => {
2356                                        if res.is_err() { return false; }
2357                                        let new_state = *ice_state_rx.borrow();
2358                                        if is_ice_disconnected(new_state) {
2359                                            return true;
2360                                        }
2361                                    }
2362                                }
2363                            }
2364                        }
2365                    }
2366                }
2367            }
2368
2369            let state = *ice_state_rx.borrow();
2370            if is_ice_disconnected(state) {
2371                return true;
2372            }
2373            return false;
2374        }
2375
2376        tokio::select! {
2377            res = dtls_role_rx.changed() => {
2378                if res.is_err() { return false; }
2379            }
2380            res = ice_state_rx.changed() => {
2381                if res.is_err() { return false; }
2382                let new_state = *ice_state_rx.borrow();
2383                if is_ice_disconnected(new_state) {
2384                    return true;
2385                }
2386            }
2387        }
2388    }
2389}
2390
2391fn is_ice_disconnected(state: crate::transports::ice::IceTransportState) -> bool {
2392    matches!(
2393        state,
2394        crate::transports::ice::IceTransportState::Failed
2395            | crate::transports::ice::IceTransportState::Closed
2396            | crate::transports::ice::IceTransportState::Disconnected
2397    )
2398}
2399
2400impl PeerConnectionInner {
2401    async fn build_description<F>(
2402        &self,
2403        sdp_type: SdpType,
2404        map_direction: F,
2405    ) -> RtcResult<SessionDescription>
2406    where
2407        F: Fn(TransceiverDirection) -> TransceiverDirection,
2408    {
2409        let transceivers = {
2410            let list = self.transceivers.lock().unwrap();
2411            list.iter().cloned().collect::<Vec<_>>()
2412        };
2413        if transceivers.is_empty() {
2414            return Err(RtcError::InvalidState(
2415                "cannot build SDP with no transceivers".into(),
2416            ));
2417        }
2418
2419        let mut remote_offered_bundle = false;
2420
2421        let ordered_transceivers = if sdp_type == SdpType::Answer {
2422            let remote_guard = self.remote_description.lock().unwrap();
2423            let remote = remote_guard.as_ref().ok_or_else(|| {
2424                RtcError::InvalidState("create_answer called without remote description".into())
2425            })?;
2426
2427            for attr in &remote.session.attributes {
2428                if attr.key == "group"
2429                    && let Some(val) = &attr.value
2430                    && val.starts_with("BUNDLE")
2431                {
2432                    remote_offered_bundle = true;
2433                }
2434            }
2435
2436            let mut ordered = Vec::new();
2437            for section in &remote.media_sections {
2438                let mid = &section.mid;
2439                let mut found = None;
2440                for t in &transceivers {
2441                    if let Some(t_mid) = t.mid()
2442                        && t_mid == *mid
2443                    {
2444                        found = Some(t.clone());
2445                        break;
2446                    }
2447                }
2448                if let Some(t) = found {
2449                    ordered.push(t);
2450                } else {
2451                    return Err(RtcError::Internal(format!(
2452                        "No transceiver found for mid {} in answer generation",
2453                        mid
2454                    )));
2455                }
2456            }
2457            ordered
2458        } else {
2459            // For Offer, we must ensure MIDs and sort by them to maintain m-line stability
2460            // This handles cases where transceivers were added out-of-order relative to their
2461            // assigned MIDs (e.g. reused from previous negotiations)
2462            for t in &transceivers {
2463                self.ensure_mid(t);
2464            }
2465
2466            let mut ordered = transceivers.clone();
2467            ordered.sort_by(|a, b| {
2468                let mid_a = a.mid().unwrap_or_default();
2469                let mid_b = b.mid().unwrap_or_default();
2470
2471                // Try to sort numerically if possible ("0", "1", "10")
2472                // otherwise lexicographically ("0", "1", "a")
2473                match (mid_a.parse::<u64>(), mid_b.parse::<u64>()) {
2474                    (Ok(na), Ok(nb)) => na.cmp(&nb),
2475                    _ => mid_a.cmp(&mid_b),
2476                }
2477            });
2478            ordered
2479        };
2480
2481        self.ice_transport
2482            .start_gathering()
2483            .map_err(|err| RtcError::InvalidState(format!("ICE gathering failed: {err}")))?;
2484
2485        let mode = self.config.transport_mode.clone();
2486
2487        // For non-WebRTC, wait for at least one candidate if none are available.
2488        // This ensures the SDP doesn't default to port 9 when no candidates are gathered yet.
2489        if mode != TransportMode::WebRtc {
2490            let mut candidates = self.ice_transport.local_candidates();
2491            if candidates.is_empty() {
2492                let mut rx = self.ice_transport.subscribe_candidates();
2493                let start = tokio::time::Instant::now();
2494                let timeout_dur = tokio::time::Duration::from_millis(500);
2495
2496                while candidates.is_empty() && start.elapsed() < timeout_dur {
2497                    let _ = tokio::time::timeout(timeout_dur - start.elapsed(), rx.recv()).await;
2498                    candidates = self.ice_transport.local_candidates();
2499                }
2500            }
2501        }
2502
2503        let ice_params = self.ice_transport.local_parameters();
2504        let ice_username = ice_params.username_fragment.clone();
2505        let ice_password = ice_params.password.clone();
2506        let candidate_lines: Vec<String> = self
2507            .ice_transport
2508            .local_candidates()
2509            .iter()
2510            .map(IceCandidate::to_sdp)
2511            .collect();
2512        let gather_complete = matches!(
2513            self.ice_transport.gather_state(),
2514            IceGathererState::Complete
2515        );
2516        let mut desc = SessionDescription::new(sdp_type);
2517        desc.session.origin = default_origin();
2518        if let Some(ext_ip) = &self.config.external_ip {
2519            desc.session.origin.unicast_address = ext_ip.clone();
2520        }
2521        desc.session.origin.session_version += 1;
2522        if !desc
2523            .session
2524            .attributes
2525            .iter()
2526            .any(|attr| attr.key == "msid-semantic")
2527            && self.config.transport_mode == TransportMode::WebRtc
2528        {
2529            desc.session
2530                .attributes
2531                .push(Attribute::new("msid-semantic", Some("WMS *".into())));
2532        }
2533
2534        let mode = self.config.transport_mode.clone();
2535
2536        if mode == TransportMode::Rtp || mode == TransportMode::Srtp {
2537            let local_ip = if let Some(ext_ip) = &self.config.external_ip {
2538                ext_ip
2539                    .parse()
2540                    .unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
2541            } else {
2542                get_local_ip().unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
2543            };
2544            if desc.session.connection.is_none() {
2545                desc.session.connection = Some(format!("IN IP4 {}", local_ip));
2546            }
2547        }
2548
2549        for transceiver in ordered_transceivers.into_iter() {
2550            let mid = self.ensure_mid(&transceiver);
2551            let mut direction = map_direction(transceiver.direction());
2552            let sender_info = if direction.sends() {
2553                transceiver.sender.lock().unwrap().clone()
2554            } else {
2555                None
2556            };
2557
2558            // Check if remote side expects us to send (for B2BUA scenarios)
2559            let remote_expects_media = if sdp_type == SdpType::Answer {
2560                let remote_guard = self.remote_description.lock().unwrap();
2561                if let Some(remote) = remote_guard.as_ref() {
2562                    // Find the matching remote section by mid
2563                    remote
2564                        .media_sections
2565                        .iter()
2566                        .find(|section| section.mid == mid)
2567                        .map(|section| {
2568                            // Remote expects media if their direction is sendrecv or sendonly
2569                            matches!(
2570                                section.direction,
2571                                crate::sdp::Direction::SendRecv | crate::sdp::Direction::SendOnly
2572                            )
2573                        })
2574                        .unwrap_or(false)
2575                } else {
2576                    false
2577                }
2578            } else {
2579                false
2580            };
2581
2582            // If we are supposed to send, but have no sender (and it's not Application),
2583            // we must downgrade direction to avoid ghost tracks.
2584            let has_sender_ssrc = transceiver.sender_ssrc.lock().unwrap().is_some();
2585            if direction.sends()
2586                && sender_info.is_none()
2587                && !has_sender_ssrc
2588                && transceiver.kind() != MediaKind::Application
2589                && !remote_expects_media
2590            {
2591                direction = match direction {
2592                    TransceiverDirection::SendRecv => TransceiverDirection::RecvOnly,
2593                    TransceiverDirection::SendOnly => TransceiverDirection::Inactive,
2594                    _ => direction,
2595                };
2596            }
2597
2598            let mut section = MediaSection::new(transceiver.kind(), mid);
2599            section.direction = direction.into();
2600
2601            if mode == TransportMode::Rtp {
2602                section.protocol = "RTP/AVP".to_string();
2603            }
2604
2605            if mode == TransportMode::WebRtc {
2606                section.connection = Some("IN IP4 0.0.0.0".to_string());
2607                section
2608                    .attributes
2609                    .push(Attribute::new("ice-ufrag", Some(ice_username.clone())));
2610                section
2611                    .attributes
2612                    .push(Attribute::new("ice-pwd", Some(ice_password.clone())));
2613                section
2614                    .attributes
2615                    .push(Attribute::new("ice-options", Some("trickle".into())));
2616                for candidate in &candidate_lines {
2617                    section
2618                        .attributes
2619                        .push(Attribute::new("candidate", Some(candidate.clone())));
2620                }
2621                if gather_complete {
2622                    section
2623                        .attributes
2624                        .push(Attribute::new("end-of-candidates", None));
2625                }
2626            } else {
2627                // For RTP/SRTP, use the first candidate's address for c= and m= port
2628                // Prefer non-loopback candidates
2629                let candidates = self.ice_transport.local_candidates();
2630                if let Some(cand) = candidates
2631                    .iter()
2632                    .find(|c| !c.address.ip().is_loopback())
2633                    .or(candidates.first())
2634                {
2635                    section.port = cand.address.port();
2636                    let conn = format!("IN IP4 {}", cand.address.ip());
2637                    if Some(&conn) != desc.session.connection.as_ref() {
2638                        section.connection = Some(conn);
2639                    }
2640                }
2641            }
2642
2643            self.populate_media_capabilities(&mut section, transceiver.kind(), sdp_type);
2644            if let Some(sender) = sender_info {
2645                Self::attach_sender_attributes(
2646                    &mut section,
2647                    sender.ssrc(),
2648                    sender.cname(),
2649                    sender.stream_id(),
2650                    sender.track_id(),
2651                    &mode,
2652                );
2653            } else if direction.sends() {
2654                if let Some(ssrc) = *transceiver.sender_ssrc.lock().unwrap() {
2655                    let cname = format!("rustrtc-cname-{ssrc}");
2656                    let stream_id = transceiver
2657                        .sender_stream_id
2658                        .lock()
2659                        .unwrap()
2660                        .clone()
2661                        .unwrap_or_else(|| "default".to_string());
2662                    let track_id = transceiver
2663                        .sender_track_id
2664                        .lock()
2665                        .unwrap()
2666                        .clone()
2667                        .unwrap_or_else(|| format!("track-{}", transceiver.id()));
2668                    Self::attach_sender_attributes(
2669                        &mut section,
2670                        ssrc,
2671                        &cname,
2672                        &stream_id,
2673                        &track_id,
2674                        &mode,
2675                    );
2676                }
2677            }
2678
2679            if self.config.transport_mode == TransportMode::Srtp {
2680                let mut suite = "AES_CM_128_HMAC_SHA1_80".to_string();
2681                if sdp_type == SdpType::Answer {
2682                    let remote_desc = self.remote_description.lock().unwrap();
2683                    if let Some(remote) = &*remote_desc {
2684                        if let Some(c) = remote
2685                            .media_sections
2686                            .iter()
2687                            .flat_map(|m| m.get_crypto_attributes())
2688                            .find(|c| map_crypto_suite(&c.crypto_suite).is_ok())
2689                        {
2690                            suite = c.crypto_suite.clone();
2691                        }
2692                    }
2693                }
2694
2695                let key_params = generate_sdes_key_params();
2696                let crypto_val = format!("1 {} {}|2^31|1:1", suite, key_params);
2697                section
2698                    .attributes
2699                    .push(Attribute::new("crypto", Some(crypto_val)));
2700            }
2701
2702            desc.media_sections.push(section);
2703        }
2704
2705        if !desc.media_sections.is_empty() {
2706            let should_bundle = match sdp_type {
2707                SdpType::Offer => true,
2708                SdpType::Answer => remote_offered_bundle,
2709                _ => false,
2710            };
2711
2712            let should_bundle = should_bundle && desc.media_sections.len() > 1;
2713
2714            if should_bundle {
2715                let mids: Vec<String> = desc.media_sections.iter().map(|m| m.mid.clone()).collect();
2716                let value = format!("BUNDLE {}", mids.join(" "));
2717                desc.session
2718                    .attributes
2719                    .push(Attribute::new("group", Some(value)));
2720            }
2721        }
2722
2723        Ok(desc)
2724    }
2725
2726    fn attach_sender_attributes(
2727        section: &mut MediaSection,
2728        ssrc: u32,
2729        cname: &str,
2730        stream_id: &str,
2731        track_id: &str,
2732        mode: &TransportMode,
2733    ) {
2734        if *mode == TransportMode::WebRtc {
2735            section.attributes.push(Attribute::new(
2736                "msid",
2737                Some(format!("{} {}", stream_id, track_id)),
2738            ));
2739        }
2740
2741        section.attributes.push(Attribute::new(
2742            "ssrc",
2743            Some(format!("{} cname:{}", ssrc, cname)),
2744        ));
2745
2746        if *mode == TransportMode::WebRtc {
2747            section.attributes.push(Attribute::new(
2748                "ssrc",
2749                Some(format!("{} msid:{} {}", ssrc, stream_id, track_id)),
2750            ));
2751        }
2752    }
2753
2754    fn ensure_mid(&self, transceiver: &Arc<RtpTransceiver>) -> String {
2755        if let Some(mid) = transceiver.mid() {
2756            return mid;
2757        }
2758        let mid_value = self.allocate_mid();
2759        trace!(
2760            "Allocated MID: {} for transceiver kind={:?}",
2761            mid_value,
2762            transceiver.kind()
2763        );
2764        transceiver.set_mid(mid_value.clone());
2765        mid_value
2766    }
2767
2768    fn allocate_mid(&self) -> String {
2769        let mid = self.next_mid.fetch_add(1, Ordering::SeqCst);
2770        mid.to_string()
2771    }
2772
2773    fn validate_sdp_type(&self, sdp_type: &SdpType) -> RtcResult<()> {
2774        match sdp_type {
2775            SdpType::Offer | SdpType::Answer => Ok(()),
2776            _ => Err(RtcError::NotImplemented("pranswer/rollback")),
2777        }
2778    }
2779
2780    fn populate_media_capabilities(
2781        &self,
2782        section: &mut MediaSection,
2783        kind: MediaKind,
2784        sdp_type: SdpType,
2785    ) {
2786        section.apply_config(&self.config);
2787
2788        // Add extmap for Video
2789        if kind == MediaKind::Video {
2790            let (mut rid_id, mut repaired_rid_id) = self.get_remote_video_extmap_ids(&section.mid);
2791
2792            if sdp_type == SdpType::Offer && self.config.transport_mode != TransportMode::Rtp {
2793                // If not found in remote (new transceiver), use defaults
2794                if rid_id.is_none() {
2795                    rid_id = Some("1".to_string());
2796                }
2797                if repaired_rid_id.is_none() {
2798                    repaired_rid_id = Some("2".to_string());
2799                }
2800            }
2801
2802            section.add_video_extmaps(rid_id, repaired_rid_id);
2803        }
2804
2805        // Add abs-send-time extmap
2806        let mut abs_send_time_id =
2807            self.get_remote_extmap_id(&section.mid, crate::sdp::ABS_SEND_TIME_URI);
2808        if sdp_type == SdpType::Offer
2809            && abs_send_time_id.is_none()
2810            && self.config.transport_mode != TransportMode::Rtp
2811        {
2812            abs_send_time_id = Some("3".to_string()); // Default ID for abs-send-time
2813        }
2814        if let Some(id) = abs_send_time_id {
2815            section.attributes.push(crate::sdp::Attribute::new(
2816                "extmap",
2817                Some(format!("{} {}", id, crate::sdp::ABS_SEND_TIME_URI)),
2818            ));
2819        }
2820
2821        if self.config.transport_mode != TransportMode::Rtp {
2822            let setup_value = match sdp_type {
2823                SdpType::Offer => "actpass",
2824                SdpType::Answer => {
2825                    let role = *self.dtls_role.borrow();
2826                    match role {
2827                        Some(true) => "active",
2828                        Some(false) => "passive",
2829                        None => "active",
2830                    }
2831                }
2832                _ => "actpass",
2833            };
2834            section.add_dtls_attributes(&self.dtls_fingerprint, setup_value);
2835        }
2836    }
2837
2838    fn get_remote_video_extmap_ids(&self, mid: &str) -> (Option<String>, Option<String>) {
2839        let rid_id =
2840            self.get_remote_extmap_id(mid, "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id");
2841        let repaired_rid_id = self.get_remote_extmap_id(
2842            mid,
2843            "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
2844        );
2845        (rid_id, repaired_rid_id)
2846    }
2847
2848    fn get_remote_extmap_id(&self, mid: &str, uri: &str) -> Option<String> {
2849        let remote = self.remote_description.lock().unwrap();
2850        if let Some(desc) = &*remote {
2851            let remote_section = desc.media_sections.iter().find(|s| s.mid == mid)?;
2852            for attr in &remote_section.attributes {
2853                if attr.key != "extmap" {
2854                    continue;
2855                }
2856                let val = attr.value.as_ref()?;
2857                if val.contains(uri) {
2858                    if let Some(id_str) = val.split_whitespace().next() {
2859                        return Some(id_str.to_string());
2860                    }
2861                }
2862            }
2863        }
2864        None
2865    }
2866
2867    fn close(&self) {
2868        if *self.peer_state.borrow() == PeerConnectionState::Closed {
2869            return;
2870        }
2871        let _ = self.signaling_state.send(SignalingState::Closed);
2872        let _ = self.peer_state.send(PeerConnectionState::Closed);
2873        let _ = self.ice_connection_state.send(IceConnectionState::Closed);
2874        let _ = self.ice_gathering_state.send(IceGatheringState::Complete);
2875
2876        // Clean up all tracks to prevent audio bleeding into new connections
2877        {
2878            let transceivers = self.transceivers.lock().unwrap();
2879            for t in transceivers.iter() {
2880                // Stop receiver tracks by marking them as ended
2881                if let Some(receiver) = t.receiver() {
2882                    let track = receiver.track();
2883                    track.stop();
2884                    tracing::debug!(
2885                        "PeerConnection.close: marked receiver track {} as ended",
2886                        track.id()
2887                    );
2888                }
2889            }
2890        }
2891
2892        // Clear RTP transport listeners to stop receiving packets
2893        let rtp_transport = self.rtp_transport.lock().unwrap().clone();
2894        if let Some(transport) = rtp_transport.as_ref() {
2895            let count = transport.clear_listeners();
2896            if count > 0 {
2897                tracing::debug!("PeerConnection.close: cleared {} listeners", count);
2898            }
2899
2900            // Send RTCP BYE
2901            let transceivers = self.transceivers.lock().unwrap();
2902            let mut ssrcs = Vec::new();
2903            for t in transceivers.iter() {
2904                if let Some(sender) = t.sender() {
2905                    ssrcs.push(sender.ssrc());
2906                }
2907            }
2908            if !ssrcs.is_empty() {
2909                let bye = crate::rtp::RtcpPacket::Goodbye(crate::rtp::Goodbye {
2910                    sources: ssrcs,
2911                    reason: Some("PeerConnection closed".to_string()),
2912                });
2913                let transport_clone = transport.clone();
2914                tokio::spawn(async move {
2915                    let _ = transport_clone.send_rtcp(&[bye]).await;
2916                });
2917            }
2918        }
2919
2920        if let Some(dtls) = self.dtls_transport.lock().unwrap().as_ref() {
2921            dtls.close();
2922        }
2923
2924        self.ice_transport.stop();
2925    }
2926}
2927
/// Runs the regular `close()` teardown when the last owner of the inner state goes away.
impl Drop for PeerConnectionInner {
    fn drop(&mut self) {
        // `close()` returns early when the peer state is already `Closed`, so an
        // explicit close followed by this drop does not tear things down twice.
        debug!("PeerConnectionInner dropped, stopping ICE transport");
        self.close();
    }
}
2934
2935fn default_origin() -> Origin {
2936    let mut origin = Origin::default();
2937    let now = SystemTime::now()
2938        .duration_since(UNIX_EPOCH)
2939        .unwrap_or_default()
2940        .as_secs();
2941    origin.session_id = now;
2942    origin.session_version = now;
2943    if let Ok(ip) = get_local_ip() {
2944        origin.unicast_address = ip.to_string();
2945    }
2946    origin
2947}
2948
/// Overall state of a peer connection, published on its `peer_state` channel.
///
/// In this file the ICE/DTLS supervision code publishes `Connected` once the
/// transport has started, `Disconnected` when DTLS closes or fails, `Failed`
/// when ICE fails or the transport cannot start, and `Closed` when ICE closes
/// or `close()` runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerConnectionState {
    /// NOTE(review): presumably the initial state; it is not set in the code shown here.
    New,
    /// NOTE(review): presumably set while transports are being established; confirm at the use site.
    Connecting,
    /// The transport (DTLS, or plain RTP/SRTP) started successfully.
    Connected,
    /// DTLS reported closed or failed while the session was up.
    Disconnected,
    /// ICE failed or the transport could not be started.
    Failed,
    /// ICE closed or the connection was closed locally.
    Closed,
}
2958
/// Offer/answer negotiation state, published on the `signaling_state` channel.
///
/// Only the transition to `Closed` (from `close()`) is visible in this part of
/// the file; the other transitions happen where descriptions are applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SignalingState {
    /// NOTE(review): presumably "no offer/answer exchange in progress"; confirm at the use site.
    Stable,
    /// NOTE(review): presumably "a local offer has been applied"; confirm at the use site.
    HaveLocalOffer,
    /// NOTE(review): presumably "a remote offer has been applied"; confirm at the use site.
    HaveRemoteOffer,
    /// The connection was closed.
    Closed,
}
2966
/// ICE connection state exposed by the peer connection.
///
/// The state-forwarding loop maps every `IceTransportState` variant onto the
/// variant of the same name here. In addition, `Disconnected` is published when
/// DTLS closes or fails, and `Closed` when `close()` runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IceConnectionState {
    /// Mirrors `IceTransportState::New`.
    New,
    /// Mirrors `IceTransportState::Checking`.
    Checking,
    /// Mirrors `IceTransportState::Connected`.
    Connected,
    /// Mirrors `IceTransportState::Completed`.
    Completed,
    /// Mirrors `IceTransportState::Failed`.
    Failed,
    /// Mirrors `IceTransportState::Disconnected`; also published on DTLS close/failure.
    Disconnected,
    /// Mirrors `IceTransportState::Closed`; also published by `close()`.
    Closed,
}
2977
/// ICE candidate gathering state exposed by the peer connection.
///
/// Only the transition to `Complete` (from `close()`) is visible in this part
/// of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IceGatheringState {
    /// NOTE(review): presumably "gathering has not started"; confirm at the use site.
    New,
    /// NOTE(review): presumably "candidates are being gathered"; confirm at the use site.
    Gathering,
    /// Gathering finished; also published when the connection is closed.
    Complete,
}
2984
/// Media direction of a transceiver, from the local point of view.
///
/// Defaults to [`TransceiverDirection::SendRecv`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum TransceiverDirection {
    /// Send and receive.
    #[default]
    SendRecv,
    /// Send only.
    SendOnly,
    /// Receive only.
    RecvOnly,
    /// Neither send nor receive.
    Inactive,
}

impl TransceiverDirection {
    /// Returns the direction seen from the other side of the session.
    ///
    /// `SendOnly` and `RecvOnly` swap; `SendRecv` and `Inactive` are unchanged.
    pub const fn answer_direction(self) -> Self {
        match self {
            TransceiverDirection::SendRecv => TransceiverDirection::SendRecv,
            TransceiverDirection::SendOnly => TransceiverDirection::RecvOnly,
            TransceiverDirection::RecvOnly => TransceiverDirection::SendOnly,
            TransceiverDirection::Inactive => TransceiverDirection::Inactive,
        }
    }

    /// Returns `true` when this direction includes sending (`SendRecv` or `SendOnly`).
    pub const fn sends(self) -> bool {
        matches!(
            self,
            TransceiverDirection::SendRecv | TransceiverDirection::SendOnly
        )
    }
}
3011
3012impl From<TransceiverDirection> for Direction {
3013    fn from(value: TransceiverDirection) -> Self {
3014        match value {
3015            TransceiverDirection::SendRecv => Direction::SendRecv,
3016            TransceiverDirection::SendOnly => Direction::SendOnly,
3017            TransceiverDirection::RecvOnly => Direction::RecvOnly,
3018            TransceiverDirection::Inactive => Direction::Inactive,
3019        }
3020    }
3021}
3022
3023impl From<Direction> for TransceiverDirection {
3024    fn from(value: Direction) -> Self {
3025        match value {
3026            Direction::SendRecv => TransceiverDirection::SendRecv,
3027            Direction::SendOnly => TransceiverDirection::SendOnly,
3028            Direction::RecvOnly => TransceiverDirection::RecvOnly,
3029            Direction::Inactive => TransceiverDirection::Inactive,
3030        }
3031    }
3032}
3033
/// Process-wide counter used to hand out transceiver ids. It starts at 1 and
/// is advanced with `fetch_add(1, ..)` each time `RtpTransceiver::new` runs.
static TRANSCEIVER_COUNTER: AtomicU64 = AtomicU64::new(1);
3035
/// Codec parameters associated with one RTP payload type.
#[derive(Clone, Debug, PartialEq)]
pub struct RtpCodecParameters {
    /// RTP payload type number.
    pub payload_type: u8,
    /// Clock rate of the codec.
    pub clock_rate: u32,
    /// Channel count; the default value uses 0.
    pub channels: u8,
}

/// Default parameters: payload type 96, clock rate 90000, 0 channels.
impl Default for RtpCodecParameters {
    fn default() -> Self {
        RtpCodecParameters {
            payload_type: 96,
            clock_rate: 90_000,
            channels: 0,
        }
    }
}
3052
/// Pairs an optional `RtpSender` and `RtpReceiver` for one media kind and
/// carries the per-transceiver negotiation state (direction, mid, payload
/// type map, header-extension map).
///
/// All mutable state sits behind `Mutex`/`RwLock`, so the methods take `&self`.
pub struct RtpTransceiver {
    /// Unique id taken from `TRANSCEIVER_COUNTER` at construction.
    id: u64,
    /// Media kind this transceiver was created for.
    kind: MediaKind,
    /// Current direction; read and written through `direction()` / `set_direction()`.
    direction: Mutex<TransceiverDirection>,
    /// Media id, `None` until `set_mid` is called.
    mid: Mutex<Option<String>>,
    /// Attached sender, if any.
    sender: Mutex<Option<Arc<RtpSender>>>,
    /// Attached receiver, if any.
    receiver: Mutex<Option<Arc<RtpReceiver>>>,
    /// Weak handle to the RTP transport, stored by `set_rtp_transport`.
    rtp_transport: Mutex<Option<Weak<RtpTransport>>>,
    /// SSRC copied from the sender in `set_sender`.
    sender_ssrc: Mutex<Option<u32>>,
    /// Stream id copied from the sender in `set_sender`.
    sender_stream_id: Mutex<Option<String>>,
    /// Track id copied from the sender in `set_sender`.
    sender_track_id: Mutex<Option<String>>,
    /// Payload type -> codec parameters, replaced wholesale by `update_payload_map`.
    payload_map: Arc<std::sync::RwLock<HashMap<u8, RtpCodecParameters>>>,
    /// Header-extension id -> URI, replaced wholesale by `update_extmap`.
    extmap: Arc<std::sync::RwLock<HashMap<u8, String>>>,
}
3067
3068impl RtpTransceiver {
3069    fn new(kind: MediaKind, direction: TransceiverDirection) -> Self {
3070        Self {
3071            id: TRANSCEIVER_COUNTER.fetch_add(1, Ordering::Relaxed),
3072            kind,
3073            direction: Mutex::new(direction),
3074            mid: Mutex::new(None),
3075            sender: Mutex::new(None),
3076            receiver: Mutex::new(None),
3077            rtp_transport: Mutex::new(None),
3078            sender_ssrc: Mutex::new(None),
3079            sender_stream_id: Mutex::new(None),
3080            sender_track_id: Mutex::new(None),
3081            payload_map: Arc::new(std::sync::RwLock::new(HashMap::new())),
3082            extmap: Arc::new(std::sync::RwLock::new(HashMap::new())),
3083        }
3084    }
3085
3086    /// Create transceiver for testing purposes
3087    #[doc(hidden)]
3088    pub fn new_for_test(kind: MediaKind, direction: TransceiverDirection) -> Self {
3089        Self::new(kind, direction)
3090    }
3091
3092    pub fn id(&self) -> u64 {
3093        self.id
3094    }
3095
3096    pub fn kind(&self) -> MediaKind {
3097        self.kind
3098    }
3099
3100    pub fn sender_ssrc(&self) -> Option<u32> {
3101        *self.sender_ssrc.lock().unwrap()
3102    }
3103
3104    pub fn sender_stream_id(&self) -> Option<String> {
3105        self.sender_stream_id.lock().unwrap().clone()
3106    }
3107
3108    pub fn sender_track_id(&self) -> Option<String> {
3109        self.sender_track_id.lock().unwrap().clone()
3110    }
3111
3112    pub fn direction(&self) -> TransceiverDirection {
3113        *self.direction.lock().unwrap()
3114    }
3115
3116    pub fn set_direction(&self, direction: TransceiverDirection) {
3117        *self.direction.lock().unwrap() = direction;
3118    }
3119
3120    pub fn mid(&self) -> Option<String> {
3121        self.mid.lock().unwrap().clone()
3122    }
3123
3124    fn set_mid(&self, mid: String) {
3125        *self.mid.lock().unwrap() = Some(mid);
3126    }
3127
3128    pub fn sender(&self) -> Option<Arc<RtpSender>> {
3129        self.sender.lock().unwrap().clone()
3130    }
3131
3132    pub fn set_sender(&self, sender: Option<Arc<RtpSender>>) {
3133        if let Some(ref s) = sender {
3134            // If transport is already established, connect the sender to it
3135            if let Some(weak_transport) = self.rtp_transport.lock().unwrap().as_ref() {
3136                if let Some(transport) = weak_transport.upgrade() {
3137                    debug!(
3138                        "set_sender: connecting late sender ssrc={} to existing transport",
3139                        s.ssrc()
3140                    );
3141                    s.set_transport(transport);
3142                }
3143            }
3144            // Sync pre-allocated fields
3145            *self.sender_ssrc.lock().unwrap() = Some(s.ssrc());
3146            *self.sender_stream_id.lock().unwrap() = Some(s.stream_id().to_string());
3147            *self.sender_track_id.lock().unwrap() = Some(s.track_id().to_string());
3148        }
3149        *self.sender.lock().unwrap() = sender;
3150    }
3151
3152    /// Set the RTP transport reference. Called by start_dtls when transport is established.
3153    pub fn set_rtp_transport(&self, transport: Weak<RtpTransport>) {
3154        *self.rtp_transport.lock().unwrap() = Some(transport);
3155    }
3156
3157    pub fn receiver(&self) -> Option<Arc<RtpReceiver>> {
3158        self.receiver.lock().unwrap().clone()
3159    }
3160
3161    pub fn set_receiver(&self, receiver: Option<Arc<RtpReceiver>>) {
3162        *self.receiver.lock().unwrap() = receiver;
3163    }
3164
3165    /// Update payload type mapping for reinvite scenarios
3166    pub fn update_payload_map(&self, new_map: HashMap<u8, RtpCodecParameters>) -> RtcResult<()> {
3167        let mut payload_map = self.payload_map.write().unwrap();
3168
3169        // Log changes for debugging
3170        for (pt, codec) in &new_map {
3171            if !payload_map.contains_key(pt) || payload_map.get(pt) != Some(codec) {
3172                trace!(
3173                    "Payload type {} remapped: clock_rate={}, channels={}",
3174                    pt, codec.clock_rate, codec.channels
3175                );
3176            }
3177        }
3178
3179        *payload_map = new_map.clone();
3180
3181        // Update PT listeners in transport for fallback routing
3182        if let Some(receiver) = self.receiver() {
3183            if let Some(transport_weak) = self.rtp_transport.lock().unwrap().clone() {
3184                if let Some(transport) = transport_weak.upgrade() {
3185                    if let Some(tx) = receiver.packet_tx() {
3186                        for (&pt, _) in &new_map {
3187                            transport.register_pt_listener(pt, tx.clone());
3188                        }
3189                    }
3190                }
3191            }
3192        }
3193
3194        Ok(())
3195    }
3196
3197    /// Update RTP header extension mapping for reinvite scenarios
3198    pub fn update_extmap(&self, new_extmap: HashMap<u8, String>) -> RtcResult<()> {
3199        let mut extmap = self.extmap.write().unwrap();
3200
3201        // Log changes
3202        for (id, uri) in &new_extmap {
3203            if !extmap.contains_key(id) || extmap.get(id) != Some(uri) {
3204                trace!("Extmap ID {} remapped to {}", id, uri);
3205            }
3206        }
3207
3208        *extmap = new_extmap;
3209
3210        // Update transport extension IDs if available
3211        if let Some(weak_transport) = self.rtp_transport.lock().unwrap().as_ref() {
3212            if let Some(transport) = weak_transport.upgrade() {
3213                let id = extmap
3214                    .iter()
3215                    .find(|(_, uri)| uri.as_str() == crate::sdp::ABS_SEND_TIME_URI)
3216                    .map(|(id, _)| *id);
3217                transport.set_abs_send_time_extension_id(id);
3218
3219                let id = extmap
3220                    .iter()
3221                    .find(|(_, uri)| uri.contains("rtp-stream-id"))
3222                    .map(|(id, _)| *id);
3223                transport.set_rid_extension_id(id);
3224            }
3225        }
3226
3227        Ok(())
3228    }
3229
3230    /// Get current payload type mapping (for testing/debugging)
3231    pub fn get_payload_map(&self) -> HashMap<u8, RtpCodecParameters> {
3232        self.payload_map.read().unwrap().clone()
3233    }
3234
3235    /// Get current extmap (for testing/debugging)
3236    pub fn get_extmap(&self) -> HashMap<u8, String> {
3237        self.extmap.read().unwrap().clone()
3238    }
3239}
3240
/// Sends the samples of one media track as RTP once a transport is attached.
pub struct RtpSender {
    /// Source of media samples; polled by the task spawned in `set_transport`.
    track: Arc<dyn MediaStreamTrack>,
    /// Transport stored by `set_transport`; `None` until then.
    transport: Mutex<Option<Arc<RtpTransport>>>,
    /// SSRC passed to `into_rtp_packet` for every outgoing sample.
    ssrc: u32,
    /// Codec parameters; the send loop reads `payload_type` from here per sample.
    params: Arc<Mutex<RtpCodecParameters>>,
    /// Id of the track, captured at construction.
    track_id: Arc<str>,
    /// Stream id supplied at construction.
    stream_id: Arc<str>,
    /// CNAME, built as `rustrtc-cname-{ssrc}` at construction.
    cname: Arc<str>,
    /// Broadcast channel for RTCP packets handed in through `deliver_rtcp`.
    rtcp_tx: broadcast::Sender<RtcpPacket>,
    /// Notified from `Drop` to stop the send loop.
    stop_tx: Arc<tokio::sync::Notify>,
    /// Sequence number shared with the send loop; seeded from a random value.
    next_sequence_number: Arc<AtomicU16>,
    /// Interceptors invoked for outgoing RTP packets and received RTCP packets.
    interceptors: Vec<Arc<dyn RtpSenderInterceptor + Send + Sync>>,
}
3254
3255pub struct RtpSenderBuilder {
3256    track: Arc<dyn MediaStreamTrack>,
3257    ssrc: u32,
3258    stream_id: String,
3259    params: RtpCodecParameters,
3260    interceptors: Vec<Arc<dyn RtpSenderInterceptor + Send + Sync>>,
3261}
3262
3263impl RtpSenderBuilder {
3264    pub fn new(track: Arc<dyn MediaStreamTrack>, ssrc: u32) -> Self {
3265        Self {
3266            track,
3267            ssrc,
3268            stream_id: "stream".to_string(),
3269            params: RtpCodecParameters::default(),
3270            interceptors: Vec::new(),
3271        }
3272    }
3273
3274    pub fn stream_id(mut self, id: String) -> Self {
3275        self.stream_id = id;
3276        self
3277    }
3278
3279    pub fn params(mut self, params: RtpCodecParameters) -> Self {
3280        self.params = params;
3281        self
3282    }
3283
3284    pub fn nack(mut self, buffer_size: usize) -> Self {
3285        self.interceptors
3286            .push(Arc::new(DefaultRtpSenderNackHandler::new(buffer_size)));
3287        self
3288    }
3289
3290    pub fn bitrate_controller(mut self) -> Self {
3291        self.interceptors
3292            .push(Arc::new(DefaultRtpSenderBitrateHandler::new()));
3293        self
3294    }
3295
3296    pub fn interceptor(mut self, interceptor: Arc<dyn RtpSenderInterceptor>) -> Self {
3297        self.interceptors.push(interceptor);
3298        self
3299    }
3300
3301    pub fn build(self) -> Arc<RtpSender> {
3302        Arc::new(RtpSender::new_internal(
3303            self.track,
3304            self.ssrc,
3305            self.stream_id,
3306            self.params,
3307            self.interceptors,
3308        ))
3309    }
3310}
3311
3312impl RtpSender {
3313    pub fn builder(track: Arc<dyn MediaStreamTrack>, ssrc: u32) -> RtpSenderBuilder {
3314        RtpSenderBuilder::new(track, ssrc)
3315    }
3316
3317    pub fn new(
3318        track: Arc<dyn MediaStreamTrack>,
3319        ssrc: u32,
3320        stream_id: String,
3321        params: RtpCodecParameters,
3322        interceptors: Vec<Arc<dyn RtpSenderInterceptor + Send + Sync>>,
3323    ) -> Self {
3324        Self::new_internal(track, ssrc, stream_id, params, interceptors)
3325    }
3326
3327    fn new_internal(
3328        track: Arc<dyn MediaStreamTrack>,
3329        ssrc: u32,
3330        stream_id: String,
3331        params: RtpCodecParameters,
3332        interceptors: Vec<Arc<dyn RtpSenderInterceptor + Send + Sync>>,
3333    ) -> Self {
3334        let track_label = track.id().to_string();
3335        let track_id = Arc::<str>::from(track_label.clone());
3336        let stream_id = Arc::<str>::from(stream_id);
3337        let cname = Arc::<str>::from(format!("rustrtc-cname-{ssrc}"));
3338        let (rtcp_tx, _) = broadcast::channel(100);
3339        Self {
3340            track,
3341            transport: Mutex::new(None),
3342            ssrc,
3343            params: Arc::new(Mutex::new(params)),
3344            track_id,
3345            stream_id,
3346            cname,
3347            rtcp_tx,
3348            stop_tx: Arc::new(tokio::sync::Notify::new()),
3349            next_sequence_number: Arc::new(AtomicU16::new(random_u32() as u16)),
3350            interceptors,
3351        }
3352    }
3353
3354    pub fn ssrc(&self) -> u32 {
3355        self.ssrc
3356    }
3357
3358    pub fn cname(&self) -> &str {
3359        &self.cname
3360    }
3361
3362    pub fn track_id(&self) -> &str {
3363        &self.track_id
3364    }
3365
3366    pub fn stream_id(&self) -> &str {
3367        &self.stream_id
3368    }
3369
3370    pub fn subscribe_rtcp(&self) -> broadcast::Receiver<RtcpPacket> {
3371        self.rtcp_tx.subscribe()
3372    }
3373
3374    pub(crate) fn deliver_rtcp(&self, packet: RtcpPacket) {
3375        let _ = self.rtcp_tx.send(packet);
3376    }
3377
3378    pub fn params(&self) -> RtpCodecParameters {
3379        self.params.lock().unwrap().clone()
3380    }
3381
3382    pub fn interceptors(&self) -> &[Arc<dyn RtpSenderInterceptor + Send + Sync>] {
3383        &self.interceptors
3384    }
3385
3386    pub fn nack_handler(&self) -> Option<Arc<dyn NackStats>> {
3387        for i in &self.interceptors {
3388            if let Some(stats) = i.clone().as_nack_stats() {
3389                return Some(stats);
3390            }
3391        }
3392        None
3393    }
3394
3395    pub fn set_transport(&self, transport: Arc<RtpTransport>) {
3396        *self.transport.lock().unwrap() = Some(transport.clone());
3397        let track = self.track.clone();
3398        let ssrc = self.ssrc;
3399        let params_lock = self.params.clone();
3400        let stop_rx = self.stop_tx.clone();
3401        let next_seq = self.next_sequence_number.clone();
3402        let interceptors = self.interceptors.clone();
3403        let mut rtcp_rx = self.rtcp_tx.subscribe();
3404
3405        tokio::spawn(async move {
3406            let mut sequence_number = next_seq.load(Ordering::SeqCst);
3407            let mut last_source_ts: Option<u32> = None;
3408            let mut timestamp_offset = random_u32(); // Start with random offset
3409            let notified = stop_rx.notified();
3410            tokio::pin!(notified);
3411
3412            loop {
3413                tokio::select! {
3414                    _ = &mut notified => break,
3415                    rtcp = rtcp_rx.recv() => {
3416                        match rtcp {
3417                            Ok(packet) => {
3418                                for interceptor in &interceptors {
3419                                    interceptor.on_rtcp_received(&packet, transport.clone()).await;
3420                                }
3421                            }
3422                            _ => {}
3423                        }
3424                    }
3425                    res = track.recv() => {
3426                        match res {
3427                            Ok(mut sample) => {
3428                                let payload_type = {
3429                                    let p = params_lock.lock().unwrap();
3430                                    p.payload_type
3431                                };
3432
3433                                // Check if application provided sequence_number (indicates app wants control)
3434                                let app_controlled = match &sample {
3435                                    crate::media::MediaSample::Audio(f) => f.sequence_number.is_some(),
3436                                    crate::media::MediaSample::Video(f) => f.sequence_number.is_some(),
3437                                };
3438
3439                                // Always rewrite sequence numbers to ensure continuity on the wire
3440                                match &mut sample {
3441                                    crate::media::MediaSample::Audio(f) => f.sequence_number = None,
3442                                    crate::media::MediaSample::Video(f) => f.sequence_number = None,
3443                                }
3444
3445                                let mut packet = sample.into_rtp_packet(
3446                                    ssrc,
3447                                    payload_type,
3448                                    &mut sequence_number,
3449                                );
3450
3451                                // Update the shared next_sequence_number
3452                                next_seq.store(sequence_number, Ordering::SeqCst);
3453
3454                                if !app_controlled {
3455                                    // Application doesn't control seq/ts, use rustrtc's logic
3456                                    // Timestamp rewriting
3457                                    let src_ts = packet.header.timestamp;
3458                                    if let Some(last_src) = last_source_ts {
3459                                        let delta = src_ts.wrapping_sub(last_src);
3460                                        // Check if src_ts is newer (delta < 2^31)
3461                                        if delta < 0x80000000 {
3462                                            // If delta is very large (e.g. > 10 seconds), assume source switch/reset
3463                                            // 10 seconds * 90000 = 900,000.
3464                                            if delta > 900_000 {
3465                                                // Discontinuity detected.
3466                                                // We want the new timestamp to continue from where we left off.
3467                                                // But we don't track last_out_ts explicitly here, we rely on offset.
3468                                                // last_out_ts was (last_src + old_offset).
3469                                                // new_out_ts should be (last_out_ts + small_delta).
3470                                                // Let's assume small_delta = 3000 (1/30s at 90khz) or just 1 to be safe.
3471                                                // new_out_ts = last_src + old_offset + 3000.
3472                                                // new_out_ts = src_ts + new_offset.
3473                                                // => new_offset = last_src + old_offset + 3000 - src_ts.
3474                                                timestamp_offset = last_src.wrapping_add(timestamp_offset).wrapping_add(3000).wrapping_sub(src_ts);
3475                                            }
3476                                            last_source_ts = Some(src_ts);
3477                                        }
3478                                        // If src_ts is older (delta >= 2^31), it's an out-of-order packet.
3479                                        // We use the existing offset and do NOT update last_source_ts.
3480                                    } else {
3481                                        // First packet, establish offset
3482                                        // We want out_ts = src_ts + offset.
3483                                        // We initialized offset to random.
3484                                        // So out_ts will be random. Correct.
3485                                        last_source_ts = Some(src_ts);
3486                                    }
3487
3488                                    packet.header.timestamp = src_ts.wrapping_add(timestamp_offset);
3489
3490                                    // Rewrite sequence number
3491                                    packet.header.sequence_number = next_seq.fetch_add(1, Ordering::Relaxed);
3492                                }
3493
3494                                for interceptor in &interceptors {
3495                                    interceptor.on_packet_sent(&packet).await;
3496                                }
3497
3498                                if let Err(e) = transport.send_rtp(&packet).await {
3499                                    debug!("Failed to send RTP: {}", e);
3500                                }
3501                            }
3502                            Err(crate::media::error::MediaError::Lagged) => {
3503                                debug!("RtpSender: track lagged, skipping sample");
3504                                continue;
3505                            }
3506                            Err(_) => break,
3507                        }
3508                    }
3509                }
3510            }
3511        });
3512    }
3513}
3514
/// Signals the send loop spawned by `set_transport` to stop when the sender
/// is dropped.
impl Drop for RtpSender {
    fn drop(&mut self) {
        // NOTE(review): `notify_waiters` stores no permit, so presumably only
        // waiters that already exist are woken; confirm against the tokio
        // `Notify` documentation.
        self.stop_tx.notify_waiters();
    }
}
3520
/// Receives RTP for one media section and exposes it as a sample track,
/// plus optional per-RID simulcast tracks.
pub struct RtpReceiver {
    /// Main output track handed out by `track()`.
    track: Arc<SampleStreamTrack>,
    /// Source side of the main track; passed to the run loop in `set_transport`.
    source: Arc<SampleStreamSource>,
    /// SSRC used to register the main packet listener; updated by `set_ssrc`.
    ssrc: Mutex<u32>,
    /// Codec parameters; the payload type is used as a fallback listener key.
    params: Mutex<RtpCodecParameters>,
    /// Transport stored by `set_transport`; `None` until then.
    transport: Mutex<Option<Arc<RtpTransport>>>,
    /// Sender half of the main packet channel, created in `set_transport`.
    packet_tx: Mutex<Option<mpsc::Sender<(crate::rtp::RtpPacket, std::net::SocketAddr)>>>,
    // NOTE(review): not used in the visible part of the file; presumably the
    // SSRC to put into outgoing RTCP feedback. Confirm against its users.
    rtcp_feedback_ssrc: Mutex<Option<u32>>,
    /// RTX SSRC recorded by `set_rtx_ssrc`.
    rtx_ssrc: Mutex<Option<u32>>,
    // NOTE(review): starts at 0; presumably the FIR sequence counter. Confirm.
    fir_seq: AtomicU8,
    /// Feedback events for the main track; passed to the run loop in `set_transport`.
    feedback_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<crate::media::track::FeedbackEvent>>>,
    /// Per-RID simulcast state: (source, track, feedback receiver, SSRC slot).
    simulcast_tracks: Mutex<
        HashMap<
            String,
            (
                Arc<SampleStreamSource>,
                Arc<SampleStreamTrack>,
                Arc<tokio::sync::Mutex<mpsc::Receiver<crate::media::track::FeedbackEvent>>>,
                Arc<Mutex<Option<u32>>>,
            ),
        >,
    >,
    /// Command channel to the run loop; `Some` once `set_transport` has run.
    runner_tx: Mutex<Option<mpsc::UnboundedSender<ReceiverCommand>>>,
    /// Interceptors supplied at construction.
    interceptors: Vec<Arc<dyn RtpReceiverInterceptor>>,
    /// Event channel supplied to `set_transport`.
    track_ready_event_tx: Mutex<Option<mpsc::UnboundedSender<PeerConnectionEvent>>>,
    /// Transceiver handle supplied to `set_transport`.
    track_ready_transceiver: Mutex<Option<Weak<RtpTransceiver>>>,
    // NOTE(review): starts as `false`; presumably set once the track event
    // has been emitted. Confirm against its users.
    track_event_sent: AtomicBool,
    /// Factory cloned by the run loop when it starts.
    pub depacketizer_factory: Arc<dyn DepacketizerFactory>,
}
3550
3551pub struct RtpReceiverBuilder {
3552    kind: MediaKind,
3553    ssrc: u32,
3554    interceptors: Vec<Arc<dyn RtpReceiverInterceptor>>,
3555    depacketizer_factory: Option<Arc<dyn DepacketizerFactory>>,
3556}
3557
3558impl RtpReceiverBuilder {
3559    pub fn new(kind: MediaKind, ssrc: u32) -> Self {
3560        Self {
3561            kind,
3562            ssrc,
3563            interceptors: Vec::new(),
3564            depacketizer_factory: None,
3565        }
3566    }
3567
3568    pub fn depacketizer_factory(mut self, factory: Arc<dyn DepacketizerFactory>) -> Self {
3569        self.depacketizer_factory = Some(factory);
3570        self
3571    }
3572
3573    pub fn nack(mut self) -> Self {
3574        self.interceptors
3575            .push(Arc::new(DefaultRtpReceiverNackHandler::new()));
3576        self
3577    }
3578
3579    pub fn interceptor(mut self, interceptor: Arc<dyn RtpReceiverInterceptor>) -> Self {
3580        self.interceptors.push(interceptor);
3581        self
3582    }
3583
3584    pub fn build(self) -> Arc<RtpReceiver> {
3585        let media_kind = match self.kind {
3586            MediaKind::Audio => crate::media::frame::MediaKind::Audio,
3587            MediaKind::Video => crate::media::frame::MediaKind::Video,
3588            _ => crate::media::frame::MediaKind::Audio,
3589        };
3590        let (source, track, feedback_rx) = sample_track(media_kind, 100);
3591
3592        let params = match self.kind {
3593            MediaKind::Audio => RtpCodecParameters {
3594                payload_type: 111,
3595                clock_rate: 48000,
3596                channels: 2,
3597            },
3598            MediaKind::Video => RtpCodecParameters {
3599                payload_type: 96,
3600                clock_rate: 90000,
3601                channels: 0,
3602            },
3603            _ => RtpCodecParameters::default(),
3604        };
3605
3606        Arc::new(RtpReceiver {
3607            track,
3608            source: Arc::new(source),
3609            ssrc: Mutex::new(self.ssrc),
3610            params: Mutex::new(params),
3611            transport: Mutex::new(None),
3612            packet_tx: Mutex::new(None),
3613            rtcp_feedback_ssrc: Mutex::new(None),
3614            rtx_ssrc: Mutex::new(None),
3615            fir_seq: AtomicU8::new(0),
3616            feedback_rx: Arc::new(tokio::sync::Mutex::new(feedback_rx)),
3617            simulcast_tracks: Mutex::new(HashMap::new()),
3618            runner_tx: Mutex::new(None),
3619            interceptors: self.interceptors,
3620            track_ready_event_tx: Mutex::new(None),
3621            track_ready_transceiver: Mutex::new(None),
3622            track_event_sent: AtomicBool::new(false),
3623            depacketizer_factory: self.depacketizer_factory.unwrap_or_else(|| {
3624                Arc::new(crate::media::depacketizer::DefaultDepacketizerFactory)
3625            }),
3626        })
3627    }
3628}
3629
3630impl RtpReceiver {
3631    pub fn new(
3632        kind: MediaKind,
3633        ssrc: u32,
3634        interceptors: Vec<Arc<dyn RtpReceiverInterceptor>>,
3635    ) -> Self {
3636        let media_kind = match kind {
3637            MediaKind::Audio => crate::media::frame::MediaKind::Audio,
3638            MediaKind::Video => crate::media::frame::MediaKind::Video,
3639            _ => crate::media::frame::MediaKind::Audio, // Fallback or panic
3640        };
3641        let (source, track, feedback_rx) = sample_track(media_kind, 100);
3642
3643        let params = match kind {
3644            MediaKind::Audio => RtpCodecParameters {
3645                payload_type: 111,
3646                clock_rate: 48000,
3647                channels: 2,
3648            },
3649            MediaKind::Video => RtpCodecParameters {
3650                payload_type: 96,
3651                clock_rate: 90000,
3652                channels: 0,
3653            },
3654            _ => RtpCodecParameters::default(),
3655        };
3656
3657        Self {
3658            track,
3659            source: Arc::new(source),
3660            ssrc: Mutex::new(ssrc),
3661            params: Mutex::new(params),
3662            transport: Mutex::new(None),
3663            packet_tx: Mutex::new(None),
3664            rtcp_feedback_ssrc: Mutex::new(None),
3665            rtx_ssrc: Mutex::new(None),
3666            fir_seq: AtomicU8::new(0),
3667            feedback_rx: Arc::new(tokio::sync::Mutex::new(feedback_rx)),
3668            simulcast_tracks: Mutex::new(HashMap::new()),
3669            runner_tx: Mutex::new(None),
3670            interceptors,
3671            track_ready_event_tx: Mutex::new(None),
3672            track_ready_transceiver: Mutex::new(None),
3673            track_event_sent: AtomicBool::new(false),
3674            depacketizer_factory: Arc::new(crate::media::depacketizer::DefaultDepacketizerFactory),
3675        }
3676    }
3677
3678    pub fn add_simulcast_track(self: &Arc<Self>, rid: String) -> Arc<SampleStreamTrack> {
3679        let (source, track, feedback_rx) = sample_track(self.track.kind(), 100);
3680        let source = Arc::new(source);
3681        let feedback_rx = Arc::new(tokio::sync::Mutex::new(feedback_rx));
3682        let simulcast_ssrc = Arc::new(Mutex::new(None));
3683
3684        // If runner is active, send command
3685        let runner_tx = self.runner_tx.lock().unwrap().clone();
3686        if let Some(tx) = runner_tx {
3687            let transport = self.transport.lock().unwrap().clone();
3688            if let Some(transport) = transport {
3689                let (packet_tx, packet_rx) = mpsc::channel(100);
3690                transport.register_rid_listener(rid.clone(), packet_tx);
3691
3692                let cmd = ReceiverCommand::AddTrack {
3693                    rid: Some(rid.clone()),
3694                    packet_rx,
3695                    feedback_rx: feedback_rx.clone(),
3696                    source: source.clone(),
3697                    simulcast_ssrc: simulcast_ssrc.clone(),
3698                };
3699                let _ = tx.send(cmd);
3700            }
3701        }
3702
3703        self.simulcast_tracks
3704            .lock()
3705            .unwrap()
3706            .insert(rid, (source, track.clone(), feedback_rx, simulcast_ssrc));
3707
3708        track
3709    }
3710
3711    pub fn track(&self) -> Arc<SampleStreamTrack> {
3712        self.track.clone()
3713    }
3714
3715    pub fn nack_handler(&self) -> Option<Arc<dyn NackStats>> {
3716        for i in &self.interceptors {
3717            if let Some(stats) = i.clone().as_nack_stats() {
3718                return Some(stats);
3719            }
3720        }
3721        None
3722    }
3723
3724    pub fn simulcast_track(&self, rid: &str) -> Option<Arc<SampleStreamTrack>> {
3725        let tracks = self.simulcast_tracks.lock().unwrap();
3726        tracks.get(rid).map(|(_, track, _, _)| track.clone())
3727    }
3728
3729    pub fn get_simulcast_rids(&self) -> Vec<String> {
3730        let tracks = self.simulcast_tracks.lock().unwrap();
3731        tracks.keys().cloned().collect()
3732    }
3733
3734    pub fn set_params(&self, params: RtpCodecParameters) {
3735        *self.params.lock().unwrap() = params;
3736    }
3737
3738    pub fn ssrc(&self) -> u32 {
3739        *self.ssrc.lock().unwrap()
3740    }
3741
3742    pub fn packet_tx(&self) -> Option<mpsc::Sender<(crate::rtp::RtpPacket, std::net::SocketAddr)>> {
3743        self.packet_tx.lock().unwrap().clone()
3744    }
3745
3746    pub fn rtx_ssrc(&self) -> Option<u32> {
3747        *self.rtx_ssrc.lock().unwrap()
3748    }
3749
3750    pub fn set_ssrc(&self, ssrc: u32) {
3751        *self.ssrc.lock().unwrap() = ssrc;
3752        let transport = self.transport.lock().unwrap().clone();
3753        let packet_tx = self.packet_tx.lock().unwrap().clone();
3754
3755        if let Some(transport) = transport
3756            && let Some(tx) = packet_tx
3757        {
3758            transport.register_listener_sync(ssrc, tx);
3759        }
3760    }
3761
3762    pub fn ensure_provisional_listener(&self) {
3763        let transport = self.transport.lock().unwrap().clone();
3764        let packet_tx = self.packet_tx.lock().unwrap().clone();
3765
3766        if let Some(transport) = transport
3767            && let Some(tx) = packet_tx
3768        {
3769            transport.register_provisional_listener(tx);
3770        }
3771    }
3772
3773    pub fn set_rtx_ssrc(&self, ssrc: u32) {
3774        *self.rtx_ssrc.lock().unwrap() = Some(ssrc);
3775    }
3776
3777    pub fn set_transport(
3778        self: &Arc<Self>,
3779        transport: Arc<RtpTransport>,
3780        event_tx: Option<mpsc::UnboundedSender<PeerConnectionEvent>>,
3781        transceiver: Option<Weak<RtpTransceiver>>,
3782    ) {
3783        *self.transport.lock().unwrap() = Some(transport.clone());
3784        *self.track_ready_event_tx.lock().unwrap() = event_tx;
3785        *self.track_ready_transceiver.lock().unwrap() = transceiver;
3786
3787        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
3788        *self.runner_tx.lock().unwrap() = Some(cmd_tx);
3789
3790        let mut initial_tracks = Vec::new();
3791
3792        // Main track
3793        let (tx, rx) = mpsc::channel(2000);
3794        let ssrc = *self.ssrc.lock().unwrap();
3795        transport.register_listener_sync(ssrc, tx.clone());
3796        transport.register_provisional_listener(tx.clone());
3797
3798        // Also register current payload type as a fallback
3799        let pt = self.params.lock().unwrap().payload_type;
3800        transport.register_pt_listener(pt, tx.clone());
3801
3802        *self.packet_tx.lock().unwrap() = Some(tx);
3803
3804        initial_tracks.push(ReceiverCommand::AddTrack {
3805            rid: None,
3806            packet_rx: rx,
3807            feedback_rx: self.feedback_rx.clone(),
3808            source: self.source.clone(),
3809            simulcast_ssrc: Arc::new(Mutex::new(None)),
3810        });
3811
3812        // Simulcast tracks
3813        let tracks_guard = self.simulcast_tracks.lock().unwrap();
3814        for (rid, (source, _, feedback_rx, simulcast_ssrc)) in tracks_guard.iter() {
3815            let (tx, rx) = mpsc::channel(2000);
3816            transport.register_rid_listener(rid.clone(), tx);
3817            initial_tracks.push(ReceiverCommand::AddTrack {
3818                rid: Some(rid.clone()),
3819                packet_rx: rx,
3820                feedback_rx: feedback_rx.clone(),
3821                source: source.clone(),
3822                simulcast_ssrc: simulcast_ssrc.clone(),
3823            });
3824        }
3825        drop(tracks_guard);
3826
3827        let weak_self = Arc::downgrade(self);
3828        tokio::spawn(async move {
3829            Self::run_loop(weak_self, cmd_rx, initial_tracks).await;
3830        });
3831    }
3832
3833    async fn run_loop(
3834        weak_self: Weak<Self>,
3835        mut cmd_rx: mpsc::UnboundedReceiver<ReceiverCommand>,
3836        initial_tracks: Vec<ReceiverCommand>,
3837    ) {
3838        let depacketizer_factory = if let Some(receiver) = weak_self.upgrade() {
3839            receiver.depacketizer_factory.clone()
3840        } else {
3841            Arc::new(crate::media::depacketizer::DefaultDepacketizerFactory)
3842        };
3843
3844        let mut futures = FuturesUnordered::new();
3845        let mut tracks = HashMap::new();
3846
3847        fn handle_add_track(
3848            cmd: ReceiverCommand,
3849            futures: &mut FuturesUnordered<Pin<Box<dyn Future<Output = LoopEvent> + Send>>>,
3850            tracks: &mut HashMap<
3851                Option<String>,
3852                (
3853                    Arc<crate::media::track::SampleStreamSource>,
3854                    Arc<Mutex<Option<u32>>>,
3855                    Arc<tokio::sync::Mutex<mpsc::Receiver<crate::media::track::FeedbackEvent>>>,
3856                ),
3857            >,
3858            depacketizer_factory: &Arc<dyn DepacketizerFactory>,
3859        ) {
3860            let ReceiverCommand::AddTrack {
3861                rid,
3862                packet_rx,
3863                feedback_rx,
3864                source,
3865                simulcast_ssrc,
3866            } = cmd;
3867
3868            tracks.insert(
3869                rid.clone(),
3870                (source.clone(), simulcast_ssrc, feedback_rx.clone()),
3871            );
3872
3873            let rid_clone = rid.clone();
3874            // Initialize depacketizer
3875            let depacketizer = depacketizer_factory.create(source.kind());
3876
3877            futures.push(Box::pin(async move {
3878                let mut rx = packet_rx;
3879                let packet = rx.recv().await;
3880                LoopEvent::Packet(packet, rid_clone, rx, depacketizer)
3881            }));
3882
3883            let rid_clone = rid.clone();
3884            futures.push(Box::pin(async move {
3885                let event = {
3886                    let mut lock = feedback_rx.lock().await;
3887                    lock.recv().await
3888                };
3889                LoopEvent::Feedback(event, rid_clone)
3890            }));
3891        }
3892
3893        for cmd in initial_tracks {
3894            handle_add_track(cmd, &mut futures, &mut tracks, &depacketizer_factory);
3895        }
3896
3897        loop {
3898            tokio::select! {
3899                cmd = cmd_rx.recv() => {
3900                    match cmd {
3901                        Some(cmd) => handle_add_track(cmd, &mut futures, &mut tracks, &depacketizer_factory),
3902                        None => break,
3903                    }
3904                }
3905                event = futures.next(), if !futures.is_empty() => {
3906                    if let Some(event) = event {
3907                        match event {
3908                            LoopEvent::Packet(packet_opt, rid, packet_rx, mut depacketizer) => {
3909                                if let Some((packet, addr)) = packet_opt {
3910                                    if let Some((source, simulcast_ssrc, _)) = tracks.get(&rid) {
3911                                        if rid.is_some() {
3912                                            let mut s = simulcast_ssrc.lock().unwrap();
3913                                            if s.is_none() {
3914                                                *s = Some(packet.header.ssrc);
3915                                            }
3916                                        } else {
3917                                            // Main track: Update SSRC if it matched via provisional listener
3918                                            if let Some(this) = weak_self.upgrade() {
3919                                                let mut s = this.ssrc.lock().unwrap();
3920                                                let old_ssrc = *s;
3921                                                if old_ssrc != packet.header.ssrc {
3922                                                    debug!(
3923                                                        "RTP main track SSRC changed from {} to {}",
3924                                                        old_ssrc, packet.header.ssrc
3925                                                    );
3926                                                    *s = packet.header.ssrc;
3927
3928                                                    // Send Track event after SSRC latching (RTP mode)
3929                                                    // Only send if we're using provisional SSRC and haven't sent before
3930                                                    if old_ssrc >= 2000 && old_ssrc < 3000 {
3931                                                        // Use swap to atomically check and set the flag
3932                                                        if !this.track_event_sent.swap(true, Ordering::SeqCst) {
3933                                                            if let Some(ref event_tx) = *this.track_ready_event_tx.lock().unwrap() {
3934                                                                let transceiver = this.track_ready_transceiver.lock().unwrap();
3935                                                                if let Some(transceiver) = transceiver.as_ref().and_then(|t| t.upgrade()) {
3936                                                                    let _ = event_tx.send(PeerConnectionEvent::Track(transceiver.clone()));
3937                                                                    debug!("RTP mode: Sent Track event after SSRC latching complete");
3938                                                                }
3939                                                            }
3940                                                        }
3941                                                    }
3942                                                }
3943                                            }
3944                                        }
3945
3946                                        if let Some(this) = weak_self.upgrade() {
3947                                            for interceptor in &this.interceptors {
3948                                                if let Some(mut rtcp_packet) = interceptor.on_packet_received(&packet).await {
3949                                                    if let RtcpPacket::GenericNack(ref mut nack) = rtcp_packet {
3950                                                        let sender_ssrc = this.rtcp_feedback_ssrc.lock().unwrap().unwrap_or(0);
3951                                                        if sender_ssrc != 0 {
3952                                                            nack.sender_ssrc = sender_ssrc;
3953                                                        } else {
3954                                                            debug!("NACK: skipping sender_ssrc update because it is 0");
3955                                                        }
3956                                                    }
3957
3958                                                    let transport = this.transport.lock().unwrap().clone();
3959                                                    if let Some(transport) = transport {
3960                                                        let _ = transport.send_rtcp(&[rtcp_packet]).await;
3961                                                    }
3962                                                }
3963                                            }
3964
3965                                            let params = this.params.lock().unwrap().clone();
3966                                            let clock_rate = params.clock_rate;
3967
3968                                            // Fix: Use Depacketizer to handle frames correctly
3969                                            if let Ok(samples) = depacketizer.push(packet.clone(), clock_rate, addr, source.kind()) {
3970                                                for sample in samples {
3971                                                    if let Err(e) = source.send(sample).await {
3972                                                         tracing::warn!("Failed to send media sample: {}", e);
3973                                                    }
3974                                                }
3975                                            }
3976
3977                                            let rid_clone = rid.clone();
3978                                            futures.push(Box::pin(async move {
3979                                                let mut rx = packet_rx;
3980                                                let packet = rx.recv().await;
3981                                                LoopEvent::Packet(packet, rid_clone, rx, depacketizer)
3982                                            }));
3983                                        } else {
3984                                            break;
3985                                        }
3986                                    }
3987                                }
3988                            }
3989                            LoopEvent::Feedback(event_opt, rid) => {
3990                                if let Some(event) = event_opt {
3991                                    if let Some((_, simulcast_ssrc, feedback_rx)) = tracks.get(&rid) {
3992                                        if let Some(this) = weak_self.upgrade() {
3993                                            match event {
3994                                                crate::media::track::FeedbackEvent::RequestKeyFrame => {
3995                                                    let media_ssrc = if rid.is_some() {
3996                                                        *simulcast_ssrc.lock().unwrap()
3997                                                    } else {
3998                                                        Some(*this.ssrc.lock().unwrap())
3999                                                    };
4000
4001                                                    if let Some(ssrc) = media_ssrc {
4002                                                        let sender_ssrc = *this.rtcp_feedback_ssrc.lock().unwrap();
4003                                                        let pli = crate::rtp::PictureLossIndication {
4004                                                            sender_ssrc: sender_ssrc.unwrap_or(0),
4005                                                            media_ssrc: ssrc,
4006                                                        };
4007                                                        let packet = crate::rtp::RtcpPacket::PictureLossIndication(pli);
4008
4009                                                        let transport = this.transport.lock().unwrap().clone();
4010                                                        if let Some(transport) = transport {
4011                                                            if let Err(e) = transport.send_rtcp(&[packet]).await {
4012                                                                debug!("Failed to send PLI: {}", e);
4013                                                            }
4014                                                        }
4015                                                    }
4016                                                }
4017                                            }
4018
4019                                            let rid_clone = rid.clone();
4020                                            let feedback_rx = feedback_rx.clone();
4021                                            futures.push(Box::pin(async move {
4022                                                let event = {
4023                                                    let mut lock = feedback_rx.lock().await;
4024                                                    lock.recv().await
4025                                                };
4026                                                LoopEvent::Feedback(event, rid_clone)
4027                                            }));
4028                                        } else {
4029                                            break;
4030                                        }
4031                                    }
4032                                }
4033                            }
4034                        }
4035                    }
4036                }
4037            }
4038        }
4039    }
4040
4041    pub fn set_feedback_ssrc(&self, ssrc: u32) {
4042        *self.rtcp_feedback_ssrc.lock().unwrap() = Some(ssrc);
4043    }
4044
4045    pub async fn send_nack(&self, lost_packets: Vec<u16>) -> RtcResult<()> {
4046        let transport = self.transport.lock().unwrap().clone();
4047        if let Some(transport) = transport {
4048            let media_ssrc = *self.ssrc.lock().unwrap();
4049            let sender_ssrc = (*self.rtcp_feedback_ssrc.lock().unwrap()).unwrap_or(media_ssrc);
4050
4051            let nack = crate::rtp::GenericNack {
4052                sender_ssrc,
4053                media_ssrc,
4054                lost_packets,
4055            };
4056            let packet = RtcpPacket::GenericNack(nack);
4057            transport
4058                .send_rtcp(&[packet])
4059                .await
4060                .map_err(|e| RtcError::Internal(format!("Failed to send NACK: {}", e)))?;
4061            Ok(())
4062        } else {
4063            Err(RtcError::InvalidState("Transport not set".into()))
4064        }
4065    }
4066
4067    pub async fn request_key_frame(&self) -> RtcResult<()> {
4068        let transport = self.transport.lock().unwrap().clone();
4069        if let Some(transport) = transport {
4070            let media_ssrc = *self.ssrc.lock().unwrap();
4071            let sender_ssrc = (*self.rtcp_feedback_ssrc.lock().unwrap()).unwrap_or(media_ssrc);
4072
4073            // Try FIR
4074            let seq = self.fir_seq.fetch_add(1, Ordering::Relaxed);
4075            let fir = FullIntraRequest {
4076                sender_ssrc,
4077                requests: vec![FirRequest {
4078                    ssrc: media_ssrc,
4079                    sequence_number: seq,
4080                }],
4081            };
4082            let packet_fir = RtcpPacket::FullIntraRequest(fir);
4083
4084            let pli = PictureLossIndication {
4085                sender_ssrc,
4086                media_ssrc,
4087            };
4088            let packet_pli = RtcpPacket::PictureLossIndication(pli);
4089            transport
4090                .send_rtcp(&[packet_fir, packet_pli])
4091                .await
4092                .map_err(|e| RtcError::Internal(format!("Failed to send PLI: {}", e)))?;
4093            Ok(())
4094        } else {
4095            Err(RtcError::InvalidState("Transport not set".into()))
4096        }
4097    }
4098}
4099
4100#[cfg(test)]
4101mod tests {
4102    use super::*;
4103    use crate::transports::ice::IceTransportState;
4104    use crate::{Direction, MediaKind, RtcConfiguration};
4105
    /// Payload type the tests expect in the `formats` of an audio media section.
    const AUDIO_PAYLOAD_TYPE: u8 = 111;
    /// Payload type the tests expect in the `formats` of a video media section.
    const VIDEO_PAYLOAD_TYPE: u8 = 96;
    /// Format string the tests expect in an application (data channel) media section.
    const SCTP_FORMAT: &str = "webrtc-datachannel";
    /// Value the tests expect in the `sctp-port` attribute.
    const SCTP_PORT: u16 = 5000;
4110
4111    #[tokio::test]
4112    async fn create_offer_contains_transceiver() {
4113        let pc = PeerConnection::new(RtcConfiguration::default());
4114        let transceiver = pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4115
4116        // Add a sender so direction is not downgraded
4117        let (_, track, _) = sample_track(crate::media::frame::MediaKind::Audio, 48000);
4118        let params = RtpCodecParameters {
4119            payload_type: 111,
4120            clock_rate: 48000,
4121            channels: 2,
4122        };
4123        let sender = RtpSender::builder(track, 12345)
4124            .stream_id("stream".to_string())
4125            .params(params)
4126            .build();
4127        transceiver.set_sender(Some(sender));
4128
4129        // First create_offer triggers gathering
4130        let _ = pc.create_offer().await.unwrap();
4131
4132        // Wait for gathering to complete to ensure we have candidates and end-of-candidates
4133        pc.wait_for_gathering_complete().await;
4134
4135        // Create offer again to get the candidates
4136        let offer = pc.create_offer().await.unwrap();
4137
4138        assert_eq!(offer.media_sections.len(), 1);
4139        let section = &offer.media_sections[0];
4140        assert_eq!(section.kind, MediaKind::Audio);
4141        assert_eq!(section.direction, Direction::SendRecv);
4142        assert_eq!(section.formats, vec![AUDIO_PAYLOAD_TYPE.to_string()]);
4143        let attrs = &section.attributes;
4144        assert!(attrs.iter().any(|attr| attr.key == "ice-ufrag"));
4145        assert!(attrs.iter().any(|attr| attr.key == "ice-pwd"));
4146
4147        // Should have msid-semantic
4148        assert!(
4149            offer
4150                .session
4151                .attributes
4152                .iter()
4153                .any(|a| a.key == "msid-semantic")
4154        );
4155
4156        // Should have msid in media section
4157        assert!(attrs.iter().any(|a| a.key == "msid"));
4158
4159        // Should have ssrc in media section
4160        assert!(attrs.iter().any(|a| a.key == "ssrc"));
4161        assert!(attrs.iter().any(|attr| attr.key == "ice-options"));
4162        assert!(attrs.iter().any(|attr| attr.key == "end-of-candidates"));
4163        assert!(attrs.iter().filter(|attr| attr.key == "candidate").count() >= 1);
4164        assert!(attrs.iter().any(|attr| {
4165            attr.key == "rtpmap"
4166                && attr
4167                    .value
4168                    .as_deref()
4169                    .map(|v| v.contains("opus"))
4170                    .unwrap_or(false)
4171        }));
4172        assert!(attrs.iter().any(|attr| attr.key == "fingerprint"));
4173        assert!(attrs.iter().any(|attr| {
4174            attr.key == "setup"
4175                && attr
4176                    .value
4177                    .as_deref()
4178                    .map(|v| v == "actpass")
4179                    .unwrap_or(false)
4180        }));
4181        assert_eq!(pc.signaling_state(), SignalingState::Stable);
4182    }
4183
4184    #[tokio::test]
4185    async fn offer_includes_video_capabilities() {
4186        let pc = PeerConnection::new(RtcConfiguration::default());
4187        pc.add_transceiver(MediaKind::Video, TransceiverDirection::SendRecv);
4188        let offer = pc.create_offer().await.unwrap();
4189        let section = &offer.media_sections[0];
4190        assert_eq!(section.kind, MediaKind::Video);
4191        assert_eq!(section.formats, vec![VIDEO_PAYLOAD_TYPE.to_string()]);
4192        let attrs = &section.attributes;
4193        assert!(attrs.iter().any(|attr| attr.key == "rtcp-fb"));
4194        assert!(attrs.iter().any(|attr| {
4195            attr.key == "rtpmap"
4196                && attr
4197                    .value
4198                    .as_deref()
4199                    .map(|v| v.contains("VP8"))
4200                    .unwrap_or(false)
4201        }));
4202    }
4203
4204    #[tokio::test]
4205    async fn offer_includes_application_capabilities() {
4206        let pc = PeerConnection::new(RtcConfiguration::default());
4207        pc.add_transceiver(MediaKind::Application, TransceiverDirection::SendRecv);
4208        let offer = pc.create_offer().await.unwrap();
4209        let section = &offer.media_sections[0];
4210        assert_eq!(section.kind, MediaKind::Application);
4211        assert_eq!(section.protocol, "UDP/DTLS/SCTP");
4212        assert_eq!(section.formats, vec![SCTP_FORMAT.to_string()]);
4213        let attrs = &section.attributes;
4214        let expected_port = SCTP_PORT.to_string();
4215        assert!(attrs.iter().any(|attr| {
4216            attr.key == "sctp-port"
4217                && attr
4218                    .value
4219                    .as_deref()
4220                    .map(|v| v == expected_port)
4221                    .unwrap_or(false)
4222        }));
4223    }
4224
4225    #[tokio::test]
4226    async fn test_simulcast_setup() {
4227        use crate::{SdpType, SessionDescription};
4228        let pc = PeerConnection::new(RtcConfiguration::default());
4229
4230        // Create SDP with Simulcast
4231        // We need to include extmap for RID
4232        let sdp_str = "v=0\r\n\
4233                       o=- 123456 0 IN IP4 127.0.0.1\r\n\
4234                       s=-\r\n\
4235                       t=0 0\r\n\
4236                       a=extmap:3 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id\r\n\
4237                       c=IN IP4 127.0.0.1\r\n\
4238                       m=video 9 RTP/SAVPF 96\r\n\
4239                       a=rtpmap:96 VP8/90000\r\n\
4240                       a=rid:hi send\r\n\
4241                       a=rid:mid send\r\n\
4242                       a=rid:lo send\r\n\
4243                       a=simulcast:send hi;mid;lo\r\n";
4244
4245        let desc = SessionDescription::parse(SdpType::Offer, sdp_str).unwrap();
4246        pc.set_remote_description(desc).await.unwrap();
4247
4248        let transceivers = pc.inner.transceivers.lock().unwrap();
4249        assert_eq!(transceivers.len(), 1);
4250        let t = &transceivers[0];
4251        let rx = t.receiver.lock().unwrap().as_ref().unwrap().clone();
4252
4253        // Check simulcast tracks
4254        let simulcast_tracks = rx.simulcast_tracks.lock().unwrap();
4255        assert!(simulcast_tracks.contains_key("hi"));
4256        assert!(simulcast_tracks.contains_key("mid"));
4257        assert!(simulcast_tracks.contains_key("lo"));
4258        assert_eq!(simulcast_tracks.len(), 3);
4259    }
4260
4261    #[tokio::test]
4262    async fn test_rtcp_mux_detection() {
4263        use crate::{SdpType, SessionDescription, TransportMode};
4264        // Setup PC in RTP mode
4265        let mut config = RtcConfiguration::default();
4266        config.transport_mode = TransportMode::Rtp;
4267        let pc = PeerConnection::new(config);
4268
4269        // Create SDP without rtcp-mux
4270        let sdp_str = "v=0\r\n\
4271                       o=- 123456 0 IN IP4 127.0.0.1\r\n\
4272                       s=-\r\n\
4273                       t=0 0\r\n\
4274                       c=IN IP4 127.0.0.1\r\n\
4275                       m=audio 4000 RTP/AVP 111\r\n\
4276                       a=rtpmap:111 opus/48000/2\r\n";
4277        let desc = SessionDescription::parse(SdpType::Offer, sdp_str).unwrap();
4278
4279        pc.set_remote_description(desc).await.unwrap();
4280
4281        // Wait for connection
4282        let mut state_rx = pc.subscribe_peer_state();
4283        loop {
4284            if *state_rx.borrow() == PeerConnectionState::Connected {
4285                break;
4286            }
4287            state_rx.changed().await.unwrap();
4288        }
4289
4290        // Now check IceConn
4291        let rtp_transport = pc.inner.rtp_transport.lock().unwrap().clone().unwrap();
4292        let ice_conn = rtp_transport.ice_conn();
4293        let rtcp_addr = *ice_conn.remote_rtcp_addr.read().unwrap();
4294
4295        assert!(rtcp_addr.is_some());
4296        assert_eq!(rtcp_addr.unwrap().port(), 4001);
4297    }
4298
4299    #[tokio::test]
4300    async fn test_rtcp_mux_enabled() {
4301        use crate::{SdpType, SessionDescription, TransportMode};
4302        // Setup PC in RTP mode
4303        let mut config = RtcConfiguration::default();
4304        config.transport_mode = TransportMode::Rtp;
4305        let pc = PeerConnection::new(config);
4306
4307        // Create SDP WITH rtcp-mux
4308        let sdp_str = "v=0\r\n\
4309                       o=- 123456 0 IN IP4 127.0.0.1\r\n\
4310                       s=-\r\n\
4311                       t=0 0\r\n\
4312                       c=IN IP4 127.0.0.1\r\n\
4313                       m=audio 4000 RTP/AVP 111\r\n\
4314                       a=rtcp-mux\r\n\
4315                       a=rtpmap:111 opus/48000/2\r\n";
4316        let desc = SessionDescription::parse(SdpType::Offer, sdp_str).unwrap();
4317
4318        pc.set_remote_description(desc).await.unwrap();
4319
4320        let mut state_rx = pc.subscribe_peer_state();
4321        loop {
4322            if *state_rx.borrow() == PeerConnectionState::Connected {
4323                break;
4324            }
4325            state_rx.changed().await.unwrap();
4326        }
4327
4328        let rtp_transport = pc.inner.rtp_transport.lock().unwrap().clone().unwrap();
4329        let ice_conn = rtp_transport.ice_conn();
4330        let rtcp_addr = *ice_conn.remote_rtcp_addr.read().unwrap();
4331
4332        assert!(rtcp_addr.is_none());
4333    }
4334
4335    #[tokio::test]
4336    async fn set_local_description_transitions_state() {
4337        let pc = PeerConnection::new(RtcConfiguration::default());
4338        pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4339        let offer = pc.create_offer().await.unwrap();
4340        pc.set_local_description(offer.clone()).unwrap();
4341        assert_eq!(pc.signaling_state(), SignalingState::HaveLocalOffer);
4342
4343        let mut answer = offer.clone();
4344        answer.sdp_type = SdpType::Answer;
4345        pc.set_remote_description(answer).await.unwrap();
4346        assert_eq!(pc.signaling_state(), SignalingState::Stable);
4347    }
4348
4349    #[tokio::test]
4350    async fn create_answer_requires_remote_offer() {
4351        let pc = PeerConnection::new(RtcConfiguration::default());
4352        pc.add_transceiver(MediaKind::Video, TransceiverDirection::SendOnly);
4353        let err = pc.create_answer().await.unwrap_err();
4354        assert!(matches!(err, RtcError::InvalidState(_)));
4355
4356        let offer = pc.create_offer().await.unwrap();
4357        pc.set_remote_description(offer.clone()).await.unwrap();
4358        let answer = pc.create_answer().await.unwrap();
4359        assert_eq!(answer.media_sections.len(), 1);
4360        assert_eq!(answer.media_sections[0].direction, Direction::RecvOnly);
4361        pc.set_local_description(answer).unwrap();
4362        assert_eq!(pc.signaling_state(), SignalingState::Stable);
4363    }
4364
4365    #[tokio::test]
4366    async fn remote_answer_without_local_offer_is_error() {
4367        let pc = PeerConnection::new(RtcConfiguration::default());
4368        pc.add_transceiver(MediaKind::Audio, TransceiverDirection::RecvOnly);
4369        let mut fake_answer = pc.create_offer().await.unwrap();
4370        fake_answer.sdp_type = SdpType::Answer;
4371        let err = pc.set_remote_description(fake_answer).await.unwrap_err();
4372        assert!(matches!(err, RtcError::InvalidState(_)));
4373    }
4374
4375    #[tokio::test]
4376    async fn peer_connection_exposes_ice_transport() {
4377        let pc = PeerConnection::new(RtcConfiguration::default());
4378        let ice = pc.ice_transport();
4379        assert_eq!(ice.state(), IceTransportState::New);
4380        assert_eq!(ice.config().ice_servers.len(), 0);
4381    }
4382
4383    #[tokio::test]
4384    async fn create_offer_rtp_mode() {
4385        use crate::TransportMode;
4386        let mut config = RtcConfiguration::default();
4387        config.transport_mode = TransportMode::Rtp;
4388        let pc = PeerConnection::new(config);
4389        let transceiver = pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4390
4391        // Add a sender so direction is not downgraded and RTP mode can advertise SSRC.
4392        let (_, track, _) = sample_track(crate::media::frame::MediaKind::Audio, 48000);
4393        let params = RtpCodecParameters {
4394            payload_type: 111,
4395            clock_rate: 48000,
4396            channels: 2,
4397        };
4398        let sender = RtpSender::builder(track, 12345)
4399            .stream_id("stream".to_string())
4400            .params(params)
4401            .build();
4402        transceiver.set_sender(Some(sender));
4403
4404        let offer = pc.create_offer().await.unwrap();
4405        let section = &offer.media_sections[0];
4406
4407        // Should NOT have ICE attributes
4408        assert!(!section.attributes.iter().any(|a| a.key == "ice-ufrag"));
4409        assert!(!section.attributes.iter().any(|a| a.key == "candidate"));
4410
4411        // Should NOT have DTLS fingerprint
4412        assert!(!section.attributes.iter().any(|a| a.key == "fingerprint"));
4413
4414        // Should NOT have msid-semantic
4415        assert!(
4416            !offer
4417                .session
4418                .attributes
4419                .iter()
4420                .any(|a| a.key == "msid-semantic")
4421        );
4422
4423        // Should NOT have msid in media section
4424        assert!(!section.attributes.iter().any(|a| a.key == "msid"));
4425
4426        // Should have ssrc in media section
4427        assert!(section.attributes.iter().any(|a| a.key == "ssrc"));
4428
4429        // Protocol should be RTP/AVP
4430        assert_eq!(section.protocol, "RTP/AVP");
4431    }
4432
4433    #[tokio::test]
4434    async fn create_offer_srtp_mode() {
4435        use crate::TransportMode;
4436        let mut config = RtcConfiguration::default();
4437        config.transport_mode = TransportMode::Srtp;
4438        let pc = PeerConnection::new(config);
4439        pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4440
4441        let offer = pc.create_offer().await.unwrap();
4442        let section = &offer.media_sections[0];
4443
4444        // Should NOT have ICE attributes
4445        assert!(!section.attributes.iter().any(|a| a.key == "ice-ufrag"));
4446        assert!(!section.attributes.iter().any(|a| a.key == "candidate"));
4447
4448        // Should have DTLS fingerprint
4449        assert!(section.attributes.iter().any(|a| a.key == "fingerprint"));
4450
4451        // Protocol should be UDP/TLS/RTP/SAVPF
4452        assert_eq!(section.protocol, "UDP/TLS/RTP/SAVPF");
4453    }
4454
4455    #[tokio::test]
4456    async fn test_ssrc_parsing_with_fid_group() {
4457        let _ = env_logger::builder().is_test(true).try_init();
4458        let pc = PeerConnection::new(RtcConfiguration::default());
4459
4460        // Mock SDP
4461        let sdp_str = "v=0\r\n\
4462o=- 123456 123456 IN IP4 127.0.0.1\r\n\
4463s=-\r\n\
4464t=0 0\r\n\
4465m=video 9 UDP/TLS/RTP/SAVPF 96\r\n\
4466c=IN IP4 127.0.0.1\r\n\
4467a=mid:0\r\n\
4468a=sendrecv\r\n\
4469a=rtpmap:96 VP8/90000\r\n\
4470a=ssrc:12345 cname:foo\r\n\
4471a=ssrc:67890 cname:foo\r\n\
4472a=ssrc-group:FID 12345 67890\r\n";
4473
4474        let sdp =
4475            crate::sdp::SessionDescription::parse(crate::sdp::SdpType::Offer, sdp_str).unwrap();
4476        pc.set_remote_description(sdp).await.unwrap();
4477
4478        let transceivers = pc.get_transceivers();
4479        assert_eq!(transceivers.len(), 1);
4480        let t = &transceivers[0];
4481        let receiver = t.receiver().unwrap();
4482
4483        assert_eq!(receiver.ssrc(), 12345);
4484        assert_eq!(receiver.rtx_ssrc(), Some(67890));
4485    }
4486
4487    #[tokio::test]
4488    async fn test_ssrc_parsing_with_fid_group_before_ssrc() {
4489        let _ = env_logger::builder().is_test(true).try_init();
4490        let pc = PeerConnection::new(RtcConfiguration::default());
4491
4492        // Mock SDP
4493        let sdp_str = "v=0\r\n\
4494o=- 123456 123456 IN IP4 127.0.0.1\r\n\
4495s=-\r\n\
4496t=0 0\r\n\
4497m=video 9 UDP/TLS/RTP/SAVPF 96\r\n\
4498c=IN IP4 127.0.0.1\r\n\
4499a=mid:0\r\n\
4500a=sendrecv\r\n\
4501a=rtpmap:96 VP8/90000\r\n\
4502a=ssrc-group:FID 12345 67890\r\n\
4503a=ssrc:12345 cname:foo\r\n\
4504a=ssrc:67890 cname:foo\r\n";
4505
4506        let sdp =
4507            crate::sdp::SessionDescription::parse(crate::sdp::SdpType::Offer, sdp_str).unwrap();
4508        pc.set_remote_description(sdp).await.unwrap();
4509
4510        let transceivers = pc.get_transceivers();
4511        assert_eq!(transceivers.len(), 1);
4512        let t = &transceivers[0];
4513        let receiver = t.receiver().unwrap();
4514
4515        assert_eq!(receiver.ssrc(), 12345);
4516        assert_eq!(receiver.rtx_ssrc(), Some(67890));
4517    }
4518
4519    #[tokio::test]
4520    async fn test_ssrc_parsing_rtx_first_group_last() {
4521        let _ = env_logger::builder().is_test(true).try_init();
4522        let pc = PeerConnection::new(RtcConfiguration::default());
4523
4524        // Mock SDP: RTX (67890) comes before Primary (12345), and Group is last.
4525        let sdp_str = "v=0\r\n\
4526o=- 123456 123456 IN IP4 127.0.0.1\r\n\
4527s=-\r\n\
4528t=0 0\r\n\
4529m=video 9 UDP/TLS/RTP/SAVPF 96\r\n\
4530c=IN IP4 127.0.0.1\r\n\
4531a=mid:0\r\n\
4532a=sendrecv\r\n\
4533a=rtpmap:96 VP8/90000\r\n\
4534a=ssrc:67890 cname:foo\r\n\
4535a=ssrc:12345 cname:foo\r\n\
4536a=ssrc-group:FID 12345 67890\r\n";
4537
4538        let sdp =
4539            crate::sdp::SessionDescription::parse(crate::sdp::SdpType::Offer, sdp_str).unwrap();
4540        pc.set_remote_description(sdp).await.unwrap();
4541
4542        let transceivers = pc.get_transceivers();
4543        assert_eq!(transceivers.len(), 1);
4544        let t = &transceivers[0];
4545        let receiver = t.receiver().unwrap();
4546
4547        println!("SSRC: {}", receiver.ssrc());
4548        println!("RTX SSRC: {:?}", receiver.rtx_ssrc());
4549        assert_eq!(receiver.ssrc(), 12345); // Should be Primary
4550        assert_eq!(receiver.rtx_ssrc(), Some(67890));
4551    }
4552
4553    #[test]
4554    fn test_sdes_key_generation_and_parsing() {
4555        let params = generate_sdes_key_params();
4556        assert!(params.starts_with("inline:"));
4557
4558        let key = parse_sdes_key_params(&params).expect("Failed to parse generated params");
4559        assert_eq!(key.len(), 30); // 30 bytes for AES_CM_128_HMAC_SHA1_80 (16 key + 14 salt)
4560
4561        // Test invalid params
4562        assert!(parse_sdes_key_params("invalid").is_err());
4563        assert!(parse_sdes_key_params("inline:invalid_base64").is_err());
4564    }
4565
4566    #[tokio::test]
4567    async fn create_offer_srtp_mode_includes_crypto() {
4568        use crate::TransportMode;
4569        let mut config = RtcConfiguration::default();
4570        config.transport_mode = TransportMode::Srtp;
4571        let pc = PeerConnection::new(config);
4572        pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendRecv);
4573
4574        let offer = pc.create_offer().await.unwrap();
4575        let section = &offer.media_sections[0];
4576
4577        // Should have crypto attribute
4578        let crypto = section.attributes.iter().find(|a| a.key == "crypto");
4579        assert!(crypto.is_some(), "Missing crypto attribute in SRTP mode");
4580
4581        let crypto_val = crypto.unwrap().value.as_ref().unwrap();
4582        assert!(crypto_val.starts_with("1 AES_CM_128_HMAC_SHA1_80 inline:"));
4583    }
4584
4585    #[tokio::test]
4586    async fn test_receiver_nack_handler() {
4587        use crate::rtp::RtpHeader;
4588        let handler = DefaultRtpReceiverNackHandler::new();
4589        let mut header = RtpHeader::new(96, 100, 0, 1234);
4590        let packet1 = RtpPacket::new(header.clone(), vec![1, 2, 3]);
4591
4592        // First packet initializes
4593        assert!(handler.on_packet_received(&packet1).await.is_none());
4594
4595        // Consecutive packet
4596        header.sequence_number = 101;
4597        let packet2 = RtpPacket::new(header.clone(), vec![4, 5, 6]);
4598        assert!(handler.on_packet_received(&packet2).await.is_none());
4599
4600        // Gap detected (102 missing)
4601        header.sequence_number = 103;
4602        let packet3 = RtpPacket::new(header.clone(), vec![7, 8, 9]);
4603        let res = handler
4604            .on_packet_received(&packet3)
4605            .await
4606            .expect("Should generate NACK");
4607        if let RtcpPacket::GenericNack(nack) = res {
4608            assert_eq!(nack.lost_packets, vec![102]);
4609            assert_eq!(nack.media_ssrc, 1234);
4610        } else {
4611            panic!("Expected GenericNack");
4612        }
4613
4614        // Multiple gap detected (104, 105 missing)
4615        header.sequence_number = 106;
4616        let packet4 = RtpPacket::new(header.clone(), vec![10]);
4617        let res = handler
4618            .on_packet_received(&packet4)
4619            .await
4620            .expect("Should generate NACK");
4621        if let RtcpPacket::GenericNack(nack) = res {
4622            assert_eq!(nack.lost_packets, vec![104, 105]);
4623        } else {
4624            panic!("Expected GenericNack");
4625        }
4626    }
4627
4628    #[tokio::test]
4629    async fn test_sender_nack_handler() {
4630        use crate::rtp::RtpHeader;
4631        use crate::transports::ice::conn::IceConn;
4632        use crate::transports::rtp::RtpTransport;
4633        use std::net::{Ipv4Addr, SocketAddr};
4634
4635        let handler = DefaultRtpSenderNackHandler::new(10);
4636        let mut header = RtpHeader::new(96, 100, 0, 1234);
4637        let packet1 = RtpPacket::new(header.clone(), vec![1, 2, 3]);
4638
4639        handler.on_packet_sent(&packet1).await;
4640
4641        // Mock transport (we just need it to not crash, though it won't actually send)
4642        let (_, socket_rx) = tokio::sync::watch::channel(None);
4643        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
4644        let ice_conn = IceConn::new(socket_rx, addr);
4645        let transport = Arc::new(RtpTransport::new(ice_conn, false));
4646
4647        let nack = GenericNack {
4648            sender_ssrc: 0,
4649            media_ssrc: 1234,
4650            lost_packets: vec![100],
4651        };
4652
4653        // This will retransmit
4654        handler
4655            .on_rtcp_received(&RtcpPacket::GenericNack(nack), transport)
4656            .await;
4657
4658        // Buffer overflow test
4659        for i in 101..115 {
4660            header.sequence_number = i;
4661            handler
4662                .on_packet_sent(&RtpPacket::new(header.clone(), vec![0]))
4663                .await;
4664        }
4665
4666        // Packet 100 should be gone now (buffer size 10, we sent 14 more)
4667        let nack_old = GenericNack {
4668            sender_ssrc: 0,
4669            media_ssrc: 1234,
4670            lost_packets: vec![100],
4671        };
4672
4673        // We can't easily check if it was sent without a mock transport that records sends,
4674        // but we can at least verify it doesn't panic and the logic runs.
4675        let (_, socket_rx2) = tokio::sync::watch::channel(None);
4676        let ice_conn2 = IceConn::new(socket_rx2, addr);
4677        let transport2 = Arc::new(RtpTransport::new(ice_conn2, false));
4678        handler
4679            .on_rtcp_received(&RtcpPacket::GenericNack(nack_old), transport2)
4680            .await;
4681    }
4682
4683    #[tokio::test]
4684    async fn test_nack_configuration() {
4685        let mut config = RtcConfiguration::default();
4686        config.nack_buffer_size = 200;
4687
4688        let pc = PeerConnection::new(config);
4689        let transceiver = pc.add_transceiver(MediaKind::Video, TransceiverDirection::SendRecv);
4690
4691        // Check receiver has handler
4692        let receiver = transceiver.receiver().unwrap();
4693        assert!(receiver.nack_handler().is_some());
4694
4695        // Check sender has handler
4696        let (_, track, _) = sample_track(crate::media::frame::MediaKind::Video, 90000);
4697        let sender = pc
4698            .add_track_with_stream_id(track, "stream1".to_string(), RtpCodecParameters::default())
4699            .unwrap();
4700        assert!(sender.nack_handler().is_some());
4701    }
4702
4703    #[tokio::test]
4704    async fn rtp_mode_sends_track_event_after_ssrc_latching() {
4705        // Test that in RTP mode, Track event is sent after SSRC latching
4706        let mut config = RtcConfiguration::default();
4707        config.transport_mode = TransportMode::Rtp;
4708
4709        let pc = PeerConnection::new(config);
4710
4711        // Add a transceiver (simulating SIP call setup)
4712        let transceiver = pc.add_transceiver(MediaKind::Audio, TransceiverDirection::RecvOnly);
4713
4714        // Create remote SDP offer (simulating SIP INVITE with SDP)
4715        let remote_sdp = "\
4716v=0
4717o=- 12345 12345 IN IP4 192.168.1.100
4718s=-
4719c=IN IP4 192.168.1.100
4720t=0 0
4721m=audio 9000 RTP/AVP 8
4722a=rtpmap:8 PCMA/8000
4723a=sendonly
4724a=mid:0
4725";
4726
4727        let remote_offer = SessionDescription::parse(SdpType::Offer, remote_sdp).unwrap();
4728        pc.set_remote_description(remote_offer).await.unwrap();
4729
4730        // Verify transceiver has receiver
4731        let receiver = transceiver.receiver().unwrap();
4732        let initial_ssrc = receiver.ssrc();
4733
4734        // In RTP mode, initial SSRC should be provisional (2000-2999 range)
4735        assert!(
4736            initial_ssrc >= 2000 && initial_ssrc < 3000,
4737            "Initial SSRC should be provisional, got {}",
4738            initial_ssrc
4739        );
4740
4741        println!(
4742            "✓ RTP mode test setup complete, initial provisional SSRC: {}",
4743            initial_ssrc
4744        );
4745        println!("✓ When real RTP packets arrive with actual SSRC, Track event will be sent");
4746        println!("✓ Track event sending logic is in place at SSRC latching point");
4747    }
4748
4749    #[tokio::test]
4750    async fn test_custom_depacketizer_strategy() {
4751        use crate::config::DepacketizerStrategy;
4752        use crate::media::depacketizer::{
4753            Depacketizer, DepacketizerFactory, PassThroughDepacketizer,
4754        };
4755        use crate::media::frame::MediaKind as FrameMediaKind;
4756
4757        #[derive(Debug)]
4758        struct MockFactory;
4759
4760        impl DepacketizerFactory for MockFactory {
4761            fn create(&self, _kind: FrameMediaKind) -> Box<dyn Depacketizer> {
4762                Box::new(PassThroughDepacketizer)
4763            }
4764        }
4765
4766        let factory: Arc<dyn DepacketizerFactory> = Arc::new(MockFactory);
4767        let mut config = RtcConfiguration::default();
4768        config.depacketizer_strategy = DepacketizerStrategy {
4769            factory: factory.clone(),
4770        };
4771
4772        let pc = PeerConnection::new(config);
4773
4774        let retrieved_config = pc.config();
4775        assert!(Arc::ptr_eq(
4776            &retrieved_config.depacketizer_strategy.factory,
4777            &factory
4778        ));
4779
4780        // Ensure adding transceiver works with custom strategy
4781        let transceiver = pc.add_transceiver(MediaKind::Video, TransceiverDirection::RecvOnly);
4782        assert_eq!(transceiver.kind(), MediaKind::Video);
4783    }
4784}