active_call/media/track/
rtp.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::Samples,
6    media::TrackId,
7    media::{
8        jitter::JitterBuffer,
9        negotiate::select_peer_media,
10        processor::ProcessorChain,
11        track::{Track, TrackConfig, TrackPacketSender},
12    },
13};
14use anyhow::Result;
15use async_trait::async_trait;
16use audio_codec::CodecType;
17use bytes::Bytes;
18use rsip::HostWithPort;
19use rsipstack::transport::{SipAddr, udp::UdpConnection};
20use std::{
21    io::Cursor,
22    net::{IpAddr, SocketAddr},
23    sync::{
24        Arc, Mutex,
25        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
26    },
27    time::Duration,
28};
29use tokio::{select, time::Instant, time::interval_at};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, info, trace, warn};
32use webrtc::{
33    rtcp::{
34        goodbye::Goodbye,
35        receiver_report::ReceiverReport,
36        reception_report::ReceptionReport,
37        sender_report::SenderReport,
38        source_description::{
39            SdesType, SourceDescription, SourceDescriptionChunk, SourceDescriptionItem,
40        },
41    },
42    rtp::{
43        codecs::g7xx::G7xxPayloader,
44        packet::Packet,
45        packetizer::{Packetizer, new_packetizer},
46        sequence::{Sequencer, new_random_sequencer},
47    },
48    sdp::{
49        MediaDescription, SessionDescription,
50        description::{
51            common::{Address, Attribute, ConnectionInformation},
52            media::{MediaName, RangedPort},
53            session::{
54                ATTR_KEY_RTCPMUX, ATTR_KEY_SEND_ONLY, ATTR_KEY_SEND_RECV, ATTR_KEY_SSRC, Origin,
55                TimeDescription, Timing,
56            },
57        },
58    },
59    util::{Marshal, Unmarshal},
60};
/// UDP MTU size, in bytes.
const RTP_MTU: usize = 1500;
/// Standard MTU size, in bytes; passed to the RTP packetizer as its MTU.
const RTP_OUTBOUND_MTU: usize = 1200;
/// RTCP sender report interval, in milliseconds (5 seconds).
const RTCP_SR_INTERVAL_MS: u64 = 5_000;
/// Default DTMF event duration, in milliseconds.
const DTMF_EVENT_DURATION_MS: u64 = 160;
/// Default volume for DTMF events (0-63).
const DTMF_EVENT_VOLUME: u8 = 10;
/// Require at least this many missing packets before resyncing.
const RTP_RESYNC_MIN_SKIP_PACKETS: u32 = 3;
/// Cooldown window (in frames) between resync attempts.
const RTP_RESYNC_COOLDOWN_FRAMES: u64 = 3;

// STUN constants for the ICE connectivity check.
/// STUN message type of a Binding Request.
const STUN_BINDING_REQUEST: u16 = 0x0001;
/// STUN message type of a Binding Response.
const STUN_BINDING_RESPONSE: u16 = 0x0101;
/// Magic cookie written into bytes 4..8 of the STUN header.
const STUN_MAGIC_COOKIE: u32 = 0x2112_A442;
/// Length of the STUN transaction id, in bytes.
const STUN_TRANSACTION_ID_SIZE: usize = 12;
74
/// Atomic send/receive counters for one RTP stream.
///
/// Every field is an `Arc`-wrapped atomic, so the counters can be updated
/// through a shared reference without additional locking.
#[derive(Default)]
struct RtpTrackStats {
    /// Outbound RTP timestamp, advanced by the samples of every sent packet.
    timestamp: Arc<AtomicU32>,
    /// Number of packets sent.
    packet_count: Arc<AtomicU32>,
    /// Number of bytes sent.
    octet_count: Arc<AtomicU32>,
    /// Time of the last timestamp update (written by the send paths).
    last_timestamp_update: Arc<AtomicU64>,
    /// Time of the last resync (not touched by the methods of this type).
    last_resync_ts: Arc<AtomicU64>,
    /// Number of packets received.
    received_packets: Arc<AtomicU32>,
    /// Number of payload bytes received.
    received_octets: Arc<AtomicU32>,
    /// Received packets plus packets counted as lost.
    expected_packets: Arc<AtomicU32>,
    /// Packets counted as lost from forward gaps in the sequence numbers.
    lost_packets: Arc<AtomicU32>,
    /// Most recent in-order sequence number.
    highest_seq_num: Arc<AtomicU32>,
    /// Sequence number of the first packet received.
    base_seq: Arc<AtomicU32>,
    /// Sequence number used as the reference for the next gap computation.
    last_receive_seq: Arc<AtomicU32>,
    /// Smoothed jitter value (see `update_receive_stats` for how it is derived).
    jitter: Arc<AtomicU32>,
    /// RTP timestamp carried by the last stored sender report.
    last_sr_timestamp: Arc<AtomicU64>,
    /// NTP timestamp carried by the last stored sender report.
    last_sr_ntp: Arc<AtomicU64>,
}

impl RtpTrackStats {
    /// Creates a statistics block with every counter set to zero.
    fn new() -> Self {
        Self::default()
    }

    /// Records one sent packet of `packet_len` bytes and advances the outbound
    /// timestamp by `samples_per_packet`.
    fn update_send_stats(&self, packet_len: u32, samples_per_packet: u32) {
        self.packet_count.fetch_add(1, Ordering::Relaxed);
        self.octet_count.fetch_add(packet_len, Ordering::Relaxed);
        self.timestamp
            .fetch_add(samples_per_packet, Ordering::Relaxed);
    }

    /// Records one received packet.
    ///
    /// Only the low 16 bits of `seq_num` take part in the gap computation, so
    /// a wrap from 65535 to 0 counts as a gap of one. Forward gaps larger than
    /// one add `gap - 1` to the loss counter; duplicates and packets that
    /// arrive behind the reference sequence number leave it untouched.
    fn update_receive_stats(&self, seq_num: u32, payload_len: u32) {
        let prev_received = self.received_packets.fetch_add(1, Ordering::Relaxed);
        // `fetch_add` wraps, so mirror that instead of panicking on overflow.
        let received = prev_received.wrapping_add(1);
        self.received_octets
            .fetch_add(payload_len, Ordering::Relaxed);

        if prev_received == 0 {
            // First packet: it defines the baseline of the stream.
            self.base_seq.store(seq_num, Ordering::Relaxed);
            self.last_receive_seq.store(seq_num, Ordering::Relaxed);
            self.highest_seq_num.store(seq_num, Ordering::Relaxed);
            self.lost_packets.store(0, Ordering::Relaxed);
            self.expected_packets.store(received, Ordering::Relaxed);
        } else {
            let last_seq = self.last_receive_seq.load(Ordering::Relaxed);
            let gap = u32::from((seq_num as u16).wrapping_sub(last_seq as u16));

            // A gap in 1..0x8000 is a forward step; anything else is a
            // duplicate or a late packet.
            if gap > 0 && gap < 0x8000 {
                if gap > 1 {
                    self.lost_packets.fetch_add(gap - 1, Ordering::Relaxed);
                }
                self.last_receive_seq.store(seq_num, Ordering::Relaxed);
                self.highest_seq_num.store(seq_num, Ordering::Relaxed);
            }

            let lost = self.lost_packets.load(Ordering::Relaxed);
            self.expected_packets
                .store(received.saturating_add(lost), Ordering::Relaxed);
        }

        // NOTE(review): this value is derived from the sequence number, not
        // from packet arrival times, so it is not the RFC 3550 interarrival
        // jitter. Computing that would need arrival/RTP timestamps, which this
        // method does not receive.
        let current_jitter = self.jitter.load(Ordering::Relaxed);
        let new_jitter = (current_jitter.wrapping_add(seq_num % 100)) / 2;
        self.jitter.store(new_jitter, Ordering::Relaxed);
    }

    /// Stores the RTP and NTP timestamps taken from a sender report.
    fn store_sr_info(&self, rtp_time: u64, ntp_time: u64) {
        self.last_sr_timestamp.store(rtp_time, Ordering::Relaxed);
        self.last_sr_ntp.store(ntp_time, Ordering::Relaxed);
    }

    /// Returns the lost/expected ratio scaled to 0..=255 (256 == everything lost).
    ///
    /// Returns 0 when no packet is expected yet. The ratio is computed in
    /// 64-bit arithmetic: `lost * 256` does not fit in a `u32` once more than
    /// 2^24 packets have been lost.
    fn get_fraction_lost(&self) -> u8 {
        let expected = u64::from(self.expected_packets.load(Ordering::Relaxed));
        if expected == 0 {
            return 0;
        }
        let lost = u64::from(self.lost_packets.load(Ordering::Relaxed));
        (lost * 256 / expected).min(255) as u8
    }
}
171
172pub struct RtpTrackBuilder {
173    cancel_token: Option<CancellationToken>,
174    track_id: TrackId,
175    config: TrackConfig,
176    local_addr: Option<IpAddr>,
177    external_addr: Option<IpAddr>,
178    rtp_socket: Option<UdpConnection>,
179    rtcp_socket: Option<UdpConnection>,
180    rtcp_mux: bool,
181    rtp_start_port: u16,
182    rtp_end_port: u16,
183    rtp_alloc_count: u32,
184    enabled_codecs: Vec<CodecType>,
185    ssrc_cname: String,
186    ssrc: u32,
187    ice_connectivity_check: bool,
188}
189pub struct RtpTrackInner {
190    dtmf_payload_type: u8,
191    payload_type: u8,
192    remote_description: Option<String>,
193    packetizer: Mutex<Option<Box<dyn Packetizer + Send + Sync>>>,
194    stats: Arc<RtpTrackStats>,
195    rtcp_mux: bool,
196    remote_addr: Option<SipAddr>,
197    remote_rtcp_addr: Option<SipAddr>,
198    enabled_codecs: Vec<CodecType>,
199    rtp_map: Vec<(u8, (CodecType, u32, u16))>,
200}
201
202pub struct RtpTrack {
203    ssrc: u32,
204    ssrc_cname: String,
205    track_id: TrackId,
206    config: TrackConfig,
207    cancel_token: CancellationToken,
208    processor_chain: ProcessorChain,
209    rtp_socket: UdpConnection,
210    rtcp_socket: UdpConnection,
211    encoder: TrackCodec,
212    sequencer: Box<dyn Sequencer + Send + Sync>,
213    sendrecv: AtomicBool,
214    ice_connectivity_check: bool,
215    inner: Arc<Mutex<RtpTrackInner>>,
216}
217
/// Kind of a packet, as distinguished by the track.
// NOTE(review): the classification logic is outside this view; the variant
// descriptions below follow the variant names — confirm against
// `classify_packet`.
enum PacketKind {
    /// An RTP packet.
    Rtp,
    /// An RTCP packet.
    Rtcp,
    /// A STUN packet; the payload is presumably the STUN message type.
    Stun(u16),
    /// A packet that is to be ignored.
    Ignore,
}
224impl RtpTrackBuilder {
225    pub fn new(track_id: TrackId, config: TrackConfig) -> Self {
226        let ssrc = rand::random::<u32>();
227        Self {
228            track_id,
229            config,
230            local_addr: None,
231            external_addr: None,
232            cancel_token: None,
233            rtp_socket: None,
234            rtcp_socket: None,
235            rtcp_mux: true,
236            rtp_start_port: 12000,
237            rtp_end_port: u16::MAX - 1,
238            rtp_alloc_count: 500,
239            enabled_codecs: vec![
240                #[cfg(feature = "opus")]
241                CodecType::Opus,
242                CodecType::G729,
243                CodecType::G722,
244                CodecType::PCMU,
245                CodecType::PCMA,
246                CodecType::TelephoneEvent,
247            ],
248            ssrc_cname: format!("rustpbx-{}", ssrc),
249            ssrc,
250            ice_connectivity_check: true, // Default enabled
251        }
252    }
253
254    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
255        self.ssrc = ssrc;
256        self.ssrc_cname = format!("rustpbx-{}", ssrc);
257        self
258    }
259
260    pub fn with_rtp_start_port(mut self, rtp_start_port: u16) -> Self {
261        self.rtp_start_port = rtp_start_port;
262        self
263    }
264    pub fn with_rtp_end_port(mut self, rtp_end_port: u16) -> Self {
265        self.rtp_end_port = rtp_end_port;
266        self
267    }
268    pub fn with_rtp_alloc_count(mut self, rtp_alloc_count: u32) -> Self {
269        self.rtp_alloc_count = rtp_alloc_count;
270        self
271    }
272    pub fn with_local_addr(mut self, local_addr: IpAddr) -> Self {
273        self.local_addr = Some(local_addr);
274        self
275    }
276
277    pub fn with_external_addr(mut self, external_addr: IpAddr) -> Self {
278        self.external_addr = Some(external_addr);
279        self
280    }
281
282    pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
283        self.cancel_token = Some(cancel_token);
284        self
285    }
286
287    pub fn with_rtp_socket(mut self, rtp_socket: UdpConnection) -> Self {
288        self.rtp_socket = Some(rtp_socket);
289        self
290    }
291    pub fn with_rtcp_socket(mut self, rtcp_socket: UdpConnection) -> Self {
292        self.rtcp_socket = Some(rtcp_socket);
293        self
294    }
295    pub fn with_rtcp_mux(mut self, rtcp_mux: bool) -> Self {
296        self.rtcp_mux = rtcp_mux;
297        self
298    }
299
300    pub fn with_enabled_codecs(mut self, enabled_codecs: Vec<CodecType>) -> Self {
301        self.enabled_codecs = enabled_codecs;
302        self
303    }
304    pub fn with_session_name(mut self, session_name: String) -> Self {
305        self.ssrc_cname = session_name;
306        self
307    }
308
309    pub fn with_ice_connectivity_check(mut self, enabled: bool) -> Self {
310        self.ice_connectivity_check = enabled;
311        self
312    }
313    pub async fn build_rtp_rtcp_conn(&self) -> Result<(UdpConnection, UdpConnection)> {
314        let addr = match self.local_addr {
315            Some(addr) => addr,
316            None => crate::net_tool::get_first_non_loopback_interface()?,
317        };
318        let mut rtp_conn = None;
319        let mut rtcp_conn = None;
320
321        for _ in 0..self.rtp_alloc_count {
322            let port = rand::random_range::<u16, _>(self.rtp_start_port..=self.rtp_end_port);
323            if port % 2 != 0 {
324                continue;
325            }
326            if let Ok(c) = UdpConnection::create_connection(
327                format!("{:?}:{}", addr, port).parse()?,
328                None,
329                self.cancel_token.clone(),
330            )
331            .await
332            {
333                if !self.rtcp_mux {
334                    // if rtcp mux is not enabled, we need to create a separate RTCP socket
335                    rtcp_conn = match UdpConnection::create_connection(
336                        format!("{:?}:{}", addr, port + 1).parse()?,
337                        None,
338                        self.cancel_token.clone(),
339                    )
340                    .await
341                    {
342                        Ok(c) => Some(c),
343                        Err(_) => {
344                            continue;
345                        }
346                    };
347                } else {
348                    rtcp_conn = Some(c.clone());
349                }
350                rtp_conn = Some(c);
351                break;
352            }
353        }
354
355        let mut rtp_conn = match rtp_conn {
356            Some(c) => c,
357            None => return Err(anyhow::anyhow!("failed to bind RTP socket")),
358        };
359        let mut rtcp_conn = match rtcp_conn {
360            Some(c) => c,
361            None => return Err(anyhow::anyhow!("failed to bind RTCP socket")),
362        };
363
364        if let Some(addr) = self.external_addr {
365            rtp_conn.external = Some(
366                SocketAddr::new(
367                    addr,
368                    *rtp_conn
369                        .get_addr()
370                        .addr
371                        .port
372                        .clone()
373                        .unwrap_or_default()
374                        .value(),
375                )
376                .into(),
377            );
378            rtcp_conn.external = Some(
379                SocketAddr::new(
380                    addr,
381                    *rtcp_conn
382                        .get_addr()
383                        .addr
384                        .port
385                        .clone()
386                        .unwrap_or_default()
387                        .value(),
388                )
389                .into(),
390            );
391        }
392        Ok((rtp_conn, rtcp_conn))
393    }
394
395    pub async fn build(mut self) -> Result<RtpTrack> {
396        let mut rtp_socket = self.rtp_socket.take();
397        let mut rtcp_socket = self.rtcp_socket.take();
398
399        if rtp_socket.is_none() || rtcp_socket.is_none() {
400            let (rtp_conn, rtcp_conn) = self.build_rtp_rtcp_conn().await?;
401            rtp_socket = Some(rtp_conn);
402            rtcp_socket = Some(rtcp_conn);
403        }
404        let cancel_token = self
405            .cancel_token
406            .unwrap_or_else(|| CancellationToken::new());
407        let processor_chain = ProcessorChain::new(self.config.samplerate);
408        let ssrc = if self.ssrc != 0 {
409            self.ssrc
410        } else {
411            loop {
412                let i = rand::random::<u32>();
413                if i % 2 == 0 {
414                    break i;
415                }
416            }
417        };
418        let inner = RtpTrackInner {
419            dtmf_payload_type: 101, // Default DTMF payload type
420            payload_type: 0,        // Will be set later based on remote description
421            remote_description: None,
422            packetizer: Mutex::new(None),
423            stats: Arc::new(RtpTrackStats::new()),
424            rtcp_mux: self.rtcp_mux,
425            remote_addr: None,
426            remote_rtcp_addr: None,
427            enabled_codecs: self.enabled_codecs.clone(),
428            rtp_map: vec![],
429        };
430        let track = RtpTrack {
431            ssrc,
432            ssrc_cname: self.ssrc_cname.clone(),
433            track_id: self.track_id,
434            config: self.config,
435            cancel_token,
436            processor_chain,
437            rtp_socket: rtp_socket.unwrap(),
438            rtcp_socket: rtcp_socket.unwrap(),
439            encoder: TrackCodec::new(),
440            sequencer: Box::new(new_random_sequencer()),
441            sendrecv: AtomicBool::new(true),
442            ice_connectivity_check: self.ice_connectivity_check,
443            inner: Arc::new(Mutex::new(inner)),
444        };
445        Ok(track)
446    }
447}
448
449impl RtpTrack {
450    pub fn id(&self) -> &str {
451        &self.track_id
452    }
453
454    pub fn ssrc(&self) -> u32 {
455        self.ssrc
456    }
457
458    pub fn remote_description(&self) -> Option<String> {
459        self.inner.lock().unwrap().remote_description.clone()
460    }
461
462    pub fn set_rtp_map(&self, rtp_map: Vec<(u8, (CodecType, u32, u16))>) {
463        if let Ok(mut inner) = self.inner.lock() {
464            inner.rtp_map = rtp_map;
465        }
466    }
467
468    pub fn set_remote_description(&self, answer: &str) -> Result<()> {
469        let mut inner = self.inner.lock().unwrap();
470        let mut reader = Cursor::new(answer);
471        let sdp = SessionDescription::unmarshal(&mut reader)?;
472        let peer_media = match select_peer_media(&sdp, "audio") {
473            Some(peer_media) => peer_media,
474            None => return Err(anyhow::anyhow!("no audio media in answer SDP")),
475        };
476
477        inner.rtp_map = peer_media.rtp_map.clone();
478
479        if peer_media.codecs.is_empty() {
480            return Err(anyhow::anyhow!("no audio codecs in answer SDP"));
481        }
482
483        if peer_media.rtp_addr.is_empty() {
484            return Err(anyhow::anyhow!("no rtp addr in answer SDP"));
485        }
486
487        inner.remote_description.replace(answer.to_string());
488
489        let remote_addr = SipAddr {
490            addr: HostWithPort {
491                host: peer_media.rtp_addr.parse()?,
492                port: Some(peer_media.rtp_port.into()),
493            },
494            r#type: Some(rsip::transport::Transport::Udp),
495        };
496        let remote_rtcp_addr = SipAddr {
497            addr: HostWithPort {
498                host: peer_media.rtcp_addr.parse()?,
499                port: Some(peer_media.rtcp_port.into()),
500            },
501            r#type: Some(rsip::transport::Transport::Udp),
502        };
503        let codec_type = peer_media.codecs[0];
504        info!(
505            track_id = self.track_id,
506            rtcp_mux = peer_media.rtcp_mux,
507            %remote_addr,
508            %remote_rtcp_addr,
509            ?codec_type,
510            ssrc = self.ssrc,
511            "set remote description"
512        );
513
514        inner.payload_type = codec_type.payload_type();
515        inner.enabled_codecs = vec![codec_type];
516        for (payload_type, (codec, clock_rate, _)) in peer_media.rtp_map.iter() {
517            if *codec == codec_type {
518                inner.payload_type = *payload_type;
519            }
520
521            if codec == &CodecType::TelephoneEvent && clock_rate == &codec_type.clock_rate() {
522                inner.dtmf_payload_type = *payload_type;
523            }
524        }
525
526        inner.remote_addr.replace(remote_addr);
527        inner.remote_rtcp_addr.replace(remote_rtcp_addr);
528        inner.rtcp_mux = peer_media.rtcp_mux;
529
530        let payloader = match codec_type {
531            #[cfg(feature = "opus")]
532            CodecType::Opus => Box::<webrtc::rtp::codecs::opus::OpusPayloader>::default()
533                as Box<dyn webrtc::rtp::packetizer::Payloader + Send + Sync>,
534            _ => Box::<G7xxPayloader>::default()
535                as Box<dyn webrtc::rtp::packetizer::Payloader + Send + Sync>,
536        };
537
538        inner
539            .packetizer
540            .lock()
541            .unwrap()
542            .replace(Box::new(new_packetizer(
543                RTP_OUTBOUND_MTU,
544                inner.payload_type,
545                self.ssrc,
546                payloader,
547                self.sequencer.clone(),
548                codec_type.clock_rate(),
549            )));
550        Ok(())
551    }
552
553    pub fn local_description(&self) -> Result<String> {
554        let socketaddr: SocketAddr = self.rtp_socket.get_addr().addr.to_owned().try_into()?;
555        let mut sdp = SessionDescription::default();
556
557        // Set session-level attributes
558        sdp.version = 0;
559        sdp.origin = Origin {
560            username: "-".to_string(),
561            session_id: 0,
562            session_version: 0,
563            network_type: "IN".to_string(),
564            address_type: "IP4".to_string(),
565            unicast_address: socketaddr.ip().to_string(),
566        };
567        sdp.session_name = "-".to_string();
568        sdp.connection_information = Some(ConnectionInformation {
569            address_type: "IP4".to_string(),
570            network_type: "IN".to_string(),
571            address: Some(Address {
572                address: socketaddr.ip().to_string(),
573                ttl: None,
574                range: None,
575            }),
576        });
577        sdp.time_descriptions.push(TimeDescription {
578            timing: Timing {
579                start_time: 0,
580                stop_time: 0,
581            },
582            repeat_times: vec![],
583        });
584
585        // Add media section
586        let mut media = MediaDescription::default();
587        media.media_name = MediaName {
588            media: "audio".to_string(),
589            port: RangedPort {
590                value: socketaddr.port() as isize,
591                range: None,
592            },
593            protos: vec!["RTP".to_string(), "AVP".to_string()],
594            formats: vec![],
595        };
596        let inner = self.inner.lock().unwrap();
597        for codec in inner.enabled_codecs.iter() {
598            if codec == &CodecType::TelephoneEvent {
599                continue;
600            }
601            // Try to find payload type from rtp_map (from caller's offer), otherwise use default
602            let mut payload_type = codec.payload_type();
603            for (payload_typ, (rtp_map_codec, _, _)) in inner.rtp_map.iter() {
604                if *rtp_map_codec == *codec {
605                    payload_type = *payload_typ;
606                    break;
607                }
608            }
609
610            media.media_name.formats.push(payload_type.to_string());
611            media.attributes.push(Attribute {
612                key: "rtpmap".to_string(),
613                value: Some(format!("{} {}", payload_type, codec.rtpmap())),
614            });
615            if let Some(fmtp) = codec.fmtp() {
616                media.attributes.push(Attribute {
617                    key: "fmtp".to_string(),
618                    value: Some(format!("{} {}", payload_type, fmtp)),
619                });
620            }
621        }
622
623        // Add telephone-event
624        // Creating an offer: add telephone-event if enabled_codecs have 8000 or 48000 clock rate
625        let has_8khz_codec = inner.enabled_codecs.iter().any(|c| c.clock_rate() == 8000);
626        let has_48khz_codec = inner.enabled_codecs.iter().any(|c| c.clock_rate() == 48000);
627
628        if has_8khz_codec {
629            // Add telephone-event at 8000 Hz (default payload type 101)
630            let mut payload_type = 101;
631            for (typ, (codec, clock_rate, _)) in inner.rtp_map.iter() {
632                if *codec == CodecType::TelephoneEvent && *clock_rate == 8000 {
633                    payload_type = *typ;
634                    break;
635                }
636            }
637            media.media_name.formats.push(payload_type.to_string());
638            media.attributes.push(Attribute {
639                key: "rtpmap".to_string(),
640                value: Some(format!("{} telephone-event/8000", payload_type)),
641            });
642            media.attributes.push(Attribute {
643                key: "fmtp".to_string(),
644                value: Some(format!("{} 0-16", payload_type)),
645            });
646        }
647
648        if has_48khz_codec {
649            let mut payload_type = 97;
650            for (typ, (codec, clock_rate, _)) in inner.rtp_map.iter() {
651                if *codec == CodecType::TelephoneEvent && *clock_rate == 48000 {
652                    payload_type = *typ;
653                    break;
654                }
655            }
656
657            media.media_name.formats.push(payload_type.to_string());
658            media.attributes.push(Attribute {
659                key: "rtpmap".to_string(),
660                value: Some(format!("{} telephone-event/48000", payload_type)),
661            });
662            media.attributes.push(Attribute {
663                key: "fmtp".to_string(),
664                value: Some(format!("{} 0-16", payload_type)),
665            });
666        }
667
668        // Add media-level attributes
669        if inner.rtcp_mux {
670            media.attributes.push(Attribute {
671                key: ATTR_KEY_RTCPMUX.to_string(),
672                value: None,
673            });
674        }
675        media.attributes.push(Attribute {
676            key: ATTR_KEY_SSRC.to_string(),
677            value: Some(if self.ssrc_cname.is_empty() {
678                self.ssrc.to_string()
679            } else {
680                format!("{} cname:{}", self.ssrc, self.ssrc_cname)
681            }),
682        });
683        if self.sendrecv.load(Ordering::Relaxed) {
684            media.attributes.push(Attribute {
685                key: ATTR_KEY_SEND_RECV.to_string(),
686                value: None,
687            });
688        } else {
689            media.attributes.push(Attribute {
690                key: ATTR_KEY_SEND_ONLY.to_string(),
691                value: None,
692            });
693        }
694        media.attributes.push(Attribute {
695            key: "ptime".to_string(),
696            value: Some(format!("{}", self.config.ptime.as_millis())),
697        });
698        sdp.media_descriptions.push(media);
699        Ok(sdp.marshal())
700    }
701
702    // Send DTMF tone using RFC 4733
703    pub async fn send_dtmf(&self, digit: &str, duration_ms: Option<u64>) -> Result<()> {
704        // Map DTMF digit to event code first (validate before checking remote address)
705        let event_code = match digit {
706            "0" => 0,
707            "1" => 1,
708            "2" => 2,
709            "3" => 3,
710            "4" => 4,
711            "5" => 5,
712            "6" => 6,
713            "7" => 7,
714            "8" => 8,
715            "9" => 9,
716            "*" => 10,
717            "#" => 11,
718            "A" => 12,
719            "B" => 13,
720            "C" => 14,
721            "D" => 15,
722            _ => return Err(anyhow::anyhow!("Invalid DTMF digit")),
723        };
724        let inner = self.inner.lock().unwrap();
725        let socket = &self.rtp_socket;
726        let remote_addr = match inner.remote_addr.as_ref() {
727            Some(addr) => addr.clone(),
728            None => return Err(anyhow::anyhow!("Remote address not set")),
729        };
730
731        // Use default duration if not specified
732        let duration = duration_ms.unwrap_or(DTMF_EVENT_DURATION_MS);
733
734        // Calculate number of packets to send
735        // We send one packet every 20ms (default packet time)
736        let num_packets = (duration as f64 / self.config.ptime.as_millis() as f64).ceil() as u32;
737
738        // Calculate samples per packet for timestamp increments
739        let samples_per_packet =
740            (self.config.samplerate as f64 * self.config.ptime.as_secs_f64()) as u32;
741
742        let now = crate::media::get_timestamp();
743        inner
744            .stats
745            .last_timestamp_update
746            .store(now, Ordering::Relaxed);
747
748        // Generate RFC 4733 DTMF events
749        for i in 0..num_packets {
750            let is_end = i == num_packets - 1;
751            let event_duration = i * (self.config.ptime.as_millis() as u32 * 8); // Duration in timestamp units
752
753            // Create DTMF event payload
754            // Format: |event(8)|E|R|Volume(6)|Duration(16)|
755            let mut payload = vec![0u8; 4];
756            payload[0] = event_code;
757            payload[1] = DTMF_EVENT_VOLUME & 0x3F; // Volume (0-63)
758            if is_end {
759                payload[1] |= 0x80; // Set end bit (E)
760            }
761
762            // Duration (16 bits, network byte order)
763            payload[2] = ((event_duration >> 8) & 0xFF) as u8;
764            payload[3] = (event_duration & 0xFF) as u8;
765
766            let packets = match inner.packetizer.lock().unwrap().as_mut() {
767                Some(p) => p.packetize(&Bytes::from_owner(payload), samples_per_packet)?,
768                None => return Err(anyhow::anyhow!("Packetizer not set")),
769            };
770            for mut packet in packets {
771                packet.header.payload_type = inner.dtmf_payload_type;
772                packet.header.marker = false;
773
774                match packet.marshal() {
775                    Ok(ref rtp_data) => {
776                        match socket.send_raw(rtp_data, &remote_addr).await {
777                            Ok(_) => {}
778                            Err(e) => {
779                                warn!("Failed to send DTMF RTP packet: {}", e);
780                            }
781                        }
782
783                        // Update counters for RTCP
784                        inner.stats.packet_count.fetch_add(1, Ordering::Relaxed);
785                        inner
786                            .stats
787                            .octet_count
788                            .fetch_add(rtp_data.len() as u32, Ordering::Relaxed);
789
790                        // Sleep for packet time if not the last packet
791                        if !is_end {
792                            tokio::time::sleep(self.config.ptime).await;
793                        }
794                    }
795                    Err(e) => {
796                        warn!("Failed to create DTMF RTP packet: {:?}", e);
797                        continue;
798                    }
799                }
800            }
801        }
802
803        // After sending DTMF, update the timestamp to account for the DTMF duration
804        inner
805            .stats
806            .timestamp
807            .fetch_add(samples_per_packet * num_packets, Ordering::Relaxed);
808
809        Ok(())
810    }
811
812    // Send STUN Binding Request for ICE connectivity check
813    async fn send_ice_connectivity_check(
814        socket: &UdpConnection,
815        remote_addr: &SipAddr,
816    ) -> Result<()> {
817        let mut stun_packet = vec![0u8; 20]; // STUN header is 20 bytes
818        stun_packet[0..2].copy_from_slice(&STUN_BINDING_REQUEST.to_be_bytes());
819        stun_packet[2..4].copy_from_slice(&0u16.to_be_bytes());
820        stun_packet[4..8].copy_from_slice(&STUN_MAGIC_COOKIE.to_be_bytes());
821        let transaction_id: [u8; STUN_TRANSACTION_ID_SIZE] = rand::random();
822        stun_packet[8..20].copy_from_slice(&transaction_id);
823
824        socket.send_raw(&stun_packet, remote_addr).await.ok();
825        Ok(())
826    }
827
828    async fn handle_rtcp_packet(
829        track_id: &TrackId,
830        buf: &[u8],
831        n: usize,
832        stats: &Arc<RtpTrackStats>,
833        ssrc: u32,
834    ) -> Result<()> {
835        use webrtc::rtcp::packet::unmarshal;
836
837        let mut buf_slice = &buf[0..n];
838        let packets = match unmarshal(&mut buf_slice) {
839            Ok(packets) => packets,
840            Err(e) => {
841                warn!(track_id, "Failed to parse RTCP packet: {:?}", e);
842                return Ok(());
843            }
844        };
845
846        for packet in packets {
847            if let Some(sr) = packet.as_any().downcast_ref::<SenderReport>() {
848                stats.store_sr_info(sr.rtp_time as u64, sr.ntp_time);
849                info!(
850                    track_id,
851                    ssrc = sr.ssrc,
852                    packet_count = sr.packet_count,
853                    octet_count = sr.octet_count,
854                    rtp_time = sr.rtp_time,
855                    "Received SR"
856                );
857            } else if let Some(rr) = packet.as_any().downcast_ref::<ReceiverReport>() {
858                for report in &rr.reports {
859                    if report.ssrc == ssrc {
860                        let packet_loss = report.fraction_lost;
861                        let total_lost = report.total_lost;
862                        let jitter = report.jitter;
863
864                        info!(
865                            track_id,
866                            ssrc = report.ssrc,
867                            fraction_lost = packet_loss,
868                            total_lost = total_lost,
869                            jitter = jitter,
870                            last_sequence_number = report.last_sequence_number,
871                            "Received RR for our stream"
872                        );
873
874                        if packet_loss > 50 {
875                            warn!(track_id, "High packet loss detected: {}%", packet_loss);
876                        }
877                    }
878                }
879            } else if let Some(_) = packet.as_any().downcast_ref::<SourceDescription>() {
880            } else {
881                debug!(
882                    track_id,
883                    packet_type = %packet.header().packet_type,
884                    "Received other RTCP packet type"
885                );
886            }
887        }
888
889        Ok(())
890    }
891
892    async fn classify_packet(
893        track_id: &TrackId,
894        buf: &[u8],
895        n: usize,
896        stats: &Arc<RtpTrackStats>,
897        ssrc: u32,
898    ) -> PacketKind {
899        // Detect STUN packets first
900        if n >= 20 {
901            let msg_type = u16::from_be_bytes([buf[0], buf[1]]);
902            let msg_length = u16::from_be_bytes([buf[2], buf[3]]);
903            let magic_cookie = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
904
905            if magic_cookie == STUN_MAGIC_COOKIE
906                || ((msg_type & 0xC000) == 0x0000 && (msg_length as usize + 20) <= n)
907            {
908                debug!(
909                    track_id = track_id.as_str(),
910                    "Received STUN packet with message type: 0x{:04X}, length: {}", msg_type, n
911                );
912                return PacketKind::Stun(msg_type);
913            }
914        }
915
916        // Detect RTCP packets
917        let version = (buf[0] >> 6) & 0x03;
918        let rtcp_pt = buf[1];
919        if version == 2 && rtcp_pt >= 200 && rtcp_pt <= 207 {
920            if let Err(e) = Self::handle_rtcp_packet(track_id, buf, n, stats, ssrc).await {
921                warn!(
922                    track_id = track_id.as_str(),
923                    "Failed to handle RTCP packet: {:?}", e
924                );
925            }
926            return PacketKind::Rtcp;
927        }
928
929        // Validate RTP packets
930        let rtp_pt = buf[1] & 0x7F;
931        if version != 2 {
932            info!(
933                track_id = track_id.as_str(),
934                "Received packet with invalid RTP version: {}, skipping", version
935            );
936            return PacketKind::Ignore;
937        }
938
939        if rtp_pt >= 128 {
940            debug!(
941                track_id = track_id.as_str(),
942                "Received packet with invalid RTP payload type: {}, might be unrecognized protocol",
943                rtp_pt
944            );
945            return PacketKind::Ignore;
946        }
947
948        PacketKind::Rtp
949    }
950
951    async fn recv_rtp_packets(
952        inner: Arc<Mutex<RtpTrackInner>>,
953        ptime: Duration,
954        rtp_socket: UdpConnection,
955        track_id: TrackId,
956        processor_chain: ProcessorChain,
957        packet_sender: TrackPacketSender,
958        _rtcp_socket: UdpConnection,
959        ssrc: u32,
960    ) -> Result<()> {
961        let mut buf = vec![0u8; RTP_MTU];
962        let mut send_ticker = tokio::time::interval(ptime);
963        let mut jitter = JitterBuffer::new();
964        let stats = inner.lock().unwrap().stats.clone();
965
966        loop {
967            select! {
968                Ok((n, src_addr)) = rtp_socket.recv_raw(&mut buf) => {
969                    if n == 0 {
970                        continue;
971                    }
972
973
974                    let packet_kind = Self::classify_packet(&track_id, &buf, n, &stats, ssrc).await;
975                    match packet_kind {
976                        PacketKind::Stun(msg_type) => {
977                            let force = msg_type == STUN_BINDING_RESPONSE;
978                            Self::maybe_update_remote_addr(&inner, &src_addr, force, &track_id, "stun");
979                            continue;
980                        }
981                        PacketKind::Rtcp => {
982                            Self::maybe_update_remote_addr(&inner, &src_addr, false, &track_id, "rtcp");
983                            continue;
984                        }
985                        PacketKind::Ignore => {
986                            continue;
987                        }
988                        PacketKind::Rtp => {
989                            Self::maybe_update_remote_addr(&inner, &src_addr, false, &track_id, "rtp-private");
990                        }
991                    }
992                    let packet = match Packet::unmarshal(&mut &buf[0..n]) {
993                        Ok(packet) => packet,
994                        Err(e) => {
995                            info!(track_id, "Error creating RTP reader: {:?}", e);
996                            continue;
997                        }
998                    };
999
1000                    let seq_num = packet.header.sequence_number as u32;
1001                    let payload_len = packet.payload.len() as u32;
1002                    stats.update_receive_stats(seq_num, payload_len);
1003
1004                    let payload_type = packet.header.payload_type;
1005                    let payload = packet.payload.to_vec();
1006                    let sample_rate = match payload_type {
1007                        9 => 16000,   // G.722
1008                        111 => 48000, // Opus
1009                        _ => 8000,
1010                    };
1011
1012                    let frame = AudioFrame {
1013                        track_id: track_id.clone(),
1014                        samples: Samples::RTP {
1015                            payload_type,
1016                            payload,
1017                            sequence_number: packet.header.sequence_number.into(),
1018                        },
1019                        timestamp: crate::media::get_timestamp(),
1020                        sample_rate,
1021                    };
1022
1023                    jitter.push(frame);
1024                }
1025                _ = send_ticker.tick() => {
1026                    let mut frame = match jitter.pop() {
1027                        Some(f) => f,
1028                        None => continue,
1029                    };
1030
1031                    if let Err(e) = processor_chain.process_frame(&mut frame) {
1032                        trace!(track_id, "Failed to process frame: {}", e);
1033                        continue;
1034                    }
1035                    match packet_sender.send(frame) {
1036                        Ok(_) => {}
1037                        Err(e) => {
1038                            warn!(track_id, "Error sending audio frame: {}", e);
1039                            break;
1040                        }
1041                    }
1042                }
1043            }
1044        }
1045        Ok(())
1046    }
1047
1048    fn maybe_update_remote_addr(
1049        inner: &Arc<Mutex<RtpTrackInner>>,
1050        src_addr: &SipAddr,
1051        force: bool,
1052        track_id: &TrackId,
1053        reason: &'static str,
1054    ) -> bool {
1055        let mut guard = inner.lock().unwrap();
1056        let src_ip = Self::sip_addr_ip(src_addr);
1057
1058        let should_update = if force {
1059            true
1060        } else {
1061            match (guard.remote_addr.as_ref(), src_ip) {
1062                (Some(remote), Some(src_ip)) => match Self::sip_addr_ip(remote) {
1063                    Some(remote_ip) => remote_ip != src_ip && Self::is_private_ip(&remote_ip),
1064                    None => false,
1065                },
1066                (None, _) => true,
1067                _ => false,
1068            }
1069        };
1070
1071        if should_update {
1072            let old = guard.remote_addr.replace(src_addr.clone());
1073            if guard.rtcp_mux {
1074                guard.remote_rtcp_addr = Some(src_addr.clone());
1075            } else if let Some(rtcp_addr) = guard.remote_rtcp_addr.as_mut() {
1076                rtcp_addr.addr.host = src_addr.addr.host.clone();
1077            }
1078            info!(
1079                track_id = track_id.as_str(),
1080                ?old,
1081                ?src_addr,
1082                reason = reason,
1083                "Updating remote RTP address"
1084            );
1085            return true;
1086        }
1087        false
1088    }
1089
1090    fn sip_addr_ip(addr: &SipAddr) -> Option<IpAddr> {
1091        addr.addr.host.to_string().parse().ok()
1092    }
1093
1094    fn is_private_ip(ip: &IpAddr) -> bool {
1095        match ip {
1096            IpAddr::V4(v4) => {
1097                v4.is_private()
1098                    || v4.is_loopback()
1099                    || v4.is_link_local()
1100                    || v4.is_broadcast()
1101                    || v4.is_documentation()
1102                    || v4.is_unspecified()
1103            }
1104            IpAddr::V6(v6) => {
1105                v6.is_unique_local()
1106                    || v6.is_loopback()
1107                    || v6.is_unspecified()
1108                    || v6.is_unicast_link_local()
1109            }
1110        }
1111    }
1112
1113    // Send RTCP sender reports periodically
1114    async fn send_rtcp_reports(
1115        inner: Arc<Mutex<RtpTrackInner>>,
1116        track_id: TrackId,
1117        token: CancellationToken,
1118        rtcp_socket: &UdpConnection,
1119        ssrc: u32,
1120        ssrc_cname: String,
1121    ) -> Result<()> {
1122        let mut interval = interval_at(
1123            Instant::now() + Duration::from_millis(RTCP_SR_INTERVAL_MS),
1124            Duration::from_millis(RTCP_SR_INTERVAL_MS),
1125        );
1126        let stats = inner.lock().unwrap().stats.clone();
1127        let mut last_sent_octets = stats.octet_count.load(Ordering::Relaxed);
1128        let mut last_recv_octets = stats.received_octets.load(Ordering::Relaxed);
1129        let mut last_rate_instant = Instant::now();
1130        loop {
1131            select! {
1132                _ = token.cancelled() => {
1133                    info!(track_id, "RTCP reports task cancelled");
1134                    break;
1135                }
1136                _ = interval.tick() => {
1137                    // Generate RTCP Sender Report
1138                    let packet_count = stats.packet_count.load(Ordering::Relaxed);
1139                    let octet_count = stats.octet_count.load(Ordering::Relaxed);
1140                    let rtp_timestamp = stats.timestamp.load(Ordering::Relaxed);
1141
1142                    let sent_octets = octet_count;
1143                    let recv_octets = stats.received_octets.load(Ordering::Relaxed);
1144                    let now = Instant::now();
1145                    let elapsed = now.saturating_duration_since(last_rate_instant).as_secs_f64();
1146                    if elapsed > 0.0 {
1147                        let delta_sent = if sent_octets >= last_sent_octets {
1148                            (sent_octets - last_sent_octets) as u64
1149                        } else {
1150                            (u32::MAX as u64 - last_sent_octets as u64) + sent_octets as u64 + 1
1151                        };
1152                        let delta_recv = if recv_octets >= last_recv_octets {
1153                            (recv_octets - last_recv_octets) as u64
1154                        } else {
1155                            (u32::MAX as u64 - last_recv_octets as u64) + recv_octets as u64 + 1
1156                        };
1157
1158                        let send_bps = (delta_sent as f64 * 8.0) / elapsed;
1159                        let recv_bps = (delta_recv as f64 * 8.0) / elapsed;
1160                        let received_packets = stats.received_packets.load(Ordering::Relaxed);
1161                        let lost_packets = stats.lost_packets.load(Ordering::Relaxed);
1162                        let expected_packets = stats.expected_packets.load(Ordering::Relaxed);
1163                        let fraction_lost = stats.get_fraction_lost();
1164                        let loss_pct = (fraction_lost as f64) * 100.0 / 256.0;
1165                        let jitter = stats.jitter.load(Ordering::Relaxed);
1166
1167                        info!(
1168                            track_id = track_id.as_str(),
1169                            send_kbps = send_bps / 1000.0,
1170                            recv_kbps = recv_bps / 1000.0,
1171                            sent_packets = packet_count,
1172                            recv_packets = received_packets,
1173                            expected_packets,
1174                            lost_packets,
1175                            loss_pct,
1176                            jitter,
1177                            "RTP throughput"
1178                        );
1179
1180                        last_rate_instant = now;
1181                        last_sent_octets = sent_octets;
1182                        last_recv_octets = recv_octets;
1183                    }
1184
1185                    let mut pkts = vec![Box::new(SenderReport {
1186                        ssrc,
1187                        ntp_time: Instant::now().elapsed().as_secs() as u64,
1188                        rtp_time: rtp_timestamp,
1189                        packet_count,
1190                        octet_count,
1191                        profile_extensions: Bytes::new(),
1192                        reports: vec![],
1193                    })
1194                        as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>];
1195
1196                    if !ssrc_cname.is_empty() {
1197                        pkts.push(Box::new(SourceDescription {
1198                            chunks: vec![SourceDescriptionChunk {
1199                                source: ssrc,
1200                                items: vec![SourceDescriptionItem {
1201                                    sdes_type: SdesType::SdesCname,
1202                                    text: ssrc_cname.clone().into(),
1203                                }],
1204                            }],
1205                        })
1206                            as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>);
1207                    }
1208
1209                    let received_packets = stats.received_packets.load(Ordering::Relaxed);
1210                    let lost_packets = stats.lost_packets.load(Ordering::Relaxed);
1211                    let highest_seq = stats.highest_seq_num.load(Ordering::Relaxed);
1212                    let jitter = stats.jitter.load(Ordering::Relaxed);
1213                    let fraction_lost = stats.get_fraction_lost();
1214
1215                    if received_packets > 0 || lost_packets > 0 {
1216                        let remote_ssrc = ssrc + 1;
1217                        let report = ReceptionReport {
1218                            ssrc: remote_ssrc,
1219                            fraction_lost,
1220                            total_lost: lost_packets,
1221                            last_sequence_number: highest_seq,
1222                            jitter,
1223                            last_sender_report: (stats.last_sr_timestamp.load(Ordering::Relaxed) >> 16) as u32,
1224                            delay: 0,
1225                        };
1226
1227                        let rr = ReceiverReport {
1228                            ssrc,
1229                            reports: vec![report],
1230                            profile_extensions: Bytes::new(),
1231                        };
1232                        pkts.push(Box::new(rr) as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>);
1233                    }
1234
1235                    let rtcp_data = webrtc::rtcp::packet::marshal(&pkts)?;
1236                    let remote_rtcp_addr = inner.lock().unwrap().remote_rtcp_addr.clone();
1237                    match remote_rtcp_addr{
1238                        Some(ref addr) => {
1239                            if let Err(e) = rtcp_socket.send_raw(&rtcp_data, addr).await {
1240                                warn!(track_id, "Failed to send RTCP report: {}", e);
1241                            }
1242                        }
1243                        None => {}
1244                    }
1245                }
1246            }
1247        }
1248        Ok(())
1249    }
1250
1251    async fn try_ice_connectivity_check(&self) {
1252        let remote_addr = self.inner.lock().unwrap().remote_addr.clone();
1253        let remote_rtcp_addr = self.inner.lock().unwrap().remote_rtcp_addr.clone();
1254
1255        if let Some(ref addr) = remote_addr {
1256            Self::send_ice_connectivity_check(&self.rtp_socket, addr)
1257                .await
1258                .ok();
1259            if let Some(ref rtcp_addr) = remote_rtcp_addr {
1260                if rtcp_addr != addr {
1261                    Self::send_ice_connectivity_check(&self.rtcp_socket, rtcp_addr)
1262                        .await
1263                        .ok();
1264                }
1265            }
1266        }
1267    }
1268}
1269
1270#[async_trait]
1271impl Track for RtpTrack {
1272    fn ssrc(&self) -> u32 {
1273        self.ssrc
1274    }
1275    fn id(&self) -> &TrackId {
1276        &self.track_id
1277    }
1278    fn config(&self) -> &TrackConfig {
1279        &self.config
1280    }
1281    fn processor_chain(&mut self) -> &mut ProcessorChain {
1282        &mut self.processor_chain
1283    }
1284
1285    async fn handshake(&mut self, offer: String, _timeout: Option<Duration>) -> Result<String> {
1286        self.set_remote_description(&offer)?;
1287        self.local_description()
1288    }
1289
1290    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
1291        self.set_remote_description(&answer).ok();
1292
1293        if self.ice_connectivity_check {
1294            self.try_ice_connectivity_check().await;
1295        }
1296        Ok(())
1297    }
1298
1299    async fn start(
1300        &self,
1301        event_sender: EventSender,
1302        packet_sender: TrackPacketSender,
1303    ) -> Result<()> {
1304        let track_id = self.track_id.clone();
1305        let rtcp_socket = self.rtcp_socket.clone();
1306        let ssrc = self.ssrc;
1307        let rtp_socket = self.rtp_socket.clone();
1308        let processor_chain = self.processor_chain.clone();
1309        let token = self.cancel_token.clone();
1310        let ssrc_cname = self.ssrc_cname.clone();
1311        let start_time = crate::media::get_timestamp();
1312        let ptime = self.config.ptime;
1313
1314        // Send ICE connectivity check if enabled and remote address is available
1315        if self.ice_connectivity_check {
1316            self.try_ice_connectivity_check().await;
1317        }
1318
1319        let inner = self.inner.clone();
1320
1321        tokio::spawn(async move {
1322            select! {
1323                _ = token.cancelled() => {
1324                    debug!(track_id, "RTP processor task cancelled");
1325                },
1326                _ = Self::send_rtcp_reports(inner.clone(),track_id.clone(), token.clone(), &rtcp_socket, ssrc, ssrc_cname) => {
1327                }
1328                _ = Self::recv_rtp_packets(
1329                    inner.clone(),
1330                    ptime,
1331                    rtp_socket,
1332                    track_id.clone(),
1333                    processor_chain,
1334                    packet_sender,
1335                    rtcp_socket.clone(),
1336                    ssrc,
1337                ) => {
1338                }
1339            };
1340            let remote_rtcp_addr = inner.lock().unwrap().remote_rtcp_addr.clone();
1341            // send rtcp bye packet
1342            match remote_rtcp_addr {
1343                Some(ref addr) => {
1344                    let pkts = vec![Box::new(Goodbye {
1345                        sources: vec![ssrc],
1346                        reason: "end of call".into(),
1347                    })
1348                        as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>];
1349                    if let Ok(data) = webrtc::rtcp::packet::marshal(&pkts) {
1350                        if let Err(e) = rtcp_socket.send_raw(&data, addr).await {
1351                            warn!(track_id, "Failed to send RTCP goodbye packet: {}", e);
1352                        }
1353                    }
1354                }
1355                None => {}
1356            }
1357            info!(track_id, "RTP processor completed");
1358            event_sender
1359                .send(SessionEvent::TrackEnd {
1360                    track_id,
1361                    timestamp: crate::media::get_timestamp(),
1362                    duration: crate::media::get_timestamp() - start_time,
1363                    ssrc,
1364                    play_id: None,
1365                })
1366                .ok();
1367        });
1368
1369        Ok(())
1370    }
1371
1372    async fn stop(&self) -> Result<()> {
1373        self.cancel_token.cancel();
1374        Ok(())
1375    }
1376
1377    async fn send_packet(&self, packet: &AudioFrame) -> Result<()> {
1378        let remote_addr = match self.inner.lock().unwrap().remote_addr.clone() {
1379            Some(addr) => addr,
1380            None => return Ok(()),
1381        };
1382        let stats = self.inner.lock().unwrap().stats.clone();
1383
1384        let (payload_type, payload) = self
1385            .encoder
1386            .encode(self.inner.lock().unwrap().payload_type, packet.clone());
1387        if payload.is_empty() {
1388            return Ok(());
1389        }
1390
1391        let clock_rate = match payload_type {
1392            9 => 8000,    // G.722 (RTP clock rate is 8000 even though sample rate is 16000)
1393            111 => 48000, // Opus
1394            _ => 8000,
1395        };
1396
1397        let now = crate::media::get_timestamp();
1398        let last_update = stats.last_timestamp_update.load(Ordering::Relaxed);
1399        let mut skipped_packets: u32 = 0;
1400
1401        if last_update > 0 {
1402            let frame_duration_ms = self.config.ptime.as_millis() as u64;
1403            if frame_duration_ms > 0 {
1404                let delta_ms = now.saturating_sub(last_update);
1405                let delta_frames = delta_ms / frame_duration_ms;
1406                let prospective_skip = delta_frames.saturating_sub(1);
1407
1408                if prospective_skip >= RTP_RESYNC_MIN_SKIP_PACKETS as u64 {
1409                    let last_resync = stats.last_resync_ts.load(Ordering::Relaxed);
1410                    let cooldown_ms = frame_duration_ms.saturating_mul(RTP_RESYNC_COOLDOWN_FRAMES);
1411                    if last_resync == 0 || now.saturating_sub(last_resync) >= cooldown_ms {
1412                        skipped_packets = prospective_skip.min(u32::MAX as u64) as u32;
1413                        debug!(
1414                            track_id = self.track_id,
1415                            delta_ms, skipped_packets, "Resyncing RTP timestamp"
1416                        );
1417                        for _ in 0..skipped_packets {
1418                            self.sequencer.next_sequence_number();
1419                        }
1420                        stats.last_resync_ts.store(now, Ordering::Relaxed);
1421                    }
1422                }
1423            }
1424        }
1425
1426        stats.last_timestamp_update.store(now, Ordering::Relaxed);
1427
1428        let samples_per_packet = (clock_rate as f64 * self.config.ptime.as_secs_f64()) as u32;
1429        let packets = match self
1430            .inner
1431            .lock()
1432            .unwrap()
1433            .packetizer
1434            .lock()
1435            .unwrap()
1436            .as_mut()
1437        {
1438            Some(p) => {
1439                if skipped_packets > 0 {
1440                    let skip_samples = (skipped_packets as u64)
1441                        .saturating_mul(samples_per_packet as u64)
1442                        .min(u32::MAX as u64) as u32;
1443                    p.skip_samples(skip_samples);
1444                }
1445                p.packetize(&Bytes::from_owner(payload), samples_per_packet)?
1446            }
1447            None => return Err(anyhow::anyhow!("Packetizer not set")),
1448        };
1449        for mut packet in packets {
1450            packet.header.marker = false;
1451            packet.header.payload_type = payload_type;
1452            match packet.marshal() {
1453                Ok(ref rtp_data) => match self.rtp_socket.send_raw(rtp_data, &remote_addr).await {
1454                    Ok(_) => {
1455                        stats.update_send_stats(rtp_data.len() as u32, samples_per_packet);
1456                    }
1457                    Err(e) => {
1458                        warn!(track_id = self.track_id, "Failed to send RTP packet: {}", e);
1459                    }
1460                },
1461                Err(e) => {
1462                    warn!(
1463                        track_id = self.track_id,
1464                        "Failed to build RTP packet: {:?}", e
1465                    );
1466                    return Err(anyhow::anyhow!("Failed to build RTP packet"));
1467                }
1468            }
1469        }
1470        Ok(())
1471    }
1472}
1473
1474#[cfg(test)]
1475mod tests {
1476    use super::*;
1477
1478    #[test]
1479    fn test_rtp_track_stats_new() {
1480        let stats = RtpTrackStats::new();
1481        assert_eq!(stats.packet_count.load(Ordering::Relaxed), 0);
1482        assert_eq!(stats.octet_count.load(Ordering::Relaxed), 0);
1483        assert_eq!(stats.received_packets.load(Ordering::Relaxed), 0);
1484        assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 0);
1485        assert_eq!(stats.jitter.load(Ordering::Relaxed), 0);
1486    }
1487
1488    #[test]
1489    fn test_update_send_stats() {
1490        let stats = RtpTrackStats::new();
1491        stats.update_send_stats(1200, 160);
1492
1493        assert_eq!(stats.packet_count.load(Ordering::Relaxed), 1);
1494        assert_eq!(stats.octet_count.load(Ordering::Relaxed), 1200);
1495        assert_eq!(stats.timestamp.load(Ordering::Relaxed), 160);
1496
1497        // Test multiple updates
1498        stats.update_send_stats(800, 160);
1499        assert_eq!(stats.packet_count.load(Ordering::Relaxed), 2);
1500        assert_eq!(stats.octet_count.load(Ordering::Relaxed), 2000);
1501        assert_eq!(stats.timestamp.load(Ordering::Relaxed), 320);
1502    }
1503
1504    #[test]
1505    fn test_update_receive_stats() {
1506        let stats = RtpTrackStats::new();
1507
1508        // First packet
1509        stats.update_receive_stats(1000, 160);
1510        assert_eq!(stats.received_packets.load(Ordering::Relaxed), 1);
1511        assert_eq!(stats.received_octets.load(Ordering::Relaxed), 160);
1512        assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1000);
1513        assert_eq!(stats.base_seq.load(Ordering::Relaxed), 1000);
1514        assert_eq!(stats.last_receive_seq.load(Ordering::Relaxed), 1000);
1515        assert_eq!(stats.expected_packets.load(Ordering::Relaxed), 1);
1516        assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 0);
1517
1518        // Second packet with gap
1519        stats.update_receive_stats(1002, 160);
1520        assert_eq!(stats.received_packets.load(Ordering::Relaxed), 2);
1521        assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1002);
1522        assert_eq!(stats.last_receive_seq.load(Ordering::Relaxed), 1002);
1523        assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 1);
1524        assert_eq!(stats.expected_packets.load(Ordering::Relaxed), 3);
1525    }
1526
1527    #[test]
1528    fn test_get_fraction_lost() {
1529        let stats = RtpTrackStats::new();
1530
1531        // No packets - should return 0
1532        assert_eq!(stats.get_fraction_lost(), 0);
1533
1534        // Set some loss
1535        stats.expected_packets.store(100, Ordering::Relaxed);
1536        stats.lost_packets.store(5, Ordering::Relaxed);
1537
1538        let fraction_lost = stats.get_fraction_lost();
1539        assert_eq!(fraction_lost, 12); // (5 * 256) / 100 = 12.8 -> 12
1540
1541        // Test maximum loss
1542        stats.lost_packets.store(100, Ordering::Relaxed);
1543        assert_eq!(stats.get_fraction_lost(), 255); // Should cap at 255
1544    }
1545
1546    #[test]
1547    fn test_store_sr_info() {
1548        let stats = RtpTrackStats::new();
1549        stats.store_sr_info(123456, 789012);
1550
1551        assert_eq!(stats.last_sr_timestamp.load(Ordering::Relaxed), 123456);
1552        assert_eq!(stats.last_sr_ntp.load(Ordering::Relaxed), 789012);
1553    }
1554
1555    #[tokio::test]
1556    async fn test_parse_pjsip_sdp() {
1557        let sdp = r#"v=0
1558o=- 3954304612 3954304613 IN IP4 192.168.1.202
1559s=pjmedia
1560b=AS:117
1561t=0 0
1562a=X-nat:3
1563m=audio 4002 RTP/AVP 9 101
1564c=IN IP4 192.168.1.202
1565b=TIAS:96000
1566a=rtcp:4003 IN IP4 192.168.1.202
1567a=sendrecv
1568a=rtpmap:9 G722/8000
1569a=ssrc:1089147397 cname:61753255553b9c6f
1570a=rtpmap:101 telephone-event/8000
1571a=fmtp:101 0-16"#;
1572        let rtp_track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1573            .build()
1574            .await
1575            .expect("Failed to build rtp track");
1576        rtp_track
1577            .set_remote_description(sdp)
1578            .expect("Failed to set remote description");
1579        let inner = rtp_track.inner.lock().unwrap();
1580        assert_eq!(inner.payload_type, 9);
1581        assert!(!inner.rtcp_mux); // RTCP is on separate port
1582    }
1583
1584    #[tokio::test]
1585    async fn test_parse_rtcp_mux() {
1586        let answer = r#"v=0
1587o=- 723884243 723884244 IN IP4 11.22.33.44
1588s=-
1589c=IN IP4 11.22.33.44
1590t=0 0
1591m=audio 10638 RTP/AVP 8 101
1592a=rtpmap:8 PCMA/8000
1593a=rtpmap:101 telephone-event/8000
1594a=fmtp:101 0-15
1595a=sendrecv
1596a=rtcp-mux"#;
1597        let mut reader = Cursor::new(answer);
1598        let sdp = SessionDescription::unmarshal(&mut reader).expect("Failed to parse SDP");
1599        let peer_media = select_peer_media(&sdp, "audio").expect("Failed to select_peer_media");
1600        assert!(peer_media.rtcp_mux);
1601        assert_eq!(peer_media.rtcp_port, 10638);
1602    }
1603
1604    #[tokio::test]
1605    async fn test_parse_linphone_candidate() {
1606        let answer = r#"v=0
1607o=mpi 2590 792 IN IP4 192.168.3.181
1608s=Talk
1609c=IN IP4 192.168.3.181
1610t=0 0
1611a=ice-pwd:96adb77560869c783656fe0a
1612a=ice-ufrag:409dfd53
1613a=rtcp-xr:rcvr-rtt=all:10000 stat-summary=loss,dup,jitt,TTL voip-metrics
1614a=record:off
1615m=audio 61794 RTP/AVP 8 101
1616c=IN IP4 115.205.103.101
1617a=rtpmap:101 telephone-event/8000
1618a=rtcp:50735
1619a=candidate:1 1 UDP 2130706303 192.168.3.181 61794 typ host
1620a=candidate:1 2 UDP 2130706302 192.168.3.181 50735 typ host
1621a=candidate:2 1 UDP 1694498687 115.205.103.101 61794 typ srflx raddr 192.168.3.181 rport 61794
1622a=candidate:2 2 UDP 1694498686 115.205.103.101 50735 typ srflx raddr 192.168.3.181 rport 50735
1623a=rtcp-fb:* trr-int 5000
1624a=rtcp-fb:* ccm tmmbr"#;
1625        let mut reader = Cursor::new(answer);
1626        let sdp = SessionDescription::unmarshal(&mut reader).expect("Failed to parse SDP");
1627        let peer_media = select_peer_media(&sdp, "audio").expect("Failed to select_peer_media");
1628        assert_eq!(peer_media.rtp_addr, "192.168.3.181");
1629    }
1630
1631    #[tokio::test]
1632    async fn test_rtp_track_builder() {
1633        let track_id = "test_track".to_string();
1634        let config = TrackConfig::default();
1635
1636        let track = RtpTrackBuilder::new(track_id.clone(), config)
1637            .with_rtp_start_port(20000)
1638            .with_rtp_end_port(20100)
1639            .with_session_name("test_session".to_string())
1640            .build()
1641            .await
1642            .expect("Failed to build track");
1643
1644        assert_eq!(track.track_id, track_id);
1645        // SSRC is randomly generated in build(), so we can't predict exact value
1646        assert_ne!(track.ssrc, 0); // Should not be zero
1647        assert_eq!(track.ssrc_cname, "test_session");
1648        let inner = track.inner.lock().unwrap();
1649        assert!(inner.rtcp_mux);
1650    }
1651
1652    #[tokio::test]
1653    async fn test_local_description_generation() {
1654        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1655            .build()
1656            .await
1657            .expect("Failed to build track");
1658
1659        let local_desc = track
1660            .local_description()
1661            .expect("Failed to generate local description");
1662
1663        // Verify SDP contains expected elements
1664        assert!(local_desc.contains("m=audio"));
1665        assert!(local_desc.contains("RTP/AVP"));
1666        assert!(local_desc.contains("a=rtcp-mux")); // Should have rtcp-mux by default
1667        assert!(local_desc.contains("a=sendrecv"));
1668        assert!(local_desc.contains(&format!("a=ssrc:{}", track.ssrc)));
1669    }
1670
1671    #[tokio::test]
1672    async fn test_double_set_remote_description() {
1673        let sdp = r#"v=0
1674o=- 123 124 IN IP4 192.168.1.1
1675s=-
1676c=IN IP4 192.168.1.1
1677t=0 0
1678m=audio 5004 RTP/AVP 0
1679a=rtpmap:0 PCMU/8000"#;
1680
1681        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1682            .build()
1683            .await
1684            .expect("Failed to build track");
1685
1686        // First call should succeed
1687        assert!(track.set_remote_description(sdp).is_ok());
1688        assert!(track.remote_description().is_some());
1689
1690        // Second call should be ignored (no error)
1691        assert!(track.set_remote_description(sdp).is_ok());
1692    }
1693
1694    #[tokio::test]
1695    async fn test_invalid_sdp() {
1696        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1697            .build()
1698            .await
1699            .expect("Failed to build track");
1700
1701        // Invalid SDP without audio media
1702        let invalid_sdp = r#"v=0
1703o=- 123 124 IN IP4 192.168.1.1
1704s=-
1705c=IN IP4 192.168.1.1
1706t=0 0"#;
1707
1708        assert!(track.set_remote_description(invalid_sdp).is_err());
1709    }
1710
1711    #[tokio::test]
1712    async fn test_dtmf_digit_mapping() {
1713        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1714            .build()
1715            .await
1716            .expect("Failed to build track");
1717
1718        // Test valid digits - these should not panic during mapping
1719        let valid_digits = [
1720            "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "*", "#", "A", "B", "C", "D",
1721        ];
1722
1723        for digit in &valid_digits {
1724            // Since we don't have remote address set, this will fail with "Remote address not set"
1725            // but it shouldn't fail on digit mapping
1726            let result = track.send_dtmf(digit, Some(100)).await;
1727            assert!(result.is_err());
1728            let error_msg = result.unwrap_err().to_string();
1729            assert!(error_msg.contains("Remote address not set"));
1730        }
1731
1732        // Test invalid digit
1733        let result = track.send_dtmf("X", Some(100)).await;
1734        assert!(result.is_err());
1735        let error_msg = result.unwrap_err().to_string();
1736        assert!(error_msg.contains("Invalid DTMF digit"));
1737    }
1738
1739    #[test]
1740    fn test_rtcp_packet_type_detection() {
1741        // Test RTCP packet type ranges
1742        assert!(200 >= 200 && 200 <= 207); // SR
1743        assert!(201 >= 200 && 201 <= 207); // RR
1744        assert!(202 >= 200 && 202 <= 207); // SDES
1745        assert!(203 >= 200 && 203 <= 207); // BYE
1746        assert!(204 >= 200 && 204 <= 207); // APP
1747
1748        // Test RTP payload type extraction
1749        let rtp_byte = 0b10001001; // Version 2, PT 9
1750        let version = (rtp_byte >> 6) & 0x03;
1751        let pt = rtp_byte & 0x7F;
1752
1753        assert_eq!(version, 2);
1754        assert_eq!(pt, 9);
1755    }
1756
1757    #[test]
1758    fn test_stun_magic_cookie_detection() {
1759        let stun_magic_cookie = STUN_MAGIC_COOKIE;
1760        let bytes = stun_magic_cookie.to_be_bytes();
1761        let reconstructed = u32::from_be_bytes(bytes);
1762
1763        assert_eq!(reconstructed, stun_magic_cookie);
1764    }
1765
1766    #[tokio::test]
1767    async fn test_track_ssrc_and_id() {
1768        let track_id = "unique_track_123".to_string();
1769        let custom_ssrc = 0x12345678;
1770
1771        let track = RtpTrackBuilder::new(track_id.clone(), TrackConfig::default())
1772            .with_ssrc(custom_ssrc)
1773            .build()
1774            .await
1775            .expect("Failed to build track");
1776
1777        // Note: build() overrides SSRC with random value, so we test the builder method separately
1778        let builder =
1779            RtpTrackBuilder::new(track_id.clone(), TrackConfig::default()).with_ssrc(custom_ssrc);
1780        assert_eq!(builder.ssrc, custom_ssrc);
1781        assert_eq!(track.id(), &track_id);
1782    }
1783
1784    #[test]
1785    fn test_codec_type_payload_mapping() {
1786        // Test common codec payload types
1787        assert_eq!(CodecType::PCMU.payload_type(), 0);
1788        assert_eq!(CodecType::G722.payload_type(), 9);
1789        assert_eq!(CodecType::PCMA.payload_type(), 8);
1790        assert_eq!(CodecType::TelephoneEvent.payload_type(), 101);
1791    }
1792
1793    #[tokio::test]
1794    async fn test_stats_initialization() {
1795        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1796            .build()
1797            .await
1798            .expect("Failed to build track");
1799        let inner = track.inner.lock().unwrap();
1800        // Verify stats are properly initialized
1801        assert_eq!(inner.stats.packet_count.load(Ordering::Relaxed), 0);
1802        assert_eq!(inner.stats.octet_count.load(Ordering::Relaxed), 0);
1803        assert_eq!(inner.stats.received_packets.load(Ordering::Relaxed), 0);
1804        assert_eq!(inner.stats.lost_packets.load(Ordering::Relaxed), 0);
1805        assert_eq!(inner.stats.highest_seq_num.load(Ordering::Relaxed), 0);
1806        assert_eq!(inner.stats.jitter.load(Ordering::Relaxed), 0);
1807        assert_eq!(inner.stats.last_sr_timestamp.load(Ordering::Relaxed), 0);
1808        assert_eq!(inner.stats.last_sr_ntp.load(Ordering::Relaxed), 0);
1809        assert_eq!(inner.stats.base_seq.load(Ordering::Relaxed), 0);
1810        assert_eq!(inner.stats.last_receive_seq.load(Ordering::Relaxed), 0);
1811        assert_eq!(inner.stats.last_resync_ts.load(Ordering::Relaxed), 0);
1812    }
1813
1814    #[test]
1815    fn test_sequence_number_gap_calculation() {
1816        let stats = RtpTrackStats::new();
1817
1818        // Simulate receiving packets with gaps
1819        stats.update_receive_stats(1000, 160); // First packet
1820        stats.update_receive_stats(1002, 160); // Skip 1001
1821        stats.update_receive_stats(1003, 160); // Consecutive
1822        stats.update_receive_stats(1005, 160); // Skip 1004
1823
1824        assert_eq!(stats.received_packets.load(Ordering::Relaxed), 4);
1825        assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1005);
1826        // Loss calculation is simplified, so we just verify some loss is detected
1827        assert!(stats.lost_packets.load(Ordering::Relaxed) > 0);
1828    }
1829
1830    #[test]
1831    fn test_jitter_calculation() {
1832        let stats = RtpTrackStats::new();
1833
1834        // Test jitter calculation with sequence numbers
1835        stats.update_receive_stats(1000, 160);
1836        let _initial_jitter = stats.jitter.load(Ordering::Relaxed);
1837
1838        stats.update_receive_stats(1001, 160);
1839        let updated_jitter = stats.jitter.load(Ordering::Relaxed);
1840
1841        // Jitter calculation is simplified and may not always change
1842        // Let's just verify it doesn't panic and stays within reasonable bounds
1843        assert!(updated_jitter < 1000); // Should be reasonable value
1844    }
1845
1846    #[test]
1847    fn test_builder_with_custom_ssrc() {
1848        let custom_ssrc = 0x12345678u32;
1849        let builder =
1850            RtpTrackBuilder::new("test".to_string(), TrackConfig::default()).with_ssrc(custom_ssrc);
1851
1852        // Verify builder stores the custom SSRC
1853        assert_eq!(builder.ssrc, custom_ssrc);
1854        assert_eq!(builder.ssrc_cname, format!("rustpbx-{}", custom_ssrc));
1855    }
1856
1857    #[test]
1858    fn test_builder_configuration() {
1859        let builder = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1860            .with_rtp_start_port(10000)
1861            .with_rtp_end_port(20000)
1862            .with_rtp_alloc_count(100)
1863            .with_rtcp_mux(false)
1864            .with_session_name("custom_session".to_string());
1865
1866        assert_eq!(builder.rtp_start_port, 10000);
1867        assert_eq!(builder.rtp_end_port, 20000);
1868        assert_eq!(builder.rtp_alloc_count, 100);
1869        assert!(!builder.rtcp_mux);
1870        assert_eq!(builder.ssrc_cname, "custom_session");
1871    }
1872
1873    #[tokio::test]
1874    async fn test_ice_connectivity_check_enabled_by_default() {
1875        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1876            .build()
1877            .await
1878            .expect("Failed to build track");
1879
1880        assert!(track.ice_connectivity_check); // Should be enabled by default
1881    }
1882
1883    #[tokio::test]
1884    async fn test_ice_connectivity_check_can_be_disabled() {
1885        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1886            .with_ice_connectivity_check(false)
1887            .build()
1888            .await
1889            .expect("Failed to build track");
1890
1891        assert!(!track.ice_connectivity_check);
1892    }
1893
1894    #[tokio::test]
1895    async fn test_maybe_update_remote_addr_private_peer() {
1896        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1897            .build()
1898            .await
1899            .expect("Failed to build track");
1900        let inner = track.inner.clone();
1901
1902        let private_addr = SipAddr {
1903            addr: HostWithPort {
1904                host: "192.168.0.10".parse().expect("host"),
1905                port: Some(4000.into()),
1906            },
1907            r#type: Some(rsip::transport::Transport::Udp),
1908        };
1909
1910        let public_addr = SipAddr {
1911            addr: HostWithPort {
1912                host: "203.0.113.5".parse().expect("host"),
1913                port: Some(5004.into()),
1914            },
1915            r#type: Some(rsip::transport::Transport::Udp),
1916        };
1917
1918        {
1919            let mut guard = inner.lock().expect("lock");
1920            guard.remote_addr = Some(private_addr.clone());
1921            guard.remote_rtcp_addr = Some(private_addr.clone());
1922            guard.rtcp_mux = true;
1923        }
1924
1925        let updated = RtpTrack::maybe_update_remote_addr(
1926            &inner,
1927            &public_addr,
1928            false,
1929            &track.track_id,
1930            "test",
1931        );
1932
1933        assert!(updated);
1934        let guard = inner.lock().expect("lock");
1935        assert_eq!(
1936            guard
1937                .remote_addr
1938                .as_ref()
1939                .expect("remote")
1940                .addr
1941                .host
1942                .to_string(),
1943            "203.0.113.5"
1944        );
1945        assert_eq!(
1946            guard
1947                .remote_rtcp_addr
1948                .as_ref()
1949                .expect("rtcp")
1950                .addr
1951                .host
1952                .to_string(),
1953            "203.0.113.5"
1954        );
1955    }
1956
1957    #[test]
1958    fn test_stun_packet_structure() {
1959        // Test STUN constants
1960        assert_eq!(STUN_BINDING_REQUEST, 0x0001);
1961        assert_eq!(STUN_MAGIC_COOKIE, 0x2112A442);
1962        assert_eq!(STUN_TRANSACTION_ID_SIZE, 12);
1963
1964        // Test STUN packet construction would be valid
1965        let mut packet = vec![0u8; 20];
1966        packet[0..2].copy_from_slice(&STUN_BINDING_REQUEST.to_be_bytes());
1967        packet[4..8].copy_from_slice(&STUN_MAGIC_COOKIE.to_be_bytes());
1968
1969        // Verify message type
1970        let msg_type = u16::from_be_bytes([packet[0], packet[1]]);
1971        assert_eq!(msg_type, STUN_BINDING_REQUEST);
1972
1973        // Verify magic cookie
1974        let magic = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]);
1975        assert_eq!(magic, STUN_MAGIC_COOKIE);
1976    }
1977
1978    #[tokio::test]
1979    async fn test_ice_connectivity_check_builder_method() {
1980        let builder_enabled = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1981            .with_ice_connectivity_check(true);
1982        assert!(builder_enabled.ice_connectivity_check);
1983
1984        let builder_disabled = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1985            .with_ice_connectivity_check(false);
1986        assert!(!builder_disabled.ice_connectivity_check);
1987    }
1988
1989    #[test]
1990    fn test_ice_connectivity_terminology() {
1991        // Verify we're using correct ICE terminology
1992        // ICE connectivity checks use STUN Binding Requests
1993        // This is part of the ICE (Interactive Connectivity Establishment) standard
1994
1995        // The purpose is:
1996        // 1. NAT traversal and hole punching
1997        // 2. Connectivity verification
1998        // 3. Keep-alive for NAT bindings
1999        // 4. Path validation
2000
2001        assert_eq!(STUN_BINDING_REQUEST, 0x0001); // RFC 5389
2002        assert_eq!(STUN_MAGIC_COOKIE, 0x2112A442); // RFC 5389
2003    }
2004}