voice_engine/media/track/
rtp.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::Samples,
6    media::TrackId,
7    media::{
8        codecs::CodecType,
9        jitter::JitterBuffer,
10        negotiate::select_peer_media,
11        processor::ProcessorChain,
12        track::{Track, TrackConfig, TrackPacketSender},
13    },
14};
15use anyhow::Result;
16use async_trait::async_trait;
17use bytes::Bytes;
18use rsip::HostWithPort;
19use rsipstack::transport::{SipAddr, udp::UdpConnection};
20use std::{
21    io::Cursor,
22    net::{IpAddr, SocketAddr},
23    sync::{
24        Arc, Mutex,
25        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
26    },
27    time::Duration,
28};
29use tokio::{select, time::Instant, time::interval_at};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, info, warn};
32use webrtc::{
33    rtcp::{
34        goodbye::Goodbye,
35        receiver_report::ReceiverReport,
36        reception_report::ReceptionReport,
37        sender_report::SenderReport,
38        source_description::{
39            SdesType, SourceDescription, SourceDescriptionChunk, SourceDescriptionItem,
40        },
41    },
42    rtp::{
43        codecs::g7xx::G7xxPayloader,
44        packet::Packet,
45        packetizer::{Packetizer, new_packetizer},
46        sequence::{Sequencer, new_random_sequencer},
47    },
48    sdp::{
49        MediaDescription, SessionDescription,
50        description::{
51            common::{Address, Attribute, ConnectionInformation},
52            media::{MediaName, RangedPort},
53            session::{
54                ATTR_KEY_RTCPMUX, ATTR_KEY_SEND_ONLY, ATTR_KEY_SEND_RECV, ATTR_KEY_SSRC, Origin,
55                TimeDescription, Timing,
56            },
57        },
58    },
59    util::{Marshal, Unmarshal},
60};
61const RTP_MTU: usize = 1500; // UDP MTU size
62const RTP_OUTBOUND_MTU: usize = 1200; // Standard MTU size
63const RTCP_SR_INTERVAL_MS: u64 = 5000; // 5 seconds RTCP sender report interval
64const DTMF_EVENT_DURATION_MS: u64 = 160; // Default DTMF event duration (in ms)
65const DTMF_EVENT_VOLUME: u8 = 10; // Default volume for DTMF events (0-63)
66const RTP_RESYNC_MIN_SKIP_PACKETS: u32 = 3; // Require at least this many missing packets before resyncing
67const RTP_RESYNC_COOLDOWN_FRAMES: u64 = 3; // Cooldown window (in frames) between resync attempts
68
69// STUN constants for ICE connectivity check
70const STUN_BINDING_REQUEST: u16 = 0x0001;
71const STUN_BINDING_RESPONSE: u16 = 0x0101;
72const STUN_MAGIC_COOKIE: u32 = 0x2112A442;
73const STUN_TRANSACTION_ID_SIZE: usize = 12;
74
75struct RtpTrackStats {
76    timestamp: Arc<AtomicU32>,
77    packet_count: Arc<AtomicU32>,
78    octet_count: Arc<AtomicU32>,
79    last_timestamp_update: Arc<AtomicU64>,
80    last_resync_ts: Arc<AtomicU64>,
81    received_packets: Arc<AtomicU32>,
82    received_octets: Arc<AtomicU32>,
83    expected_packets: Arc<AtomicU32>,
84    lost_packets: Arc<AtomicU32>,
85    highest_seq_num: Arc<AtomicU32>,
86    base_seq: Arc<AtomicU32>,
87    last_receive_seq: Arc<AtomicU32>,
88    jitter: Arc<AtomicU32>,
89    last_sr_timestamp: Arc<AtomicU64>,
90    last_sr_ntp: Arc<AtomicU64>,
91}
92
93impl RtpTrackStats {
94    fn new() -> Self {
95        Self {
96            timestamp: Arc::new(AtomicU32::new(0)),
97            packet_count: Arc::new(AtomicU32::new(0)),
98            octet_count: Arc::new(AtomicU32::new(0)),
99            last_timestamp_update: Arc::new(AtomicU64::new(0)),
100            last_resync_ts: Arc::new(AtomicU64::new(0)),
101            received_packets: Arc::new(AtomicU32::new(0)),
102            received_octets: Arc::new(AtomicU32::new(0)),
103            expected_packets: Arc::new(AtomicU32::new(0)),
104            lost_packets: Arc::new(AtomicU32::new(0)),
105            highest_seq_num: Arc::new(AtomicU32::new(0)),
106            base_seq: Arc::new(AtomicU32::new(0)),
107            last_receive_seq: Arc::new(AtomicU32::new(0)),
108            jitter: Arc::new(AtomicU32::new(0)),
109            last_sr_timestamp: Arc::new(AtomicU64::new(0)),
110            last_sr_ntp: Arc::new(AtomicU64::new(0)),
111        }
112    }
113
114    fn update_send_stats(&self, packet_len: u32, samples_per_packet: u32) {
115        self.packet_count.fetch_add(1, Ordering::Relaxed);
116        self.octet_count.fetch_add(packet_len, Ordering::Relaxed);
117        self.timestamp
118            .fetch_add(samples_per_packet, Ordering::Relaxed);
119    }
120
121    fn update_receive_stats(&self, seq_num: u32, payload_len: u32) {
122        let prev_received = self.received_packets.fetch_add(1, Ordering::Relaxed);
123        let received = prev_received + 1;
124        self.received_octets
125            .fetch_add(payload_len, Ordering::Relaxed);
126
127        if prev_received == 0 {
128            self.base_seq.store(seq_num, Ordering::Relaxed);
129            self.last_receive_seq.store(seq_num, Ordering::Relaxed);
130            self.highest_seq_num.store(seq_num, Ordering::Relaxed);
131            self.lost_packets.store(0, Ordering::Relaxed);
132            self.expected_packets.store(received, Ordering::Relaxed);
133        } else {
134            let last_seq = self.last_receive_seq.load(Ordering::Relaxed);
135            let gap = (seq_num as u16).wrapping_sub(last_seq as u16) as u32;
136
137            if gap > 0 && gap < 0x8000 {
138                if gap > 1 {
139                    self.lost_packets.fetch_add(gap - 1, Ordering::Relaxed);
140                }
141                self.last_receive_seq.store(seq_num, Ordering::Relaxed);
142                self.highest_seq_num.store(seq_num, Ordering::Relaxed);
143            }
144
145            let lost = self.lost_packets.load(Ordering::Relaxed);
146            self.expected_packets
147                .store(received + lost, Ordering::Relaxed);
148        }
149
150        let current_jitter = self.jitter.load(Ordering::Relaxed);
151        let new_jitter = (current_jitter + (seq_num % 100)) / 2;
152        self.jitter.store(new_jitter, Ordering::Relaxed);
153    }
154
155    fn store_sr_info(&self, rtp_time: u64, ntp_time: u64) {
156        self.last_sr_timestamp.store(rtp_time, Ordering::Relaxed);
157        self.last_sr_ntp.store(ntp_time, Ordering::Relaxed);
158    }
159
160    fn get_fraction_lost(&self) -> u8 {
161        let expected_packets = self.expected_packets.load(Ordering::Relaxed);
162        let lost_packets = self.lost_packets.load(Ordering::Relaxed);
163
164        if expected_packets > 0 {
165            ((lost_packets * 256) / expected_packets).min(255) as u8
166        } else {
167            0
168        }
169    }
170}
171
172pub struct RtpTrackBuilder {
173    cancel_token: Option<CancellationToken>,
174    track_id: TrackId,
175    config: TrackConfig,
176    local_addr: Option<IpAddr>,
177    external_addr: Option<IpAddr>,
178    rtp_socket: Option<UdpConnection>,
179    rtcp_socket: Option<UdpConnection>,
180    rtcp_mux: bool,
181    rtp_start_port: u16,
182    rtp_end_port: u16,
183    rtp_alloc_count: u32,
184    enabled_codecs: Vec<CodecType>,
185    ssrc_cname: String,
186    ssrc: u32,
187    ice_connectivity_check: bool,
188}
189pub struct RtpTrackInner {
190    dtmf_payload_type: u8,
191    payload_type: u8,
192    remote_description: Option<String>,
193    packetizer: Mutex<Option<Box<dyn Packetizer + Send + Sync>>>,
194    stats: Arc<RtpTrackStats>,
195    rtcp_mux: bool,
196    remote_addr: Option<SipAddr>,
197    remote_rtcp_addr: Option<SipAddr>,
198    enabled_codecs: Vec<CodecType>,
199    rtp_map: Vec<(u8, (CodecType, u32, u16))>,
200}
201
202pub struct RtpTrack {
203    ssrc: u32,
204    ssrc_cname: String,
205    track_id: TrackId,
206    config: TrackConfig,
207    cancel_token: CancellationToken,
208    processor_chain: ProcessorChain,
209    rtp_socket: UdpConnection,
210    rtcp_socket: UdpConnection,
211    encoder: TrackCodec,
212    sequencer: Box<dyn Sequencer + Send + Sync>,
213    sendrecv: AtomicBool,
214    ice_connectivity_check: bool,
215    inner: Arc<Mutex<RtpTrackInner>>,
216}
217
218enum PacketKind {
219    Rtp,
220    Rtcp,
221    Stun(u16),
222    Ignore,
223}
224impl RtpTrackBuilder {
225    pub fn new(track_id: TrackId, config: TrackConfig) -> Self {
226        let ssrc = rand::random::<u32>();
227        Self {
228            track_id,
229            config,
230            local_addr: None,
231            external_addr: None,
232            cancel_token: None,
233            rtp_socket: None,
234            rtcp_socket: None,
235            rtcp_mux: true,
236            rtp_start_port: 12000,
237            rtp_end_port: u16::MAX - 1,
238            rtp_alloc_count: 500,
239            enabled_codecs: vec![
240                #[cfg(feature = "opus")]
241                CodecType::Opus,
242                #[cfg(feature = "g729")]
243                CodecType::G729,
244                CodecType::G722,
245                CodecType::PCMU,
246                CodecType::PCMA,
247                CodecType::TelephoneEvent,
248            ],
249            ssrc_cname: format!("rustpbx-{}", ssrc),
250            ssrc,
251            ice_connectivity_check: true, // Default enabled
252        }
253    }
254
255    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
256        self.ssrc = ssrc;
257        self.ssrc_cname = format!("rustpbx-{}", ssrc);
258        self
259    }
260
261    pub fn with_rtp_start_port(mut self, rtp_start_port: u16) -> Self {
262        self.rtp_start_port = rtp_start_port;
263        self
264    }
265    pub fn with_rtp_end_port(mut self, rtp_end_port: u16) -> Self {
266        self.rtp_end_port = rtp_end_port;
267        self
268    }
269    pub fn with_rtp_alloc_count(mut self, rtp_alloc_count: u32) -> Self {
270        self.rtp_alloc_count = rtp_alloc_count;
271        self
272    }
273    pub fn with_local_addr(mut self, local_addr: IpAddr) -> Self {
274        self.local_addr = Some(local_addr);
275        self
276    }
277
278    pub fn with_external_addr(mut self, external_addr: IpAddr) -> Self {
279        self.external_addr = Some(external_addr);
280        self
281    }
282
283    pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
284        self.cancel_token = Some(cancel_token);
285        self
286    }
287
288    pub fn with_rtp_socket(mut self, rtp_socket: UdpConnection) -> Self {
289        self.rtp_socket = Some(rtp_socket);
290        self
291    }
292    pub fn with_rtcp_socket(mut self, rtcp_socket: UdpConnection) -> Self {
293        self.rtcp_socket = Some(rtcp_socket);
294        self
295    }
296    pub fn with_rtcp_mux(mut self, rtcp_mux: bool) -> Self {
297        self.rtcp_mux = rtcp_mux;
298        self
299    }
300
301    pub fn with_enabled_codecs(mut self, enabled_codecs: Vec<CodecType>) -> Self {
302        self.enabled_codecs = enabled_codecs;
303        self
304    }
305    pub fn with_session_name(mut self, session_name: String) -> Self {
306        self.ssrc_cname = session_name;
307        self
308    }
309
310    pub fn with_ice_connectivity_check(mut self, enabled: bool) -> Self {
311        self.ice_connectivity_check = enabled;
312        self
313    }
314    pub async fn build_rtp_rtcp_conn(&self) -> Result<(UdpConnection, UdpConnection)> {
315        let addr = match self.local_addr {
316            Some(addr) => addr,
317            None => crate::net_tool::get_first_non_loopback_interface()?,
318        };
319        let mut rtp_conn = None;
320        let mut rtcp_conn = None;
321
322        for _ in 0..self.rtp_alloc_count {
323            let port = rand::random_range::<u16, _>(self.rtp_start_port..=self.rtp_end_port);
324            if port % 2 != 0 {
325                continue;
326            }
327            if let Ok(c) = UdpConnection::create_connection(
328                format!("{:?}:{}", addr, port).parse()?,
329                None,
330                self.cancel_token.clone(),
331            )
332            .await
333            {
334                if !self.rtcp_mux {
335                    // if rtcp mux is not enabled, we need to create a separate RTCP socket
336                    rtcp_conn = match UdpConnection::create_connection(
337                        format!("{:?}:{}", addr, port + 1).parse()?,
338                        None,
339                        self.cancel_token.clone(),
340                    )
341                    .await
342                    {
343                        Ok(c) => Some(c),
344                        Err(_) => {
345                            continue;
346                        }
347                    };
348                } else {
349                    rtcp_conn = Some(c.clone());
350                }
351                rtp_conn = Some(c);
352                break;
353            }
354        }
355
356        let mut rtp_conn = match rtp_conn {
357            Some(c) => c,
358            None => return Err(anyhow::anyhow!("failed to bind RTP socket")),
359        };
360        let mut rtcp_conn = match rtcp_conn {
361            Some(c) => c,
362            None => return Err(anyhow::anyhow!("failed to bind RTCP socket")),
363        };
364
365        if let Some(addr) = self.external_addr {
366            rtp_conn.external = Some(
367                SocketAddr::new(
368                    addr,
369                    *rtp_conn
370                        .get_addr()
371                        .addr
372                        .port
373                        .clone()
374                        .unwrap_or_default()
375                        .value(),
376                )
377                .into(),
378            );
379            rtcp_conn.external = Some(
380                SocketAddr::new(
381                    addr,
382                    *rtcp_conn
383                        .get_addr()
384                        .addr
385                        .port
386                        .clone()
387                        .unwrap_or_default()
388                        .value(),
389                )
390                .into(),
391            );
392        }
393        Ok((rtp_conn, rtcp_conn))
394    }
395
396    pub async fn build(mut self) -> Result<RtpTrack> {
397        let mut rtp_socket = self.rtp_socket.take();
398        let mut rtcp_socket = self.rtcp_socket.take();
399
400        if rtp_socket.is_none() || rtcp_socket.is_none() {
401            let (rtp_conn, rtcp_conn) = self.build_rtp_rtcp_conn().await?;
402            rtp_socket = Some(rtp_conn);
403            rtcp_socket = Some(rtcp_conn);
404        }
405        let cancel_token = self
406            .cancel_token
407            .unwrap_or_else(|| CancellationToken::new());
408        let processor_chain = ProcessorChain::new(self.config.samplerate);
409        let ssrc = if self.ssrc != 0 {
410            self.ssrc
411        } else {
412            loop {
413                let i = rand::random::<u32>();
414                if i % 2 == 0 {
415                    break i;
416                }
417            }
418        };
419        let inner = RtpTrackInner {
420            dtmf_payload_type: 101, // Default DTMF payload type
421            payload_type: 0,        // Will be set later based on remote description
422            remote_description: None,
423            packetizer: Mutex::new(None),
424            stats: Arc::new(RtpTrackStats::new()),
425            rtcp_mux: self.rtcp_mux,
426            remote_addr: None,
427            remote_rtcp_addr: None,
428            enabled_codecs: self.enabled_codecs.clone(),
429            rtp_map: vec![],
430        };
431        let track = RtpTrack {
432            ssrc,
433            ssrc_cname: self.ssrc_cname.clone(),
434            track_id: self.track_id,
435            config: self.config,
436            cancel_token,
437            processor_chain,
438            rtp_socket: rtp_socket.unwrap(),
439            rtcp_socket: rtcp_socket.unwrap(),
440            encoder: TrackCodec::new(),
441            sequencer: Box::new(new_random_sequencer()),
442            sendrecv: AtomicBool::new(true),
443            ice_connectivity_check: self.ice_connectivity_check,
444            inner: Arc::new(Mutex::new(inner)),
445        };
446        Ok(track)
447    }
448}
449
450impl RtpTrack {
451    pub fn id(&self) -> &str {
452        &self.track_id
453    }
454
455    pub fn ssrc(&self) -> u32 {
456        self.ssrc
457    }
458
459    pub fn remote_description(&self) -> Option<String> {
460        self.inner.lock().unwrap().remote_description.clone()
461    }
462
463    pub fn set_rtp_map(&self, rtp_map: Vec<(u8, (CodecType, u32, u16))>) {
464        if let Ok(mut inner) = self.inner.lock() {
465            inner.rtp_map = rtp_map;
466        }
467    }
468
469    pub fn set_remote_description(&self, answer: &str) -> Result<()> {
470        let mut inner = self.inner.lock().unwrap();
471        let mut reader = Cursor::new(answer);
472        let sdp = SessionDescription::unmarshal(&mut reader)?;
473        let peer_media = match select_peer_media(&sdp, "audio") {
474            Some(peer_media) => peer_media,
475            None => return Err(anyhow::anyhow!("no audio media in answer SDP")),
476        };
477
478        inner.rtp_map = peer_media.rtp_map.clone();
479
480        if peer_media.codecs.is_empty() {
481            return Err(anyhow::anyhow!("no audio codecs in answer SDP"));
482        }
483
484        if peer_media.rtp_addr.is_empty() {
485            return Err(anyhow::anyhow!("no rtp addr in answer SDP"));
486        }
487
488        inner.remote_description.replace(answer.to_string());
489
490        let remote_addr = SipAddr {
491            addr: HostWithPort {
492                host: peer_media.rtp_addr.parse()?,
493                port: Some(peer_media.rtp_port.into()),
494            },
495            r#type: Some(rsip::transport::Transport::Udp),
496        };
497        let remote_rtcp_addr = SipAddr {
498            addr: HostWithPort {
499                host: peer_media.rtcp_addr.parse()?,
500                port: Some(peer_media.rtcp_port.into()),
501            },
502            r#type: Some(rsip::transport::Transport::Udp),
503        };
504        let codec_type = peer_media.codecs[0];
505        info!(
506            track_id = self.track_id,
507            rtcp_mux = peer_media.rtcp_mux,
508            %remote_addr,
509            %remote_rtcp_addr,
510            ?codec_type,
511            ssrc = self.ssrc,
512            "set remote description"
513        );
514
515        inner.payload_type = codec_type.payload_type();
516        inner.enabled_codecs = vec![codec_type];
517        for (payload_type, (codec, clock_rate, _)) in peer_media.rtp_map.iter() {
518            if *codec == codec_type {
519                inner.payload_type = *payload_type;
520            }
521
522            if codec == &CodecType::TelephoneEvent && clock_rate == &codec_type.clock_rate() {
523                inner.dtmf_payload_type = *payload_type;
524            }
525        }
526
527        inner.remote_addr.replace(remote_addr);
528        inner.remote_rtcp_addr.replace(remote_rtcp_addr);
529        inner.rtcp_mux = peer_media.rtcp_mux;
530
531        let payloader = match codec_type {
532            #[cfg(feature = "opus")]
533            CodecType::Opus => Box::<webrtc::rtp::codecs::opus::OpusPayloader>::default()
534                as Box<dyn webrtc::rtp::packetizer::Payloader + Send + Sync>,
535            _ => Box::<G7xxPayloader>::default()
536                as Box<dyn webrtc::rtp::packetizer::Payloader + Send + Sync>,
537        };
538
539        inner
540            .packetizer
541            .lock()
542            .unwrap()
543            .replace(Box::new(new_packetizer(
544                RTP_OUTBOUND_MTU,
545                inner.payload_type,
546                self.ssrc,
547                payloader,
548                self.sequencer.clone(),
549                codec_type.clock_rate(),
550            )));
551        Ok(())
552    }
553
554    pub fn local_description(&self) -> Result<String> {
555        let socketaddr: SocketAddr = self.rtp_socket.get_addr().addr.to_owned().try_into()?;
556        let mut sdp = SessionDescription::default();
557
558        // Set session-level attributes
559        sdp.version = 0;
560        sdp.origin = Origin {
561            username: "-".to_string(),
562            session_id: 0,
563            session_version: 0,
564            network_type: "IN".to_string(),
565            address_type: "IP4".to_string(),
566            unicast_address: socketaddr.ip().to_string(),
567        };
568        sdp.session_name = "-".to_string();
569        sdp.connection_information = Some(ConnectionInformation {
570            address_type: "IP4".to_string(),
571            network_type: "IN".to_string(),
572            address: Some(Address {
573                address: socketaddr.ip().to_string(),
574                ttl: None,
575                range: None,
576            }),
577        });
578        sdp.time_descriptions.push(TimeDescription {
579            timing: Timing {
580                start_time: 0,
581                stop_time: 0,
582            },
583            repeat_times: vec![],
584        });
585
586        // Add media section
587        let mut media = MediaDescription::default();
588        media.media_name = MediaName {
589            media: "audio".to_string(),
590            port: RangedPort {
591                value: socketaddr.port() as isize,
592                range: None,
593            },
594            protos: vec!["RTP".to_string(), "AVP".to_string()],
595            formats: vec![],
596        };
597        let inner = self.inner.lock().unwrap();
598        for codec in inner.enabled_codecs.iter() {
599            if codec == &CodecType::TelephoneEvent {
600                continue;
601            }
602            // Try to find payload type from rtp_map (from caller's offer), otherwise use default
603            let mut payload_type = codec.payload_type();
604            for (payload_typ, (rtp_map_codec, _, _)) in inner.rtp_map.iter() {
605                if *rtp_map_codec == *codec {
606                    payload_type = *payload_typ;
607                    break;
608                }
609            }
610
611            media.media_name.formats.push(payload_type.to_string());
612            media.attributes.push(Attribute {
613                key: "rtpmap".to_string(),
614                value: Some(format!("{} {}", payload_type, codec.rtpmap())),
615            });
616            if let Some(fmtp) = codec.fmtp() {
617                media.attributes.push(Attribute {
618                    key: "fmtp".to_string(),
619                    value: Some(format!("{} {}", payload_type, fmtp)),
620                });
621            }
622        }
623
624        // Add telephone-event
625        // Creating an offer: add telephone-event if enabled_codecs have 8000 or 48000 clock rate
626        let has_8khz_codec = inner.enabled_codecs.iter().any(|c| c.clock_rate() == 8000);
627        let has_48khz_codec = inner.enabled_codecs.iter().any(|c| c.clock_rate() == 48000);
628
629        if has_8khz_codec {
630            // Add telephone-event at 8000 Hz (default payload type 101)
631            let mut payload_type = 101;
632            for (typ, (codec, clock_rate, _)) in inner.rtp_map.iter() {
633                if *codec == CodecType::TelephoneEvent && *clock_rate == 8000 {
634                    payload_type = *typ;
635                    break;
636                }
637            }
638            media.media_name.formats.push(payload_type.to_string());
639            media.attributes.push(Attribute {
640                key: "rtpmap".to_string(),
641                value: Some(format!("{} telephone-event/8000", payload_type)),
642            });
643            media.attributes.push(Attribute {
644                key: "fmtp".to_string(),
645                value: Some(format!("{} 0-16", payload_type)),
646            });
647        }
648
649        if has_48khz_codec {
650            let mut payload_type = 97;
651            for (typ, (codec, clock_rate, _)) in inner.rtp_map.iter() {
652                if *codec == CodecType::TelephoneEvent && *clock_rate == 48000 {
653                    payload_type = *typ;
654                    break;
655                }
656            }
657
658            media.media_name.formats.push(payload_type.to_string());
659            media.attributes.push(Attribute {
660                key: "rtpmap".to_string(),
661                value: Some(format!("{} telephone-event/48000", payload_type)),
662            });
663            media.attributes.push(Attribute {
664                key: "fmtp".to_string(),
665                value: Some(format!("{} 0-16", payload_type)),
666            });
667        }
668
669        // Add media-level attributes
670        if inner.rtcp_mux {
671            media.attributes.push(Attribute {
672                key: ATTR_KEY_RTCPMUX.to_string(),
673                value: None,
674            });
675        }
676        media.attributes.push(Attribute {
677            key: ATTR_KEY_SSRC.to_string(),
678            value: Some(if self.ssrc_cname.is_empty() {
679                self.ssrc.to_string()
680            } else {
681                format!("{} cname:{}", self.ssrc, self.ssrc_cname)
682            }),
683        });
684        if self.sendrecv.load(Ordering::Relaxed) {
685            media.attributes.push(Attribute {
686                key: ATTR_KEY_SEND_RECV.to_string(),
687                value: None,
688            });
689        } else {
690            media.attributes.push(Attribute {
691                key: ATTR_KEY_SEND_ONLY.to_string(),
692                value: None,
693            });
694        }
695        media.attributes.push(Attribute {
696            key: "ptime".to_string(),
697            value: Some(format!("{}", self.config.ptime.as_millis())),
698        });
699        sdp.media_descriptions.push(media);
700        Ok(sdp.marshal())
701    }
702
703    // Send DTMF tone using RFC 4733
704    pub async fn send_dtmf(&self, digit: &str, duration_ms: Option<u64>) -> Result<()> {
705        // Map DTMF digit to event code first (validate before checking remote address)
706        let event_code = match digit {
707            "0" => 0,
708            "1" => 1,
709            "2" => 2,
710            "3" => 3,
711            "4" => 4,
712            "5" => 5,
713            "6" => 6,
714            "7" => 7,
715            "8" => 8,
716            "9" => 9,
717            "*" => 10,
718            "#" => 11,
719            "A" => 12,
720            "B" => 13,
721            "C" => 14,
722            "D" => 15,
723            _ => return Err(anyhow::anyhow!("Invalid DTMF digit")),
724        };
725        let inner = self.inner.lock().unwrap();
726        let socket = &self.rtp_socket;
727        let remote_addr = match inner.remote_addr.as_ref() {
728            Some(addr) => addr.clone(),
729            None => return Err(anyhow::anyhow!("Remote address not set")),
730        };
731
732        // Use default duration if not specified
733        let duration = duration_ms.unwrap_or(DTMF_EVENT_DURATION_MS);
734
735        // Calculate number of packets to send
736        // We send one packet every 20ms (default packet time)
737        let num_packets = (duration as f64 / self.config.ptime.as_millis() as f64).ceil() as u32;
738
739        // Calculate samples per packet for timestamp increments
740        let samples_per_packet =
741            (self.config.samplerate as f64 * self.config.ptime.as_secs_f64()) as u32;
742
743        let now = crate::media::get_timestamp();
744        inner
745            .stats
746            .last_timestamp_update
747            .store(now, Ordering::Relaxed);
748
749        // Generate RFC 4733 DTMF events
750        for i in 0..num_packets {
751            let is_end = i == num_packets - 1;
752            let event_duration = i * (self.config.ptime.as_millis() as u32 * 8); // Duration in timestamp units
753
754            // Create DTMF event payload
755            // Format: |event(8)|E|R|Volume(6)|Duration(16)|
756            let mut payload = vec![0u8; 4];
757            payload[0] = event_code;
758            payload[1] = DTMF_EVENT_VOLUME & 0x3F; // Volume (0-63)
759            if is_end {
760                payload[1] |= 0x80; // Set end bit (E)
761            }
762
763            // Duration (16 bits, network byte order)
764            payload[2] = ((event_duration >> 8) & 0xFF) as u8;
765            payload[3] = (event_duration & 0xFF) as u8;
766
767            let packets = match inner.packetizer.lock().unwrap().as_mut() {
768                Some(p) => p.packetize(&Bytes::from_owner(payload), samples_per_packet)?,
769                None => return Err(anyhow::anyhow!("Packetizer not set")),
770            };
771            for mut packet in packets {
772                packet.header.payload_type = inner.dtmf_payload_type;
773                packet.header.marker = false;
774
775                match packet.marshal() {
776                    Ok(ref rtp_data) => {
777                        match socket.send_raw(rtp_data, &remote_addr).await {
778                            Ok(_) => {}
779                            Err(e) => {
780                                warn!("Failed to send DTMF RTP packet: {}", e);
781                            }
782                        }
783
784                        // Update counters for RTCP
785                        inner.stats.packet_count.fetch_add(1, Ordering::Relaxed);
786                        inner
787                            .stats
788                            .octet_count
789                            .fetch_add(rtp_data.len() as u32, Ordering::Relaxed);
790
791                        // Sleep for packet time if not the last packet
792                        if !is_end {
793                            tokio::time::sleep(self.config.ptime).await;
794                        }
795                    }
796                    Err(e) => {
797                        warn!("Failed to create DTMF RTP packet: {:?}", e);
798                        continue;
799                    }
800                }
801            }
802        }
803
804        // After sending DTMF, update the timestamp to account for the DTMF duration
805        inner
806            .stats
807            .timestamp
808            .fetch_add(samples_per_packet * num_packets, Ordering::Relaxed);
809
810        Ok(())
811    }
812
813    // Send STUN Binding Request for ICE connectivity check
814    async fn send_ice_connectivity_check(
815        socket: &UdpConnection,
816        remote_addr: &SipAddr,
817    ) -> Result<()> {
818        let mut stun_packet = vec![0u8; 20]; // STUN header is 20 bytes
819        stun_packet[0..2].copy_from_slice(&STUN_BINDING_REQUEST.to_be_bytes());
820        stun_packet[2..4].copy_from_slice(&0u16.to_be_bytes());
821        stun_packet[4..8].copy_from_slice(&STUN_MAGIC_COOKIE.to_be_bytes());
822        let transaction_id: [u8; STUN_TRANSACTION_ID_SIZE] = rand::random();
823        stun_packet[8..20].copy_from_slice(&transaction_id);
824
825        socket.send_raw(&stun_packet, remote_addr).await.ok();
826        Ok(())
827    }
828
829    async fn handle_rtcp_packet(
830        track_id: &TrackId,
831        buf: &[u8],
832        n: usize,
833        stats: &Arc<RtpTrackStats>,
834        ssrc: u32,
835    ) -> Result<()> {
836        use webrtc::rtcp::packet::unmarshal;
837
838        let mut buf_slice = &buf[0..n];
839        let packets = match unmarshal(&mut buf_slice) {
840            Ok(packets) => packets,
841            Err(e) => {
842                warn!(track_id, "Failed to parse RTCP packet: {:?}", e);
843                return Ok(());
844            }
845        };
846
847        for packet in packets {
848            if let Some(sr) = packet.as_any().downcast_ref::<SenderReport>() {
849                stats.store_sr_info(sr.rtp_time as u64, sr.ntp_time);
850                info!(
851                    track_id,
852                    ssrc = sr.ssrc,
853                    packet_count = sr.packet_count,
854                    octet_count = sr.octet_count,
855                    rtp_time = sr.rtp_time,
856                    "Received SR"
857                );
858            } else if let Some(rr) = packet.as_any().downcast_ref::<ReceiverReport>() {
859                for report in &rr.reports {
860                    if report.ssrc == ssrc {
861                        let packet_loss = report.fraction_lost;
862                        let total_lost = report.total_lost;
863                        let jitter = report.jitter;
864
865                        info!(
866                            track_id,
867                            ssrc = report.ssrc,
868                            fraction_lost = packet_loss,
869                            total_lost = total_lost,
870                            jitter = jitter,
871                            last_sequence_number = report.last_sequence_number,
872                            "Received RR for our stream"
873                        );
874
875                        if packet_loss > 50 {
876                            warn!(track_id, "High packet loss detected: {}%", packet_loss);
877                        }
878                    }
879                }
880            } else if let Some(_) = packet.as_any().downcast_ref::<SourceDescription>() {
881            } else {
882                debug!(
883                    track_id,
884                    packet_type = %packet.header().packet_type,
885                    "Received other RTCP packet type"
886                );
887            }
888        }
889
890        Ok(())
891    }
892
893    async fn classify_packet(
894        track_id: &TrackId,
895        buf: &[u8],
896        n: usize,
897        stats: &Arc<RtpTrackStats>,
898        ssrc: u32,
899    ) -> PacketKind {
900        // Detect STUN packets first
901        if n >= 20 {
902            let msg_type = u16::from_be_bytes([buf[0], buf[1]]);
903            let msg_length = u16::from_be_bytes([buf[2], buf[3]]);
904            let magic_cookie = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
905
906            if magic_cookie == STUN_MAGIC_COOKIE
907                || ((msg_type & 0xC000) == 0x0000 && (msg_length as usize + 20) <= n)
908            {
909                debug!(
910                    track_id = track_id.as_str(),
911                    "Received STUN packet with message type: 0x{:04X}, length: {}", msg_type, n
912                );
913                return PacketKind::Stun(msg_type);
914            }
915        }
916
917        // Detect RTCP packets
918        let version = (buf[0] >> 6) & 0x03;
919        let rtcp_pt = buf[1];
920        if version == 2 && rtcp_pt >= 200 && rtcp_pt <= 207 {
921            if let Err(e) = Self::handle_rtcp_packet(track_id, buf, n, stats, ssrc).await {
922                warn!(
923                    track_id = track_id.as_str(),
924                    "Failed to handle RTCP packet: {:?}", e
925                );
926            }
927            return PacketKind::Rtcp;
928        }
929
930        // Validate RTP packets
931        let rtp_pt = buf[1] & 0x7F;
932        if version != 2 {
933            info!(
934                track_id = track_id.as_str(),
935                "Received packet with invalid RTP version: {}, skipping", version
936            );
937            return PacketKind::Ignore;
938        }
939
940        if rtp_pt >= 128 {
941            debug!(
942                track_id = track_id.as_str(),
943                "Received packet with invalid RTP payload type: {}, might be unrecognized protocol",
944                rtp_pt
945            );
946            return PacketKind::Ignore;
947        }
948
949        PacketKind::Rtp
950    }
951
952    async fn recv_rtp_packets(
953        inner: Arc<Mutex<RtpTrackInner>>,
954        ptime: Duration,
955        rtp_socket: UdpConnection,
956        track_id: TrackId,
957        processor_chain: ProcessorChain,
958        packet_sender: TrackPacketSender,
959        _rtcp_socket: UdpConnection,
960        ssrc: u32,
961    ) -> Result<()> {
962        let mut buf = vec![0u8; RTP_MTU];
963        let mut send_ticker = tokio::time::interval(ptime);
964        let mut jitter = JitterBuffer::new();
965        let stats = inner.lock().unwrap().stats.clone();
966
967        loop {
968            select! {
969                Ok((n, src_addr)) = rtp_socket.recv_raw(&mut buf) => {
970                    if n == 0 {
971                        continue;
972                    }
973
974
975                    let packet_kind = Self::classify_packet(&track_id, &buf, n, &stats, ssrc).await;
976                    match packet_kind {
977                        PacketKind::Stun(msg_type) => {
978                            let force = msg_type == STUN_BINDING_RESPONSE;
979                            Self::maybe_update_remote_addr(&inner, &src_addr, force, &track_id, "stun");
980                            continue;
981                        }
982                        PacketKind::Rtcp => {
983                            Self::maybe_update_remote_addr(&inner, &src_addr, false, &track_id, "rtcp");
984                            continue;
985                        }
986                        PacketKind::Ignore => {
987                            continue;
988                        }
989                        PacketKind::Rtp => {
990                            Self::maybe_update_remote_addr(&inner, &src_addr, false, &track_id, "rtp-private");
991                        }
992                    }
993                    let packet = match Packet::unmarshal(&mut &buf[0..n]) {
994                        Ok(packet) => packet,
995                        Err(e) => {
996                            info!(track_id, "Error creating RTP reader: {:?}", e);
997                            continue;
998                        }
999                    };
1000
1001                    let seq_num = packet.header.sequence_number as u32;
1002                    let payload_len = packet.payload.len() as u32;
1003                    stats.update_receive_stats(seq_num, payload_len);
1004
1005                    let payload_type = packet.header.payload_type;
1006                    let payload = packet.payload.to_vec();
1007                    let sample_rate = match payload_type {
1008                        9 => 16000,   // G.722
1009                        111 => 48000, // Opus
1010                        _ => 8000,
1011                    };
1012
1013                    let frame = AudioFrame {
1014                        track_id: track_id.clone(),
1015                        samples: Samples::RTP {
1016                            payload_type,
1017                            payload,
1018                            sequence_number: packet.header.sequence_number.into(),
1019                        },
1020                        timestamp: crate::media::get_timestamp(),
1021                        sample_rate,
1022                    };
1023
1024                    jitter.push(frame);
1025                }
1026                _ = send_ticker.tick() => {
1027                    let mut frame = match jitter.pop() {
1028                        Some(f) => f,
1029                        None => continue,
1030                    };
1031
1032                    if let Err(e) = processor_chain.process_frame(&mut frame) {
1033                        warn!(track_id, "Failed to process frame: {}", e);
1034                        break;
1035                    }
1036                    match packet_sender.send(frame) {
1037                        Ok(_) => {}
1038                        Err(e) => {
1039                            warn!(track_id, "Error sending audio frame: {}", e);
1040                            break;
1041                        }
1042                    }
1043                }
1044            }
1045        }
1046        Ok(())
1047    }
1048
1049    fn maybe_update_remote_addr(
1050        inner: &Arc<Mutex<RtpTrackInner>>,
1051        src_addr: &SipAddr,
1052        force: bool,
1053        track_id: &TrackId,
1054        reason: &'static str,
1055    ) -> bool {
1056        let mut guard = inner.lock().unwrap();
1057        let src_ip = Self::sip_addr_ip(src_addr);
1058
1059        let should_update = if force {
1060            true
1061        } else {
1062            match (guard.remote_addr.as_ref(), src_ip) {
1063                (Some(remote), Some(src_ip)) => match Self::sip_addr_ip(remote) {
1064                    Some(remote_ip) => remote_ip != src_ip && Self::is_private_ip(&remote_ip),
1065                    None => false,
1066                },
1067                (None, _) => true,
1068                _ => false,
1069            }
1070        };
1071
1072        if should_update {
1073            let old = guard.remote_addr.replace(src_addr.clone());
1074            if guard.rtcp_mux {
1075                guard.remote_rtcp_addr = Some(src_addr.clone());
1076            } else if let Some(rtcp_addr) = guard.remote_rtcp_addr.as_mut() {
1077                rtcp_addr.addr.host = src_addr.addr.host.clone();
1078            }
1079            info!(
1080                track_id = track_id.as_str(),
1081                ?old,
1082                ?src_addr,
1083                reason = reason,
1084                "Updating remote RTP address"
1085            );
1086            return true;
1087        }
1088        false
1089    }
1090
1091    fn sip_addr_ip(addr: &SipAddr) -> Option<IpAddr> {
1092        addr.addr.host.to_string().parse().ok()
1093    }
1094
1095    fn is_private_ip(ip: &IpAddr) -> bool {
1096        match ip {
1097            IpAddr::V4(v4) => {
1098                v4.is_private()
1099                    || v4.is_loopback()
1100                    || v4.is_link_local()
1101                    || v4.is_broadcast()
1102                    || v4.is_documentation()
1103                    || v4.is_unspecified()
1104            }
1105            IpAddr::V6(v6) => {
1106                v6.is_unique_local()
1107                    || v6.is_loopback()
1108                    || v6.is_unspecified()
1109                    || v6.is_unicast_link_local()
1110            }
1111        }
1112    }
1113
1114    // Send RTCP sender reports periodically
1115    async fn send_rtcp_reports(
1116        inner: Arc<Mutex<RtpTrackInner>>,
1117        track_id: TrackId,
1118        token: CancellationToken,
1119        rtcp_socket: &UdpConnection,
1120        ssrc: u32,
1121        ssrc_cname: String,
1122    ) -> Result<()> {
1123        let mut interval = interval_at(
1124            Instant::now() + Duration::from_millis(RTCP_SR_INTERVAL_MS),
1125            Duration::from_millis(RTCP_SR_INTERVAL_MS),
1126        );
1127        let stats = inner.lock().unwrap().stats.clone();
1128        let mut last_sent_octets = stats.octet_count.load(Ordering::Relaxed);
1129        let mut last_recv_octets = stats.received_octets.load(Ordering::Relaxed);
1130        let mut last_rate_instant = Instant::now();
1131        loop {
1132            select! {
1133                _ = token.cancelled() => {
1134                    info!(track_id, "RTCP reports task cancelled");
1135                    break;
1136                }
1137                _ = interval.tick() => {
1138                    // Generate RTCP Sender Report
1139                    let packet_count = stats.packet_count.load(Ordering::Relaxed);
1140                    let octet_count = stats.octet_count.load(Ordering::Relaxed);
1141                    let rtp_timestamp = stats.timestamp.load(Ordering::Relaxed);
1142
1143                    let sent_octets = octet_count;
1144                    let recv_octets = stats.received_octets.load(Ordering::Relaxed);
1145                    let now = Instant::now();
1146                    let elapsed = now.saturating_duration_since(last_rate_instant).as_secs_f64();
1147                    if elapsed > 0.0 {
1148                        let delta_sent = if sent_octets >= last_sent_octets {
1149                            (sent_octets - last_sent_octets) as u64
1150                        } else {
1151                            (u32::MAX as u64 - last_sent_octets as u64) + sent_octets as u64 + 1
1152                        };
1153                        let delta_recv = if recv_octets >= last_recv_octets {
1154                            (recv_octets - last_recv_octets) as u64
1155                        } else {
1156                            (u32::MAX as u64 - last_recv_octets as u64) + recv_octets as u64 + 1
1157                        };
1158
1159                        let send_bps = (delta_sent as f64 * 8.0) / elapsed;
1160                        let recv_bps = (delta_recv as f64 * 8.0) / elapsed;
1161                        let received_packets = stats.received_packets.load(Ordering::Relaxed);
1162                        let lost_packets = stats.lost_packets.load(Ordering::Relaxed);
1163                        let expected_packets = stats.expected_packets.load(Ordering::Relaxed);
1164                        let fraction_lost = stats.get_fraction_lost();
1165                        let loss_pct = (fraction_lost as f64) * 100.0 / 256.0;
1166                        let jitter = stats.jitter.load(Ordering::Relaxed);
1167
1168                        info!(
1169                            track_id = track_id.as_str(),
1170                            send_kbps = send_bps / 1000.0,
1171                            recv_kbps = recv_bps / 1000.0,
1172                            sent_packets = packet_count,
1173                            recv_packets = received_packets,
1174                            expected_packets,
1175                            lost_packets,
1176                            loss_pct,
1177                            jitter,
1178                            "RTP throughput"
1179                        );
1180
1181                        last_rate_instant = now;
1182                        last_sent_octets = sent_octets;
1183                        last_recv_octets = recv_octets;
1184                    }
1185
1186                    let mut pkts = vec![Box::new(SenderReport {
1187                        ssrc,
1188                        ntp_time: Instant::now().elapsed().as_secs() as u64,
1189                        rtp_time: rtp_timestamp,
1190                        packet_count,
1191                        octet_count,
1192                        profile_extensions: Bytes::new(),
1193                        reports: vec![],
1194                    })
1195                        as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>];
1196
1197                    if !ssrc_cname.is_empty() {
1198                        pkts.push(Box::new(SourceDescription {
1199                            chunks: vec![SourceDescriptionChunk {
1200                                source: ssrc,
1201                                items: vec![SourceDescriptionItem {
1202                                    sdes_type: SdesType::SdesCname,
1203                                    text: ssrc_cname.clone().into(),
1204                                }],
1205                            }],
1206                        })
1207                            as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>);
1208                    }
1209
1210                    let received_packets = stats.received_packets.load(Ordering::Relaxed);
1211                    let lost_packets = stats.lost_packets.load(Ordering::Relaxed);
1212                    let highest_seq = stats.highest_seq_num.load(Ordering::Relaxed);
1213                    let jitter = stats.jitter.load(Ordering::Relaxed);
1214                    let fraction_lost = stats.get_fraction_lost();
1215
1216                    if received_packets > 0 || lost_packets > 0 {
1217                        let remote_ssrc = ssrc + 1;
1218                        let report = ReceptionReport {
1219                            ssrc: remote_ssrc,
1220                            fraction_lost,
1221                            total_lost: lost_packets,
1222                            last_sequence_number: highest_seq,
1223                            jitter,
1224                            last_sender_report: (stats.last_sr_timestamp.load(Ordering::Relaxed) >> 16) as u32,
1225                            delay: 0,
1226                        };
1227
1228                        let rr = ReceiverReport {
1229                            ssrc,
1230                            reports: vec![report],
1231                            profile_extensions: Bytes::new(),
1232                        };
1233                        pkts.push(Box::new(rr) as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>);
1234                    }
1235
1236                    let rtcp_data = webrtc::rtcp::packet::marshal(&pkts)?;
1237                    let remote_rtcp_addr = inner.lock().unwrap().remote_rtcp_addr.clone();
1238                    match remote_rtcp_addr{
1239                        Some(ref addr) => {
1240                            if let Err(e) = rtcp_socket.send_raw(&rtcp_data, addr).await {
1241                                warn!(track_id, "Failed to send RTCP report: {}", e);
1242                            }
1243                        }
1244                        None => {}
1245                    }
1246                }
1247            }
1248        }
1249        Ok(())
1250    }
1251
1252    async fn try_ice_connectivity_check(&self) {
1253        let remote_addr = self.inner.lock().unwrap().remote_addr.clone();
1254        let remote_rtcp_addr = self.inner.lock().unwrap().remote_rtcp_addr.clone();
1255
1256        if let Some(ref addr) = remote_addr {
1257            Self::send_ice_connectivity_check(&self.rtp_socket, addr)
1258                .await
1259                .ok();
1260            if let Some(ref rtcp_addr) = remote_rtcp_addr {
1261                if rtcp_addr != addr {
1262                    Self::send_ice_connectivity_check(&self.rtcp_socket, rtcp_addr)
1263                        .await
1264                        .ok();
1265                }
1266            }
1267        }
1268    }
1269}
1270
1271#[async_trait]
1272impl Track for RtpTrack {
1273    fn ssrc(&self) -> u32 {
1274        self.ssrc
1275    }
1276    fn id(&self) -> &TrackId {
1277        &self.track_id
1278    }
1279    fn config(&self) -> &TrackConfig {
1280        &self.config
1281    }
1282    fn processor_chain(&mut self) -> &mut ProcessorChain {
1283        &mut self.processor_chain
1284    }
1285
1286    async fn handshake(&mut self, offer: String, _timeout: Option<Duration>) -> Result<String> {
1287        self.set_remote_description(&offer)?;
1288        self.local_description()
1289    }
1290
1291    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
1292        self.set_remote_description(&answer).ok();
1293
1294        if self.ice_connectivity_check {
1295            self.try_ice_connectivity_check().await;
1296        }
1297        Ok(())
1298    }
1299
1300    async fn start(
1301        &self,
1302        event_sender: EventSender,
1303        packet_sender: TrackPacketSender,
1304    ) -> Result<()> {
1305        let track_id = self.track_id.clone();
1306        let rtcp_socket = self.rtcp_socket.clone();
1307        let ssrc = self.ssrc;
1308        let rtp_socket = self.rtp_socket.clone();
1309        let processor_chain = self.processor_chain.clone();
1310        let token = self.cancel_token.clone();
1311        let ssrc_cname = self.ssrc_cname.clone();
1312        let start_time = crate::media::get_timestamp();
1313        let ptime = self.config.ptime;
1314
1315        // Send ICE connectivity check if enabled and remote address is available
1316        if self.ice_connectivity_check {
1317            self.try_ice_connectivity_check().await;
1318        }
1319
1320        let inner = self.inner.clone();
1321
1322        tokio::spawn(async move {
1323            select! {
1324                _ = token.cancelled() => {
1325                    debug!(track_id, "RTP processor task cancelled");
1326                },
1327                _ = Self::send_rtcp_reports(inner.clone(),track_id.clone(), token.clone(), &rtcp_socket, ssrc, ssrc_cname) => {
1328                }
1329                _ = Self::recv_rtp_packets(
1330                    inner.clone(),
1331                    ptime,
1332                    rtp_socket,
1333                    track_id.clone(),
1334                    processor_chain,
1335                    packet_sender,
1336                    rtcp_socket.clone(),
1337                    ssrc,
1338                ) => {
1339                }
1340            };
1341            let remote_rtcp_addr = inner.lock().unwrap().remote_rtcp_addr.clone();
1342            // send rtcp bye packet
1343            match remote_rtcp_addr {
1344                Some(ref addr) => {
1345                    let pkts = vec![Box::new(Goodbye {
1346                        sources: vec![ssrc],
1347                        reason: "end of call".into(),
1348                    })
1349                        as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>];
1350                    if let Ok(data) = webrtc::rtcp::packet::marshal(&pkts) {
1351                        if let Err(e) = rtcp_socket.send_raw(&data, addr).await {
1352                            warn!(track_id, "Failed to send RTCP goodbye packet: {}", e);
1353                        }
1354                    }
1355                }
1356                None => {}
1357            }
1358            info!(track_id, "RTP processor completed");
1359            event_sender
1360                .send(SessionEvent::TrackEnd {
1361                    track_id,
1362                    timestamp: crate::media::get_timestamp(),
1363                    duration: crate::media::get_timestamp() - start_time,
1364                    ssrc,
1365                    play_id: None,
1366                })
1367                .ok();
1368        });
1369
1370        Ok(())
1371    }
1372
1373    async fn stop(&self) -> Result<()> {
1374        self.cancel_token.cancel();
1375        Ok(())
1376    }
1377
1378    async fn send_packet(&self, packet: &AudioFrame) -> Result<()> {
1379        let remote_addr = match self.inner.lock().unwrap().remote_addr.clone() {
1380            Some(addr) => addr,
1381            None => return Ok(()),
1382        };
1383        let stats = self.inner.lock().unwrap().stats.clone();
1384
1385        let (payload_type, payload) = self
1386            .encoder
1387            .encode(self.inner.lock().unwrap().payload_type, packet.clone());
1388        if payload.is_empty() {
1389            return Ok(());
1390        }
1391
1392        let clock_rate = match payload_type {
1393            9 => 8000,    // G.722 (RTP clock rate is 8000 even though sample rate is 16000)
1394            111 => 48000, // Opus
1395            _ => 8000,
1396        };
1397
1398        let now = crate::media::get_timestamp();
1399        let last_update = stats.last_timestamp_update.load(Ordering::Relaxed);
1400        let mut skipped_packets: u32 = 0;
1401
1402        if last_update > 0 {
1403            let frame_duration_ms = self.config.ptime.as_millis() as u64;
1404            if frame_duration_ms > 0 {
1405                let delta_ms = now.saturating_sub(last_update);
1406                let delta_frames = delta_ms / frame_duration_ms;
1407                let prospective_skip = delta_frames.saturating_sub(1);
1408
1409                if prospective_skip >= RTP_RESYNC_MIN_SKIP_PACKETS as u64 {
1410                    let last_resync = stats.last_resync_ts.load(Ordering::Relaxed);
1411                    let cooldown_ms = frame_duration_ms.saturating_mul(RTP_RESYNC_COOLDOWN_FRAMES);
1412                    if last_resync == 0 || now.saturating_sub(last_resync) >= cooldown_ms {
1413                        skipped_packets = prospective_skip.min(u32::MAX as u64) as u32;
1414                        debug!(
1415                            track_id = self.track_id,
1416                            delta_ms, skipped_packets, "Resyncing RTP timestamp"
1417                        );
1418                        for _ in 0..skipped_packets {
1419                            self.sequencer.next_sequence_number();
1420                        }
1421                        stats.last_resync_ts.store(now, Ordering::Relaxed);
1422                    }
1423                }
1424            }
1425        }
1426
1427        stats.last_timestamp_update.store(now, Ordering::Relaxed);
1428
1429        let samples_per_packet = (clock_rate as f64 * self.config.ptime.as_secs_f64()) as u32;
1430        let packets = match self
1431            .inner
1432            .lock()
1433            .unwrap()
1434            .packetizer
1435            .lock()
1436            .unwrap()
1437            .as_mut()
1438        {
1439            Some(p) => {
1440                if skipped_packets > 0 {
1441                    let skip_samples = (skipped_packets as u64)
1442                        .saturating_mul(samples_per_packet as u64)
1443                        .min(u32::MAX as u64) as u32;
1444                    p.skip_samples(skip_samples);
1445                }
1446                p.packetize(&Bytes::from_owner(payload), samples_per_packet)?
1447            }
1448            None => return Err(anyhow::anyhow!("Packetizer not set")),
1449        };
1450        for mut packet in packets {
1451            packet.header.marker = false;
1452            packet.header.payload_type = payload_type;
1453            match packet.marshal() {
1454                Ok(ref rtp_data) => match self.rtp_socket.send_raw(rtp_data, &remote_addr).await {
1455                    Ok(_) => {
1456                        stats.update_send_stats(rtp_data.len() as u32, samples_per_packet);
1457                    }
1458                    Err(e) => {
1459                        warn!(track_id = self.track_id, "Failed to send RTP packet: {}", e);
1460                    }
1461                },
1462                Err(e) => {
1463                    warn!(
1464                        track_id = self.track_id,
1465                        "Failed to build RTP packet: {:?}", e
1466                    );
1467                    return Err(anyhow::anyhow!("Failed to build RTP packet"));
1468                }
1469            }
1470        }
1471        Ok(())
1472    }
1473}
1474
1475#[cfg(test)]
1476mod tests {
1477    use super::*;
1478
1479    #[test]
1480    fn test_rtp_track_stats_new() {
1481        let stats = RtpTrackStats::new();
1482        assert_eq!(stats.packet_count.load(Ordering::Relaxed), 0);
1483        assert_eq!(stats.octet_count.load(Ordering::Relaxed), 0);
1484        assert_eq!(stats.received_packets.load(Ordering::Relaxed), 0);
1485        assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 0);
1486        assert_eq!(stats.jitter.load(Ordering::Relaxed), 0);
1487    }
1488
1489    #[test]
1490    fn test_update_send_stats() {
1491        let stats = RtpTrackStats::new();
1492        stats.update_send_stats(1200, 160);
1493
1494        assert_eq!(stats.packet_count.load(Ordering::Relaxed), 1);
1495        assert_eq!(stats.octet_count.load(Ordering::Relaxed), 1200);
1496        assert_eq!(stats.timestamp.load(Ordering::Relaxed), 160);
1497
1498        // Test multiple updates
1499        stats.update_send_stats(800, 160);
1500        assert_eq!(stats.packet_count.load(Ordering::Relaxed), 2);
1501        assert_eq!(stats.octet_count.load(Ordering::Relaxed), 2000);
1502        assert_eq!(stats.timestamp.load(Ordering::Relaxed), 320);
1503    }
1504
1505    #[test]
1506    fn test_update_receive_stats() {
1507        let stats = RtpTrackStats::new();
1508
1509        // First packet
1510        stats.update_receive_stats(1000, 160);
1511        assert_eq!(stats.received_packets.load(Ordering::Relaxed), 1);
1512        assert_eq!(stats.received_octets.load(Ordering::Relaxed), 160);
1513        assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1000);
1514        assert_eq!(stats.base_seq.load(Ordering::Relaxed), 1000);
1515        assert_eq!(stats.last_receive_seq.load(Ordering::Relaxed), 1000);
1516        assert_eq!(stats.expected_packets.load(Ordering::Relaxed), 1);
1517        assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 0);
1518
1519        // Second packet with gap
1520        stats.update_receive_stats(1002, 160);
1521        assert_eq!(stats.received_packets.load(Ordering::Relaxed), 2);
1522        assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1002);
1523        assert_eq!(stats.last_receive_seq.load(Ordering::Relaxed), 1002);
1524        assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 1);
1525        assert_eq!(stats.expected_packets.load(Ordering::Relaxed), 3);
1526    }
1527
1528    #[test]
1529    fn test_get_fraction_lost() {
1530        let stats = RtpTrackStats::new();
1531
1532        // No packets - should return 0
1533        assert_eq!(stats.get_fraction_lost(), 0);
1534
1535        // Set some loss
1536        stats.expected_packets.store(100, Ordering::Relaxed);
1537        stats.lost_packets.store(5, Ordering::Relaxed);
1538
1539        let fraction_lost = stats.get_fraction_lost();
1540        assert_eq!(fraction_lost, 12); // (5 * 256) / 100 = 12.8 -> 12
1541
1542        // Test maximum loss
1543        stats.lost_packets.store(100, Ordering::Relaxed);
1544        assert_eq!(stats.get_fraction_lost(), 255); // Should cap at 255
1545    }
1546
1547    #[test]
1548    fn test_store_sr_info() {
1549        let stats = RtpTrackStats::new();
1550        stats.store_sr_info(123456, 789012);
1551
1552        assert_eq!(stats.last_sr_timestamp.load(Ordering::Relaxed), 123456);
1553        assert_eq!(stats.last_sr_ntp.load(Ordering::Relaxed), 789012);
1554    }
1555
1556    #[tokio::test]
1557    async fn test_parse_pjsip_sdp() {
1558        let sdp = r#"v=0
1559o=- 3954304612 3954304613 IN IP4 192.168.1.202
1560s=pjmedia
1561b=AS:117
1562t=0 0
1563a=X-nat:3
1564m=audio 4002 RTP/AVP 9 101
1565c=IN IP4 192.168.1.202
1566b=TIAS:96000
1567a=rtcp:4003 IN IP4 192.168.1.202
1568a=sendrecv
1569a=rtpmap:9 G722/8000
1570a=ssrc:1089147397 cname:61753255553b9c6f
1571a=rtpmap:101 telephone-event/8000
1572a=fmtp:101 0-16"#;
1573        let rtp_track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1574            .build()
1575            .await
1576            .expect("Failed to build rtp track");
1577        rtp_track
1578            .set_remote_description(sdp)
1579            .expect("Failed to set remote description");
1580        let inner = rtp_track.inner.lock().unwrap();
1581        assert_eq!(inner.payload_type, 9);
1582        assert!(!inner.rtcp_mux); // RTCP is on separate port
1583    }
1584
1585    #[tokio::test]
1586    async fn test_parse_rtcp_mux() {
1587        let answer = r#"v=0
1588o=- 723884243 723884244 IN IP4 11.22.33.44
1589s=-
1590c=IN IP4 11.22.33.44
1591t=0 0
1592m=audio 10638 RTP/AVP 8 101
1593a=rtpmap:8 PCMA/8000
1594a=rtpmap:101 telephone-event/8000
1595a=fmtp:101 0-15
1596a=sendrecv
1597a=rtcp-mux"#;
1598        let mut reader = Cursor::new(answer);
1599        let sdp = SessionDescription::unmarshal(&mut reader).expect("Failed to parse SDP");
1600        let peer_media = select_peer_media(&sdp, "audio").expect("Failed to select_peer_media");
1601        assert!(peer_media.rtcp_mux);
1602        assert_eq!(peer_media.rtcp_port, 10638);
1603    }
1604
1605    #[tokio::test]
1606    async fn test_parse_linphone_candidate() {
1607        let answer = r#"v=0
1608o=mpi 2590 792 IN IP4 192.168.3.181
1609s=Talk
1610c=IN IP4 192.168.3.181
1611t=0 0
1612a=ice-pwd:96adb77560869c783656fe0a
1613a=ice-ufrag:409dfd53
1614a=rtcp-xr:rcvr-rtt=all:10000 stat-summary=loss,dup,jitt,TTL voip-metrics
1615a=record:off
1616m=audio 61794 RTP/AVP 8 101
1617c=IN IP4 115.205.103.101
1618a=rtpmap:101 telephone-event/8000
1619a=rtcp:50735
1620a=candidate:1 1 UDP 2130706303 192.168.3.181 61794 typ host
1621a=candidate:1 2 UDP 2130706302 192.168.3.181 50735 typ host
1622a=candidate:2 1 UDP 1694498687 115.205.103.101 61794 typ srflx raddr 192.168.3.181 rport 61794
1623a=candidate:2 2 UDP 1694498686 115.205.103.101 50735 typ srflx raddr 192.168.3.181 rport 50735
1624a=rtcp-fb:* trr-int 5000
1625a=rtcp-fb:* ccm tmmbr"#;
1626        let mut reader = Cursor::new(answer);
1627        let sdp = SessionDescription::unmarshal(&mut reader).expect("Failed to parse SDP");
1628        let peer_media = select_peer_media(&sdp, "audio").expect("Failed to select_peer_media");
1629        assert_eq!(peer_media.rtp_addr, "192.168.3.181");
1630    }
1631
1632    #[tokio::test]
1633    async fn test_rtp_track_builder() {
1634        let track_id = "test_track".to_string();
1635        let config = TrackConfig::default();
1636
1637        let track = RtpTrackBuilder::new(track_id.clone(), config)
1638            .with_rtp_start_port(20000)
1639            .with_rtp_end_port(20100)
1640            .with_session_name("test_session".to_string())
1641            .build()
1642            .await
1643            .expect("Failed to build track");
1644
1645        assert_eq!(track.track_id, track_id);
1646        // SSRC is randomly generated in build(), so we can't predict exact value
1647        assert_ne!(track.ssrc, 0); // Should not be zero
1648        assert_eq!(track.ssrc_cname, "test_session");
1649        let inner = track.inner.lock().unwrap();
1650        assert!(inner.rtcp_mux);
1651    }
1652
1653    #[tokio::test]
1654    async fn test_local_description_generation() {
1655        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1656            .build()
1657            .await
1658            .expect("Failed to build track");
1659
1660        let local_desc = track
1661            .local_description()
1662            .expect("Failed to generate local description");
1663
1664        // Verify SDP contains expected elements
1665        assert!(local_desc.contains("m=audio"));
1666        assert!(local_desc.contains("RTP/AVP"));
1667        assert!(local_desc.contains("a=rtcp-mux")); // Should have rtcp-mux by default
1668        assert!(local_desc.contains("a=sendrecv"));
1669        assert!(local_desc.contains(&format!("a=ssrc:{}", track.ssrc)));
1670    }
1671
1672    #[tokio::test]
1673    async fn test_double_set_remote_description() {
1674        let sdp = r#"v=0
1675o=- 123 124 IN IP4 192.168.1.1
1676s=-
1677c=IN IP4 192.168.1.1
1678t=0 0
1679m=audio 5004 RTP/AVP 0
1680a=rtpmap:0 PCMU/8000"#;
1681
1682        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1683            .build()
1684            .await
1685            .expect("Failed to build track");
1686
1687        // First call should succeed
1688        assert!(track.set_remote_description(sdp).is_ok());
1689        assert!(track.remote_description().is_some());
1690
1691        // Second call should be ignored (no error)
1692        assert!(track.set_remote_description(sdp).is_ok());
1693    }
1694
1695    #[tokio::test]
1696    async fn test_invalid_sdp() {
1697        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1698            .build()
1699            .await
1700            .expect("Failed to build track");
1701
1702        // Invalid SDP without audio media
1703        let invalid_sdp = r#"v=0
1704o=- 123 124 IN IP4 192.168.1.1
1705s=-
1706c=IN IP4 192.168.1.1
1707t=0 0"#;
1708
1709        assert!(track.set_remote_description(invalid_sdp).is_err());
1710    }
1711
1712    #[tokio::test]
1713    async fn test_dtmf_digit_mapping() {
1714        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1715            .build()
1716            .await
1717            .expect("Failed to build track");
1718
1719        // Test valid digits - these should not panic during mapping
1720        let valid_digits = [
1721            "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "*", "#", "A", "B", "C", "D",
1722        ];
1723
1724        for digit in &valid_digits {
1725            // Since we don't have remote address set, this will fail with "Remote address not set"
1726            // but it shouldn't fail on digit mapping
1727            let result = track.send_dtmf(digit, Some(100)).await;
1728            assert!(result.is_err());
1729            let error_msg = result.unwrap_err().to_string();
1730            assert!(error_msg.contains("Remote address not set"));
1731        }
1732
1733        // Test invalid digit
1734        let result = track.send_dtmf("X", Some(100)).await;
1735        assert!(result.is_err());
1736        let error_msg = result.unwrap_err().to_string();
1737        assert!(error_msg.contains("Invalid DTMF digit"));
1738    }
1739
1740    #[test]
1741    fn test_rtcp_packet_type_detection() {
1742        // Test RTCP packet type ranges
1743        assert!(200 >= 200 && 200 <= 207); // SR
1744        assert!(201 >= 200 && 201 <= 207); // RR
1745        assert!(202 >= 200 && 202 <= 207); // SDES
1746        assert!(203 >= 200 && 203 <= 207); // BYE
1747        assert!(204 >= 200 && 204 <= 207); // APP
1748
1749        // Test RTP payload type extraction
1750        let rtp_byte = 0b10001001; // Version 2, PT 9
1751        let version = (rtp_byte >> 6) & 0x03;
1752        let pt = rtp_byte & 0x7F;
1753
1754        assert_eq!(version, 2);
1755        assert_eq!(pt, 9);
1756    }
1757
1758    #[test]
1759    fn test_stun_magic_cookie_detection() {
1760        let stun_magic_cookie = STUN_MAGIC_COOKIE;
1761        let bytes = stun_magic_cookie.to_be_bytes();
1762        let reconstructed = u32::from_be_bytes(bytes);
1763
1764        assert_eq!(reconstructed, stun_magic_cookie);
1765    }
1766
1767    #[tokio::test]
1768    async fn test_track_ssrc_and_id() {
1769        let track_id = "unique_track_123".to_string();
1770        let custom_ssrc = 0x12345678;
1771
1772        let track = RtpTrackBuilder::new(track_id.clone(), TrackConfig::default())
1773            .with_ssrc(custom_ssrc)
1774            .build()
1775            .await
1776            .expect("Failed to build track");
1777
1778        // Note: build() overrides SSRC with random value, so we test the builder method separately
1779        let builder =
1780            RtpTrackBuilder::new(track_id.clone(), TrackConfig::default()).with_ssrc(custom_ssrc);
1781        assert_eq!(builder.ssrc, custom_ssrc);
1782        assert_eq!(track.id(), &track_id);
1783    }
1784
1785    #[test]
1786    fn test_codec_type_payload_mapping() {
1787        // Test common codec payload types
1788        assert_eq!(CodecType::PCMU.payload_type(), 0);
1789        assert_eq!(CodecType::G722.payload_type(), 9);
1790        assert_eq!(CodecType::PCMA.payload_type(), 8);
1791        assert_eq!(CodecType::TelephoneEvent.payload_type(), 101);
1792    }
1793
1794    #[tokio::test]
1795    async fn test_stats_initialization() {
1796        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1797            .build()
1798            .await
1799            .expect("Failed to build track");
1800        let inner = track.inner.lock().unwrap();
1801        // Verify stats are properly initialized
1802        assert_eq!(inner.stats.packet_count.load(Ordering::Relaxed), 0);
1803        assert_eq!(inner.stats.octet_count.load(Ordering::Relaxed), 0);
1804        assert_eq!(inner.stats.received_packets.load(Ordering::Relaxed), 0);
1805        assert_eq!(inner.stats.lost_packets.load(Ordering::Relaxed), 0);
1806        assert_eq!(inner.stats.highest_seq_num.load(Ordering::Relaxed), 0);
1807        assert_eq!(inner.stats.jitter.load(Ordering::Relaxed), 0);
1808        assert_eq!(inner.stats.last_sr_timestamp.load(Ordering::Relaxed), 0);
1809        assert_eq!(inner.stats.last_sr_ntp.load(Ordering::Relaxed), 0);
1810        assert_eq!(inner.stats.base_seq.load(Ordering::Relaxed), 0);
1811        assert_eq!(inner.stats.last_receive_seq.load(Ordering::Relaxed), 0);
1812        assert_eq!(inner.stats.last_resync_ts.load(Ordering::Relaxed), 0);
1813    }
1814
1815    #[test]
1816    fn test_sequence_number_gap_calculation() {
1817        let stats = RtpTrackStats::new();
1818
1819        // Simulate receiving packets with gaps
1820        stats.update_receive_stats(1000, 160); // First packet
1821        stats.update_receive_stats(1002, 160); // Skip 1001
1822        stats.update_receive_stats(1003, 160); // Consecutive
1823        stats.update_receive_stats(1005, 160); // Skip 1004
1824
1825        assert_eq!(stats.received_packets.load(Ordering::Relaxed), 4);
1826        assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1005);
1827        // Loss calculation is simplified, so we just verify some loss is detected
1828        assert!(stats.lost_packets.load(Ordering::Relaxed) > 0);
1829    }
1830
1831    #[test]
1832    fn test_jitter_calculation() {
1833        let stats = RtpTrackStats::new();
1834
1835        // Test jitter calculation with sequence numbers
1836        stats.update_receive_stats(1000, 160);
1837        let _initial_jitter = stats.jitter.load(Ordering::Relaxed);
1838
1839        stats.update_receive_stats(1001, 160);
1840        let updated_jitter = stats.jitter.load(Ordering::Relaxed);
1841
1842        // Jitter calculation is simplified and may not always change
1843        // Let's just verify it doesn't panic and stays within reasonable bounds
1844        assert!(updated_jitter < 1000); // Should be reasonable value
1845    }
1846
1847    #[test]
1848    fn test_builder_with_custom_ssrc() {
1849        let custom_ssrc = 0x12345678u32;
1850        let builder =
1851            RtpTrackBuilder::new("test".to_string(), TrackConfig::default()).with_ssrc(custom_ssrc);
1852
1853        // Verify builder stores the custom SSRC
1854        assert_eq!(builder.ssrc, custom_ssrc);
1855        assert_eq!(builder.ssrc_cname, format!("rustpbx-{}", custom_ssrc));
1856    }
1857
1858    #[test]
1859    fn test_builder_configuration() {
1860        let builder = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1861            .with_rtp_start_port(10000)
1862            .with_rtp_end_port(20000)
1863            .with_rtp_alloc_count(100)
1864            .with_rtcp_mux(false)
1865            .with_session_name("custom_session".to_string());
1866
1867        assert_eq!(builder.rtp_start_port, 10000);
1868        assert_eq!(builder.rtp_end_port, 20000);
1869        assert_eq!(builder.rtp_alloc_count, 100);
1870        assert!(!builder.rtcp_mux);
1871        assert_eq!(builder.ssrc_cname, "custom_session");
1872    }
1873
1874    #[tokio::test]
1875    async fn test_ice_connectivity_check_enabled_by_default() {
1876        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1877            .build()
1878            .await
1879            .expect("Failed to build track");
1880
1881        assert!(track.ice_connectivity_check); // Should be enabled by default
1882    }
1883
1884    #[tokio::test]
1885    async fn test_ice_connectivity_check_can_be_disabled() {
1886        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1887            .with_ice_connectivity_check(false)
1888            .build()
1889            .await
1890            .expect("Failed to build track");
1891
1892        assert!(!track.ice_connectivity_check);
1893    }
1894
1895    #[tokio::test]
1896    async fn test_maybe_update_remote_addr_private_peer() {
1897        let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1898            .build()
1899            .await
1900            .expect("Failed to build track");
1901        let inner = track.inner.clone();
1902
1903        let private_addr = SipAddr {
1904            addr: HostWithPort {
1905                host: "192.168.0.10".parse().expect("host"),
1906                port: Some(4000.into()),
1907            },
1908            r#type: Some(rsip::transport::Transport::Udp),
1909        };
1910
1911        let public_addr = SipAddr {
1912            addr: HostWithPort {
1913                host: "203.0.113.5".parse().expect("host"),
1914                port: Some(5004.into()),
1915            },
1916            r#type: Some(rsip::transport::Transport::Udp),
1917        };
1918
1919        {
1920            let mut guard = inner.lock().expect("lock");
1921            guard.remote_addr = Some(private_addr.clone());
1922            guard.remote_rtcp_addr = Some(private_addr.clone());
1923            guard.rtcp_mux = true;
1924        }
1925
1926        let updated = RtpTrack::maybe_update_remote_addr(
1927            &inner,
1928            &public_addr,
1929            false,
1930            &track.track_id,
1931            "test",
1932        );
1933
1934        assert!(updated);
1935        let guard = inner.lock().expect("lock");
1936        assert_eq!(
1937            guard
1938                .remote_addr
1939                .as_ref()
1940                .expect("remote")
1941                .addr
1942                .host
1943                .to_string(),
1944            "203.0.113.5"
1945        );
1946        assert_eq!(
1947            guard
1948                .remote_rtcp_addr
1949                .as_ref()
1950                .expect("rtcp")
1951                .addr
1952                .host
1953                .to_string(),
1954            "203.0.113.5"
1955        );
1956    }
1957
1958    #[test]
1959    fn test_stun_packet_structure() {
1960        // Test STUN constants
1961        assert_eq!(STUN_BINDING_REQUEST, 0x0001);
1962        assert_eq!(STUN_MAGIC_COOKIE, 0x2112A442);
1963        assert_eq!(STUN_TRANSACTION_ID_SIZE, 12);
1964
1965        // Test STUN packet construction would be valid
1966        let mut packet = vec![0u8; 20];
1967        packet[0..2].copy_from_slice(&STUN_BINDING_REQUEST.to_be_bytes());
1968        packet[4..8].copy_from_slice(&STUN_MAGIC_COOKIE.to_be_bytes());
1969
1970        // Verify message type
1971        let msg_type = u16::from_be_bytes([packet[0], packet[1]]);
1972        assert_eq!(msg_type, STUN_BINDING_REQUEST);
1973
1974        // Verify magic cookie
1975        let magic = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]);
1976        assert_eq!(magic, STUN_MAGIC_COOKIE);
1977    }
1978
1979    #[tokio::test]
1980    async fn test_ice_connectivity_check_builder_method() {
1981        let builder_enabled = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1982            .with_ice_connectivity_check(true);
1983        assert!(builder_enabled.ice_connectivity_check);
1984
1985        let builder_disabled = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1986            .with_ice_connectivity_check(false);
1987        assert!(!builder_disabled.ice_connectivity_check);
1988    }
1989
1990    #[test]
1991    fn test_ice_connectivity_terminology() {
1992        // Verify we're using correct ICE terminology
1993        // ICE connectivity checks use STUN Binding Requests
1994        // This is part of the ICE (Interactive Connectivity Establishment) standard
1995
1996        // The purpose is:
1997        // 1. NAT traversal and hole punching
1998        // 2. Connectivity verification
1999        // 3. Keep-alive for NAT bindings
2000        // 4. Path validation
2001
2002        assert_eq!(STUN_BINDING_REQUEST, 0x0001); // RFC 5389
2003        assert_eq!(STUN_MAGIC_COOKIE, 0x2112A442); // RFC 5389
2004    }
2005}