1use super::track_codec::TrackCodec;
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::Samples,
6 media::TrackId,
7 media::{
8 jitter::JitterBuffer,
9 negotiate::select_peer_media,
10 processor::ProcessorChain,
11 track::{Track, TrackConfig, TrackPacketSender},
12 },
13};
14use anyhow::Result;
15use async_trait::async_trait;
16use audio_codec::CodecType;
17use bytes::Bytes;
18use rsip::HostWithPort;
19use rsipstack::transport::{SipAddr, udp::UdpConnection};
20use std::{
21 io::Cursor,
22 net::{IpAddr, SocketAddr},
23 sync::{
24 Arc, Mutex,
25 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
26 },
27 time::Duration,
28};
29use tokio::{select, time::Instant, time::interval_at};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, info, trace, warn};
32use webrtc::{
33 rtcp::{
34 goodbye::Goodbye,
35 receiver_report::ReceiverReport,
36 reception_report::ReceptionReport,
37 sender_report::SenderReport,
38 source_description::{
39 SdesType, SourceDescription, SourceDescriptionChunk, SourceDescriptionItem,
40 },
41 },
42 rtp::{
43 codecs::g7xx::G7xxPayloader,
44 packet::Packet,
45 packetizer::{Packetizer, new_packetizer},
46 sequence::{Sequencer, new_random_sequencer},
47 },
48 sdp::{
49 MediaDescription, SessionDescription,
50 description::{
51 common::{Address, Attribute, ConnectionInformation},
52 media::{MediaName, RangedPort},
53 session::{
54 ATTR_KEY_RTCPMUX, ATTR_KEY_SEND_ONLY, ATTR_KEY_SEND_RECV, ATTR_KEY_SSRC, Origin,
55 TimeDescription, Timing,
56 },
57 },
58 },
59 util::{Marshal, Unmarshal},
60};
61const RTP_MTU: usize = 1500; const RTP_OUTBOUND_MTU: usize = 1200; const RTCP_SR_INTERVAL_MS: u64 = 5000; const DTMF_EVENT_DURATION_MS: u64 = 160; const DTMF_EVENT_VOLUME: u8 = 10; const RTP_RESYNC_MIN_SKIP_PACKETS: u32 = 3; const RTP_RESYNC_COOLDOWN_FRAMES: u64 = 3; const STUN_BINDING_REQUEST: u16 = 0x0001;
71const STUN_BINDING_RESPONSE: u16 = 0x0101;
72const STUN_MAGIC_COOKIE: u32 = 0x2112A442;
73const STUN_TRANSACTION_ID_SIZE: usize = 12;
74
75struct RtpTrackStats {
76 timestamp: Arc<AtomicU32>,
77 packet_count: Arc<AtomicU32>,
78 octet_count: Arc<AtomicU32>,
79 last_timestamp_update: Arc<AtomicU64>,
80 last_resync_ts: Arc<AtomicU64>,
81 received_packets: Arc<AtomicU32>,
82 received_octets: Arc<AtomicU32>,
83 expected_packets: Arc<AtomicU32>,
84 lost_packets: Arc<AtomicU32>,
85 highest_seq_num: Arc<AtomicU32>,
86 base_seq: Arc<AtomicU32>,
87 last_receive_seq: Arc<AtomicU32>,
88 jitter: Arc<AtomicU32>,
89 last_sr_timestamp: Arc<AtomicU64>,
90 last_sr_ntp: Arc<AtomicU64>,
91}
92
93impl RtpTrackStats {
94 fn new() -> Self {
95 Self {
96 timestamp: Arc::new(AtomicU32::new(0)),
97 packet_count: Arc::new(AtomicU32::new(0)),
98 octet_count: Arc::new(AtomicU32::new(0)),
99 last_timestamp_update: Arc::new(AtomicU64::new(0)),
100 last_resync_ts: Arc::new(AtomicU64::new(0)),
101 received_packets: Arc::new(AtomicU32::new(0)),
102 received_octets: Arc::new(AtomicU32::new(0)),
103 expected_packets: Arc::new(AtomicU32::new(0)),
104 lost_packets: Arc::new(AtomicU32::new(0)),
105 highest_seq_num: Arc::new(AtomicU32::new(0)),
106 base_seq: Arc::new(AtomicU32::new(0)),
107 last_receive_seq: Arc::new(AtomicU32::new(0)),
108 jitter: Arc::new(AtomicU32::new(0)),
109 last_sr_timestamp: Arc::new(AtomicU64::new(0)),
110 last_sr_ntp: Arc::new(AtomicU64::new(0)),
111 }
112 }
113
114 fn update_send_stats(&self, packet_len: u32, samples_per_packet: u32) {
115 self.packet_count.fetch_add(1, Ordering::Relaxed);
116 self.octet_count.fetch_add(packet_len, Ordering::Relaxed);
117 self.timestamp
118 .fetch_add(samples_per_packet, Ordering::Relaxed);
119 }
120
121 fn update_receive_stats(&self, seq_num: u32, payload_len: u32) {
122 let prev_received = self.received_packets.fetch_add(1, Ordering::Relaxed);
123 let received = prev_received + 1;
124 self.received_octets
125 .fetch_add(payload_len, Ordering::Relaxed);
126
127 if prev_received == 0 {
128 self.base_seq.store(seq_num, Ordering::Relaxed);
129 self.last_receive_seq.store(seq_num, Ordering::Relaxed);
130 self.highest_seq_num.store(seq_num, Ordering::Relaxed);
131 self.lost_packets.store(0, Ordering::Relaxed);
132 self.expected_packets.store(received, Ordering::Relaxed);
133 } else {
134 let last_seq = self.last_receive_seq.load(Ordering::Relaxed);
135 let gap = (seq_num as u16).wrapping_sub(last_seq as u16) as u32;
136
137 if gap > 0 && gap < 0x8000 {
138 if gap > 1 {
139 self.lost_packets.fetch_add(gap - 1, Ordering::Relaxed);
140 }
141 self.last_receive_seq.store(seq_num, Ordering::Relaxed);
142 self.highest_seq_num.store(seq_num, Ordering::Relaxed);
143 }
144
145 let lost = self.lost_packets.load(Ordering::Relaxed);
146 self.expected_packets
147 .store(received + lost, Ordering::Relaxed);
148 }
149
150 let current_jitter = self.jitter.load(Ordering::Relaxed);
151 let new_jitter = (current_jitter + (seq_num % 100)) / 2;
152 self.jitter.store(new_jitter, Ordering::Relaxed);
153 }
154
155 fn store_sr_info(&self, rtp_time: u64, ntp_time: u64) {
156 self.last_sr_timestamp.store(rtp_time, Ordering::Relaxed);
157 self.last_sr_ntp.store(ntp_time, Ordering::Relaxed);
158 }
159
160 fn get_fraction_lost(&self) -> u8 {
161 let expected_packets = self.expected_packets.load(Ordering::Relaxed);
162 let lost_packets = self.lost_packets.load(Ordering::Relaxed);
163
164 if expected_packets > 0 {
165 ((lost_packets * 256) / expected_packets).min(255) as u8
166 } else {
167 0
168 }
169 }
170}
171
172pub struct RtpTrackBuilder {
173 cancel_token: Option<CancellationToken>,
174 track_id: TrackId,
175 config: TrackConfig,
176 local_addr: Option<IpAddr>,
177 external_addr: Option<IpAddr>,
178 rtp_socket: Option<UdpConnection>,
179 rtcp_socket: Option<UdpConnection>,
180 rtcp_mux: bool,
181 rtp_start_port: u16,
182 rtp_end_port: u16,
183 rtp_alloc_count: u32,
184 enabled_codecs: Vec<CodecType>,
185 ssrc_cname: String,
186 ssrc: u32,
187 ice_connectivity_check: bool,
188}
189pub struct RtpTrackInner {
190 dtmf_payload_type: u8,
191 payload_type: u8,
192 remote_description: Option<String>,
193 packetizer: Mutex<Option<Box<dyn Packetizer + Send + Sync>>>,
194 stats: Arc<RtpTrackStats>,
195 rtcp_mux: bool,
196 remote_addr: Option<SipAddr>,
197 remote_rtcp_addr: Option<SipAddr>,
198 enabled_codecs: Vec<CodecType>,
199 rtp_map: Vec<(u8, (CodecType, u32, u16))>,
200}
201
202pub struct RtpTrack {
203 ssrc: u32,
204 ssrc_cname: String,
205 track_id: TrackId,
206 config: TrackConfig,
207 cancel_token: CancellationToken,
208 processor_chain: ProcessorChain,
209 rtp_socket: UdpConnection,
210 rtcp_socket: UdpConnection,
211 encoder: TrackCodec,
212 sequencer: Box<dyn Sequencer + Send + Sync>,
213 sendrecv: AtomicBool,
214 ice_connectivity_check: bool,
215 inner: Arc<Mutex<RtpTrackInner>>,
216}
217
218enum PacketKind {
219 Rtp,
220 Rtcp,
221 Stun(u16),
222 Ignore,
223}
224impl RtpTrackBuilder {
225 pub fn new(track_id: TrackId, config: TrackConfig) -> Self {
226 let ssrc = rand::random::<u32>();
227 Self {
228 track_id,
229 config,
230 local_addr: None,
231 external_addr: None,
232 cancel_token: None,
233 rtp_socket: None,
234 rtcp_socket: None,
235 rtcp_mux: true,
236 rtp_start_port: 12000,
237 rtp_end_port: u16::MAX - 1,
238 rtp_alloc_count: 500,
239 enabled_codecs: vec![
240 #[cfg(feature = "opus")]
241 CodecType::Opus,
242 CodecType::G729,
243 CodecType::G722,
244 CodecType::PCMU,
245 CodecType::PCMA,
246 CodecType::TelephoneEvent,
247 ],
248 ssrc_cname: format!("rustpbx-{}", ssrc),
249 ssrc,
250 ice_connectivity_check: true, }
252 }
253
254 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
255 self.ssrc = ssrc;
256 self.ssrc_cname = format!("rustpbx-{}", ssrc);
257 self
258 }
259
260 pub fn with_rtp_start_port(mut self, rtp_start_port: u16) -> Self {
261 self.rtp_start_port = rtp_start_port;
262 self
263 }
264 pub fn with_rtp_end_port(mut self, rtp_end_port: u16) -> Self {
265 self.rtp_end_port = rtp_end_port;
266 self
267 }
268 pub fn with_rtp_alloc_count(mut self, rtp_alloc_count: u32) -> Self {
269 self.rtp_alloc_count = rtp_alloc_count;
270 self
271 }
272 pub fn with_local_addr(mut self, local_addr: IpAddr) -> Self {
273 self.local_addr = Some(local_addr);
274 self
275 }
276
277 pub fn with_external_addr(mut self, external_addr: IpAddr) -> Self {
278 self.external_addr = Some(external_addr);
279 self
280 }
281
282 pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
283 self.cancel_token = Some(cancel_token);
284 self
285 }
286
287 pub fn with_rtp_socket(mut self, rtp_socket: UdpConnection) -> Self {
288 self.rtp_socket = Some(rtp_socket);
289 self
290 }
291 pub fn with_rtcp_socket(mut self, rtcp_socket: UdpConnection) -> Self {
292 self.rtcp_socket = Some(rtcp_socket);
293 self
294 }
295 pub fn with_rtcp_mux(mut self, rtcp_mux: bool) -> Self {
296 self.rtcp_mux = rtcp_mux;
297 self
298 }
299
300 pub fn with_enabled_codecs(mut self, enabled_codecs: Vec<CodecType>) -> Self {
301 self.enabled_codecs = enabled_codecs;
302 self
303 }
304 pub fn with_session_name(mut self, session_name: String) -> Self {
305 self.ssrc_cname = session_name;
306 self
307 }
308
309 pub fn with_ice_connectivity_check(mut self, enabled: bool) -> Self {
310 self.ice_connectivity_check = enabled;
311 self
312 }
313 pub async fn build_rtp_rtcp_conn(&self) -> Result<(UdpConnection, UdpConnection)> {
314 let addr = match self.local_addr {
315 Some(addr) => addr,
316 None => crate::net_tool::get_first_non_loopback_interface()?,
317 };
318 let mut rtp_conn = None;
319 let mut rtcp_conn = None;
320
321 for _ in 0..self.rtp_alloc_count {
322 let port = rand::random_range::<u16, _>(self.rtp_start_port..=self.rtp_end_port);
323 if port % 2 != 0 {
324 continue;
325 }
326 if let Ok(c) = UdpConnection::create_connection(
327 format!("{:?}:{}", addr, port).parse()?,
328 None,
329 self.cancel_token.clone(),
330 )
331 .await
332 {
333 if !self.rtcp_mux {
334 rtcp_conn = match UdpConnection::create_connection(
336 format!("{:?}:{}", addr, port + 1).parse()?,
337 None,
338 self.cancel_token.clone(),
339 )
340 .await
341 {
342 Ok(c) => Some(c),
343 Err(_) => {
344 continue;
345 }
346 };
347 } else {
348 rtcp_conn = Some(c.clone());
349 }
350 rtp_conn = Some(c);
351 break;
352 }
353 }
354
355 let mut rtp_conn = match rtp_conn {
356 Some(c) => c,
357 None => return Err(anyhow::anyhow!("failed to bind RTP socket")),
358 };
359 let mut rtcp_conn = match rtcp_conn {
360 Some(c) => c,
361 None => return Err(anyhow::anyhow!("failed to bind RTCP socket")),
362 };
363
364 if let Some(addr) = self.external_addr {
365 rtp_conn.external = Some(
366 SocketAddr::new(
367 addr,
368 *rtp_conn
369 .get_addr()
370 .addr
371 .port
372 .clone()
373 .unwrap_or_default()
374 .value(),
375 )
376 .into(),
377 );
378 rtcp_conn.external = Some(
379 SocketAddr::new(
380 addr,
381 *rtcp_conn
382 .get_addr()
383 .addr
384 .port
385 .clone()
386 .unwrap_or_default()
387 .value(),
388 )
389 .into(),
390 );
391 }
392 Ok((rtp_conn, rtcp_conn))
393 }
394
395 pub async fn build(mut self) -> Result<RtpTrack> {
396 let mut rtp_socket = self.rtp_socket.take();
397 let mut rtcp_socket = self.rtcp_socket.take();
398
399 if rtp_socket.is_none() || rtcp_socket.is_none() {
400 let (rtp_conn, rtcp_conn) = self.build_rtp_rtcp_conn().await?;
401 rtp_socket = Some(rtp_conn);
402 rtcp_socket = Some(rtcp_conn);
403 }
404 let cancel_token = self
405 .cancel_token
406 .unwrap_or_else(|| CancellationToken::new());
407 let processor_chain = ProcessorChain::new(self.config.samplerate);
408 let ssrc = if self.ssrc != 0 {
409 self.ssrc
410 } else {
411 loop {
412 let i = rand::random::<u32>();
413 if i % 2 == 0 {
414 break i;
415 }
416 }
417 };
418 let inner = RtpTrackInner {
419 dtmf_payload_type: 101, payload_type: 0, remote_description: None,
422 packetizer: Mutex::new(None),
423 stats: Arc::new(RtpTrackStats::new()),
424 rtcp_mux: self.rtcp_mux,
425 remote_addr: None,
426 remote_rtcp_addr: None,
427 enabled_codecs: self.enabled_codecs.clone(),
428 rtp_map: vec![],
429 };
430 let track = RtpTrack {
431 ssrc,
432 ssrc_cname: self.ssrc_cname.clone(),
433 track_id: self.track_id,
434 config: self.config,
435 cancel_token,
436 processor_chain,
437 rtp_socket: rtp_socket.unwrap(),
438 rtcp_socket: rtcp_socket.unwrap(),
439 encoder: TrackCodec::new(),
440 sequencer: Box::new(new_random_sequencer()),
441 sendrecv: AtomicBool::new(true),
442 ice_connectivity_check: self.ice_connectivity_check,
443 inner: Arc::new(Mutex::new(inner)),
444 };
445 Ok(track)
446 }
447}
448
449impl RtpTrack {
450 pub fn id(&self) -> &str {
451 &self.track_id
452 }
453
454 pub fn ssrc(&self) -> u32 {
455 self.ssrc
456 }
457
458 pub fn remote_description(&self) -> Option<String> {
459 self.inner.lock().unwrap().remote_description.clone()
460 }
461
462 pub fn set_rtp_map(&self, rtp_map: Vec<(u8, (CodecType, u32, u16))>) {
463 if let Ok(mut inner) = self.inner.lock() {
464 inner.rtp_map = rtp_map;
465 }
466 }
467
468 pub fn set_remote_description(&self, answer: &str) -> Result<()> {
469 let mut inner = self.inner.lock().unwrap();
470 let mut reader = Cursor::new(answer);
471 let sdp = SessionDescription::unmarshal(&mut reader)?;
472 let peer_media = match select_peer_media(&sdp, "audio") {
473 Some(peer_media) => peer_media,
474 None => return Err(anyhow::anyhow!("no audio media in answer SDP")),
475 };
476
477 inner.rtp_map = peer_media.rtp_map.clone();
478
479 if peer_media.codecs.is_empty() {
480 return Err(anyhow::anyhow!("no audio codecs in answer SDP"));
481 }
482
483 if peer_media.rtp_addr.is_empty() {
484 return Err(anyhow::anyhow!("no rtp addr in answer SDP"));
485 }
486
487 inner.remote_description.replace(answer.to_string());
488
489 let remote_addr = SipAddr {
490 addr: HostWithPort {
491 host: peer_media.rtp_addr.parse()?,
492 port: Some(peer_media.rtp_port.into()),
493 },
494 r#type: Some(rsip::transport::Transport::Udp),
495 };
496 let remote_rtcp_addr = SipAddr {
497 addr: HostWithPort {
498 host: peer_media.rtcp_addr.parse()?,
499 port: Some(peer_media.rtcp_port.into()),
500 },
501 r#type: Some(rsip::transport::Transport::Udp),
502 };
503 let codec_type = peer_media.codecs[0];
504 info!(
505 track_id = self.track_id,
506 rtcp_mux = peer_media.rtcp_mux,
507 %remote_addr,
508 %remote_rtcp_addr,
509 ?codec_type,
510 ssrc = self.ssrc,
511 "set remote description"
512 );
513
514 inner.payload_type = codec_type.payload_type();
515 inner.enabled_codecs = vec![codec_type];
516 for (payload_type, (codec, clock_rate, _)) in peer_media.rtp_map.iter() {
517 if *codec == codec_type {
518 inner.payload_type = *payload_type;
519 }
520
521 if codec == &CodecType::TelephoneEvent && clock_rate == &codec_type.clock_rate() {
522 inner.dtmf_payload_type = *payload_type;
523 }
524 }
525
526 inner.remote_addr.replace(remote_addr);
527 inner.remote_rtcp_addr.replace(remote_rtcp_addr);
528 inner.rtcp_mux = peer_media.rtcp_mux;
529
530 let payloader = match codec_type {
531 #[cfg(feature = "opus")]
532 CodecType::Opus => Box::<webrtc::rtp::codecs::opus::OpusPayloader>::default()
533 as Box<dyn webrtc::rtp::packetizer::Payloader + Send + Sync>,
534 _ => Box::<G7xxPayloader>::default()
535 as Box<dyn webrtc::rtp::packetizer::Payloader + Send + Sync>,
536 };
537
538 inner
539 .packetizer
540 .lock()
541 .unwrap()
542 .replace(Box::new(new_packetizer(
543 RTP_OUTBOUND_MTU,
544 inner.payload_type,
545 self.ssrc,
546 payloader,
547 self.sequencer.clone(),
548 codec_type.clock_rate(),
549 )));
550 Ok(())
551 }
552
553 pub fn local_description(&self) -> Result<String> {
554 let socketaddr: SocketAddr = self.rtp_socket.get_addr().addr.to_owned().try_into()?;
555 let mut sdp = SessionDescription::default();
556
557 sdp.version = 0;
559 sdp.origin = Origin {
560 username: "-".to_string(),
561 session_id: 0,
562 session_version: 0,
563 network_type: "IN".to_string(),
564 address_type: "IP4".to_string(),
565 unicast_address: socketaddr.ip().to_string(),
566 };
567 sdp.session_name = "-".to_string();
568 sdp.connection_information = Some(ConnectionInformation {
569 address_type: "IP4".to_string(),
570 network_type: "IN".to_string(),
571 address: Some(Address {
572 address: socketaddr.ip().to_string(),
573 ttl: None,
574 range: None,
575 }),
576 });
577 sdp.time_descriptions.push(TimeDescription {
578 timing: Timing {
579 start_time: 0,
580 stop_time: 0,
581 },
582 repeat_times: vec![],
583 });
584
585 let mut media = MediaDescription::default();
587 media.media_name = MediaName {
588 media: "audio".to_string(),
589 port: RangedPort {
590 value: socketaddr.port() as isize,
591 range: None,
592 },
593 protos: vec!["RTP".to_string(), "AVP".to_string()],
594 formats: vec![],
595 };
596 let inner = self.inner.lock().unwrap();
597 for codec in inner.enabled_codecs.iter() {
598 if codec == &CodecType::TelephoneEvent {
599 continue;
600 }
601 let mut payload_type = codec.payload_type();
603 for (payload_typ, (rtp_map_codec, _, _)) in inner.rtp_map.iter() {
604 if *rtp_map_codec == *codec {
605 payload_type = *payload_typ;
606 break;
607 }
608 }
609
610 media.media_name.formats.push(payload_type.to_string());
611 media.attributes.push(Attribute {
612 key: "rtpmap".to_string(),
613 value: Some(format!("{} {}", payload_type, codec.rtpmap())),
614 });
615 if let Some(fmtp) = codec.fmtp() {
616 media.attributes.push(Attribute {
617 key: "fmtp".to_string(),
618 value: Some(format!("{} {}", payload_type, fmtp)),
619 });
620 }
621 }
622
623 let has_8khz_codec = inner.enabled_codecs.iter().any(|c| c.clock_rate() == 8000);
626 let has_48khz_codec = inner.enabled_codecs.iter().any(|c| c.clock_rate() == 48000);
627
628 if has_8khz_codec {
629 let mut payload_type = 101;
631 for (typ, (codec, clock_rate, _)) in inner.rtp_map.iter() {
632 if *codec == CodecType::TelephoneEvent && *clock_rate == 8000 {
633 payload_type = *typ;
634 break;
635 }
636 }
637 media.media_name.formats.push(payload_type.to_string());
638 media.attributes.push(Attribute {
639 key: "rtpmap".to_string(),
640 value: Some(format!("{} telephone-event/8000", payload_type)),
641 });
642 media.attributes.push(Attribute {
643 key: "fmtp".to_string(),
644 value: Some(format!("{} 0-16", payload_type)),
645 });
646 }
647
648 if has_48khz_codec {
649 let mut payload_type = 97;
650 for (typ, (codec, clock_rate, _)) in inner.rtp_map.iter() {
651 if *codec == CodecType::TelephoneEvent && *clock_rate == 48000 {
652 payload_type = *typ;
653 break;
654 }
655 }
656
657 media.media_name.formats.push(payload_type.to_string());
658 media.attributes.push(Attribute {
659 key: "rtpmap".to_string(),
660 value: Some(format!("{} telephone-event/48000", payload_type)),
661 });
662 media.attributes.push(Attribute {
663 key: "fmtp".to_string(),
664 value: Some(format!("{} 0-16", payload_type)),
665 });
666 }
667
668 if inner.rtcp_mux {
670 media.attributes.push(Attribute {
671 key: ATTR_KEY_RTCPMUX.to_string(),
672 value: None,
673 });
674 }
675 media.attributes.push(Attribute {
676 key: ATTR_KEY_SSRC.to_string(),
677 value: Some(if self.ssrc_cname.is_empty() {
678 self.ssrc.to_string()
679 } else {
680 format!("{} cname:{}", self.ssrc, self.ssrc_cname)
681 }),
682 });
683 if self.sendrecv.load(Ordering::Relaxed) {
684 media.attributes.push(Attribute {
685 key: ATTR_KEY_SEND_RECV.to_string(),
686 value: None,
687 });
688 } else {
689 media.attributes.push(Attribute {
690 key: ATTR_KEY_SEND_ONLY.to_string(),
691 value: None,
692 });
693 }
694 media.attributes.push(Attribute {
695 key: "ptime".to_string(),
696 value: Some(format!("{}", self.config.ptime.as_millis())),
697 });
698 sdp.media_descriptions.push(media);
699 Ok(sdp.marshal())
700 }
701
702 pub async fn send_dtmf(&self, digit: &str, duration_ms: Option<u64>) -> Result<()> {
704 let event_code = match digit {
706 "0" => 0,
707 "1" => 1,
708 "2" => 2,
709 "3" => 3,
710 "4" => 4,
711 "5" => 5,
712 "6" => 6,
713 "7" => 7,
714 "8" => 8,
715 "9" => 9,
716 "*" => 10,
717 "#" => 11,
718 "A" => 12,
719 "B" => 13,
720 "C" => 14,
721 "D" => 15,
722 _ => return Err(anyhow::anyhow!("Invalid DTMF digit")),
723 };
724 let inner = self.inner.lock().unwrap();
725 let socket = &self.rtp_socket;
726 let remote_addr = match inner.remote_addr.as_ref() {
727 Some(addr) => addr.clone(),
728 None => return Err(anyhow::anyhow!("Remote address not set")),
729 };
730
731 let duration = duration_ms.unwrap_or(DTMF_EVENT_DURATION_MS);
733
734 let num_packets = (duration as f64 / self.config.ptime.as_millis() as f64).ceil() as u32;
737
738 let samples_per_packet =
740 (self.config.samplerate as f64 * self.config.ptime.as_secs_f64()) as u32;
741
742 let now = crate::media::get_timestamp();
743 inner
744 .stats
745 .last_timestamp_update
746 .store(now, Ordering::Relaxed);
747
748 for i in 0..num_packets {
750 let is_end = i == num_packets - 1;
751 let event_duration = i * (self.config.ptime.as_millis() as u32 * 8); let mut payload = vec![0u8; 4];
756 payload[0] = event_code;
757 payload[1] = DTMF_EVENT_VOLUME & 0x3F; if is_end {
759 payload[1] |= 0x80; }
761
762 payload[2] = ((event_duration >> 8) & 0xFF) as u8;
764 payload[3] = (event_duration & 0xFF) as u8;
765
766 let packets = match inner.packetizer.lock().unwrap().as_mut() {
767 Some(p) => p.packetize(&Bytes::from_owner(payload), samples_per_packet)?,
768 None => return Err(anyhow::anyhow!("Packetizer not set")),
769 };
770 for mut packet in packets {
771 packet.header.payload_type = inner.dtmf_payload_type;
772 packet.header.marker = false;
773
774 match packet.marshal() {
775 Ok(ref rtp_data) => {
776 match socket.send_raw(rtp_data, &remote_addr).await {
777 Ok(_) => {}
778 Err(e) => {
779 warn!("Failed to send DTMF RTP packet: {}", e);
780 }
781 }
782
783 inner.stats.packet_count.fetch_add(1, Ordering::Relaxed);
785 inner
786 .stats
787 .octet_count
788 .fetch_add(rtp_data.len() as u32, Ordering::Relaxed);
789
790 if !is_end {
792 tokio::time::sleep(self.config.ptime).await;
793 }
794 }
795 Err(e) => {
796 warn!("Failed to create DTMF RTP packet: {:?}", e);
797 continue;
798 }
799 }
800 }
801 }
802
803 inner
805 .stats
806 .timestamp
807 .fetch_add(samples_per_packet * num_packets, Ordering::Relaxed);
808
809 Ok(())
810 }
811
812 async fn send_ice_connectivity_check(
814 socket: &UdpConnection,
815 remote_addr: &SipAddr,
816 ) -> Result<()> {
817 let mut stun_packet = vec![0u8; 20]; stun_packet[0..2].copy_from_slice(&STUN_BINDING_REQUEST.to_be_bytes());
819 stun_packet[2..4].copy_from_slice(&0u16.to_be_bytes());
820 stun_packet[4..8].copy_from_slice(&STUN_MAGIC_COOKIE.to_be_bytes());
821 let transaction_id: [u8; STUN_TRANSACTION_ID_SIZE] = rand::random();
822 stun_packet[8..20].copy_from_slice(&transaction_id);
823
824 socket.send_raw(&stun_packet, remote_addr).await.ok();
825 Ok(())
826 }
827
828 async fn handle_rtcp_packet(
829 track_id: &TrackId,
830 buf: &[u8],
831 n: usize,
832 stats: &Arc<RtpTrackStats>,
833 ssrc: u32,
834 ) -> Result<()> {
835 use webrtc::rtcp::packet::unmarshal;
836
837 let mut buf_slice = &buf[0..n];
838 let packets = match unmarshal(&mut buf_slice) {
839 Ok(packets) => packets,
840 Err(e) => {
841 warn!(track_id, "Failed to parse RTCP packet: {:?}", e);
842 return Ok(());
843 }
844 };
845
846 for packet in packets {
847 if let Some(sr) = packet.as_any().downcast_ref::<SenderReport>() {
848 stats.store_sr_info(sr.rtp_time as u64, sr.ntp_time);
849 info!(
850 track_id,
851 ssrc = sr.ssrc,
852 packet_count = sr.packet_count,
853 octet_count = sr.octet_count,
854 rtp_time = sr.rtp_time,
855 "Received SR"
856 );
857 } else if let Some(rr) = packet.as_any().downcast_ref::<ReceiverReport>() {
858 for report in &rr.reports {
859 if report.ssrc == ssrc {
860 let packet_loss = report.fraction_lost;
861 let total_lost = report.total_lost;
862 let jitter = report.jitter;
863
864 info!(
865 track_id,
866 ssrc = report.ssrc,
867 fraction_lost = packet_loss,
868 total_lost = total_lost,
869 jitter = jitter,
870 last_sequence_number = report.last_sequence_number,
871 "Received RR for our stream"
872 );
873
874 if packet_loss > 50 {
875 warn!(track_id, "High packet loss detected: {}%", packet_loss);
876 }
877 }
878 }
879 } else if let Some(_) = packet.as_any().downcast_ref::<SourceDescription>() {
880 } else {
881 debug!(
882 track_id,
883 packet_type = %packet.header().packet_type,
884 "Received other RTCP packet type"
885 );
886 }
887 }
888
889 Ok(())
890 }
891
892 async fn classify_packet(
893 track_id: &TrackId,
894 buf: &[u8],
895 n: usize,
896 stats: &Arc<RtpTrackStats>,
897 ssrc: u32,
898 ) -> PacketKind {
899 if n >= 20 {
901 let msg_type = u16::from_be_bytes([buf[0], buf[1]]);
902 let msg_length = u16::from_be_bytes([buf[2], buf[3]]);
903 let magic_cookie = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
904
905 if magic_cookie == STUN_MAGIC_COOKIE
906 || ((msg_type & 0xC000) == 0x0000 && (msg_length as usize + 20) <= n)
907 {
908 debug!(
909 track_id = track_id.as_str(),
910 "Received STUN packet with message type: 0x{:04X}, length: {}", msg_type, n
911 );
912 return PacketKind::Stun(msg_type);
913 }
914 }
915
916 let version = (buf[0] >> 6) & 0x03;
918 let rtcp_pt = buf[1];
919 if version == 2 && rtcp_pt >= 200 && rtcp_pt <= 207 {
920 if let Err(e) = Self::handle_rtcp_packet(track_id, buf, n, stats, ssrc).await {
921 warn!(
922 track_id = track_id.as_str(),
923 "Failed to handle RTCP packet: {:?}", e
924 );
925 }
926 return PacketKind::Rtcp;
927 }
928
929 let rtp_pt = buf[1] & 0x7F;
931 if version != 2 {
932 info!(
933 track_id = track_id.as_str(),
934 "Received packet with invalid RTP version: {}, skipping", version
935 );
936 return PacketKind::Ignore;
937 }
938
939 if rtp_pt >= 128 {
940 debug!(
941 track_id = track_id.as_str(),
942 "Received packet with invalid RTP payload type: {}, might be unrecognized protocol",
943 rtp_pt
944 );
945 return PacketKind::Ignore;
946 }
947
948 PacketKind::Rtp
949 }
950
951 async fn recv_rtp_packets(
952 inner: Arc<Mutex<RtpTrackInner>>,
953 ptime: Duration,
954 rtp_socket: UdpConnection,
955 track_id: TrackId,
956 processor_chain: ProcessorChain,
957 packet_sender: TrackPacketSender,
958 _rtcp_socket: UdpConnection,
959 ssrc: u32,
960 ) -> Result<()> {
961 let mut buf = vec![0u8; RTP_MTU];
962 let mut send_ticker = tokio::time::interval(ptime);
963 let mut jitter = JitterBuffer::new();
964 let stats = inner.lock().unwrap().stats.clone();
965
966 loop {
967 select! {
968 Ok((n, src_addr)) = rtp_socket.recv_raw(&mut buf) => {
969 if n == 0 {
970 continue;
971 }
972
973
974 let packet_kind = Self::classify_packet(&track_id, &buf, n, &stats, ssrc).await;
975 match packet_kind {
976 PacketKind::Stun(msg_type) => {
977 let force = msg_type == STUN_BINDING_RESPONSE;
978 Self::maybe_update_remote_addr(&inner, &src_addr, force, &track_id, "stun");
979 continue;
980 }
981 PacketKind::Rtcp => {
982 Self::maybe_update_remote_addr(&inner, &src_addr, false, &track_id, "rtcp");
983 continue;
984 }
985 PacketKind::Ignore => {
986 continue;
987 }
988 PacketKind::Rtp => {
989 Self::maybe_update_remote_addr(&inner, &src_addr, false, &track_id, "rtp-private");
990 }
991 }
992 let packet = match Packet::unmarshal(&mut &buf[0..n]) {
993 Ok(packet) => packet,
994 Err(e) => {
995 info!(track_id, "Error creating RTP reader: {:?}", e);
996 continue;
997 }
998 };
999
1000 let seq_num = packet.header.sequence_number as u32;
1001 let payload_len = packet.payload.len() as u32;
1002 stats.update_receive_stats(seq_num, payload_len);
1003
1004 let payload_type = packet.header.payload_type;
1005 let payload = packet.payload.to_vec();
1006 let sample_rate = match payload_type {
1007 9 => 16000, 111 => 48000, _ => 8000,
1010 };
1011
1012 let frame = AudioFrame {
1013 track_id: track_id.clone(),
1014 samples: Samples::RTP {
1015 payload_type,
1016 payload,
1017 sequence_number: packet.header.sequence_number.into(),
1018 },
1019 timestamp: crate::media::get_timestamp(),
1020 sample_rate,
1021 };
1022
1023 jitter.push(frame);
1024 }
1025 _ = send_ticker.tick() => {
1026 let mut frame = match jitter.pop() {
1027 Some(f) => f,
1028 None => continue,
1029 };
1030
1031 if let Err(e) = processor_chain.process_frame(&mut frame) {
1032 trace!(track_id, "Failed to process frame: {}", e);
1033 continue;
1034 }
1035 match packet_sender.send(frame) {
1036 Ok(_) => {}
1037 Err(e) => {
1038 warn!(track_id, "Error sending audio frame: {}", e);
1039 break;
1040 }
1041 }
1042 }
1043 }
1044 }
1045 Ok(())
1046 }
1047
1048 fn maybe_update_remote_addr(
1049 inner: &Arc<Mutex<RtpTrackInner>>,
1050 src_addr: &SipAddr,
1051 force: bool,
1052 track_id: &TrackId,
1053 reason: &'static str,
1054 ) -> bool {
1055 let mut guard = inner.lock().unwrap();
1056 let src_ip = Self::sip_addr_ip(src_addr);
1057
1058 let should_update = if force {
1059 true
1060 } else {
1061 match (guard.remote_addr.as_ref(), src_ip) {
1062 (Some(remote), Some(src_ip)) => match Self::sip_addr_ip(remote) {
1063 Some(remote_ip) => remote_ip != src_ip && Self::is_private_ip(&remote_ip),
1064 None => false,
1065 },
1066 (None, _) => true,
1067 _ => false,
1068 }
1069 };
1070
1071 if should_update {
1072 let old = guard.remote_addr.replace(src_addr.clone());
1073 if guard.rtcp_mux {
1074 guard.remote_rtcp_addr = Some(src_addr.clone());
1075 } else if let Some(rtcp_addr) = guard.remote_rtcp_addr.as_mut() {
1076 rtcp_addr.addr.host = src_addr.addr.host.clone();
1077 }
1078 info!(
1079 track_id = track_id.as_str(),
1080 ?old,
1081 ?src_addr,
1082 reason = reason,
1083 "Updating remote RTP address"
1084 );
1085 return true;
1086 }
1087 false
1088 }
1089
1090 fn sip_addr_ip(addr: &SipAddr) -> Option<IpAddr> {
1091 addr.addr.host.to_string().parse().ok()
1092 }
1093
1094 fn is_private_ip(ip: &IpAddr) -> bool {
1095 match ip {
1096 IpAddr::V4(v4) => {
1097 v4.is_private()
1098 || v4.is_loopback()
1099 || v4.is_link_local()
1100 || v4.is_broadcast()
1101 || v4.is_documentation()
1102 || v4.is_unspecified()
1103 }
1104 IpAddr::V6(v6) => {
1105 v6.is_unique_local()
1106 || v6.is_loopback()
1107 || v6.is_unspecified()
1108 || v6.is_unicast_link_local()
1109 }
1110 }
1111 }
1112
1113 async fn send_rtcp_reports(
1115 inner: Arc<Mutex<RtpTrackInner>>,
1116 track_id: TrackId,
1117 token: CancellationToken,
1118 rtcp_socket: &UdpConnection,
1119 ssrc: u32,
1120 ssrc_cname: String,
1121 ) -> Result<()> {
1122 let mut interval = interval_at(
1123 Instant::now() + Duration::from_millis(RTCP_SR_INTERVAL_MS),
1124 Duration::from_millis(RTCP_SR_INTERVAL_MS),
1125 );
1126 let stats = inner.lock().unwrap().stats.clone();
1127 let mut last_sent_octets = stats.octet_count.load(Ordering::Relaxed);
1128 let mut last_recv_octets = stats.received_octets.load(Ordering::Relaxed);
1129 let mut last_rate_instant = Instant::now();
1130 loop {
1131 select! {
1132 _ = token.cancelled() => {
1133 info!(track_id, "RTCP reports task cancelled");
1134 break;
1135 }
1136 _ = interval.tick() => {
1137 let packet_count = stats.packet_count.load(Ordering::Relaxed);
1139 let octet_count = stats.octet_count.load(Ordering::Relaxed);
1140 let rtp_timestamp = stats.timestamp.load(Ordering::Relaxed);
1141
1142 let sent_octets = octet_count;
1143 let recv_octets = stats.received_octets.load(Ordering::Relaxed);
1144 let now = Instant::now();
1145 let elapsed = now.saturating_duration_since(last_rate_instant).as_secs_f64();
1146 if elapsed > 0.0 {
1147 let delta_sent = if sent_octets >= last_sent_octets {
1148 (sent_octets - last_sent_octets) as u64
1149 } else {
1150 (u32::MAX as u64 - last_sent_octets as u64) + sent_octets as u64 + 1
1151 };
1152 let delta_recv = if recv_octets >= last_recv_octets {
1153 (recv_octets - last_recv_octets) as u64
1154 } else {
1155 (u32::MAX as u64 - last_recv_octets as u64) + recv_octets as u64 + 1
1156 };
1157
1158 let send_bps = (delta_sent as f64 * 8.0) / elapsed;
1159 let recv_bps = (delta_recv as f64 * 8.0) / elapsed;
1160 let received_packets = stats.received_packets.load(Ordering::Relaxed);
1161 let lost_packets = stats.lost_packets.load(Ordering::Relaxed);
1162 let expected_packets = stats.expected_packets.load(Ordering::Relaxed);
1163 let fraction_lost = stats.get_fraction_lost();
1164 let loss_pct = (fraction_lost as f64) * 100.0 / 256.0;
1165 let jitter = stats.jitter.load(Ordering::Relaxed);
1166
1167 info!(
1168 track_id = track_id.as_str(),
1169 send_kbps = send_bps / 1000.0,
1170 recv_kbps = recv_bps / 1000.0,
1171 sent_packets = packet_count,
1172 recv_packets = received_packets,
1173 expected_packets,
1174 lost_packets,
1175 loss_pct,
1176 jitter,
1177 "RTP throughput"
1178 );
1179
1180 last_rate_instant = now;
1181 last_sent_octets = sent_octets;
1182 last_recv_octets = recv_octets;
1183 }
1184
1185 let mut pkts = vec![Box::new(SenderReport {
1186 ssrc,
1187 ntp_time: Instant::now().elapsed().as_secs() as u64,
1188 rtp_time: rtp_timestamp,
1189 packet_count,
1190 octet_count,
1191 profile_extensions: Bytes::new(),
1192 reports: vec![],
1193 })
1194 as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>];
1195
1196 if !ssrc_cname.is_empty() {
1197 pkts.push(Box::new(SourceDescription {
1198 chunks: vec![SourceDescriptionChunk {
1199 source: ssrc,
1200 items: vec![SourceDescriptionItem {
1201 sdes_type: SdesType::SdesCname,
1202 text: ssrc_cname.clone().into(),
1203 }],
1204 }],
1205 })
1206 as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>);
1207 }
1208
1209 let received_packets = stats.received_packets.load(Ordering::Relaxed);
1210 let lost_packets = stats.lost_packets.load(Ordering::Relaxed);
1211 let highest_seq = stats.highest_seq_num.load(Ordering::Relaxed);
1212 let jitter = stats.jitter.load(Ordering::Relaxed);
1213 let fraction_lost = stats.get_fraction_lost();
1214
1215 if received_packets > 0 || lost_packets > 0 {
1216 let remote_ssrc = ssrc + 1;
1217 let report = ReceptionReport {
1218 ssrc: remote_ssrc,
1219 fraction_lost,
1220 total_lost: lost_packets,
1221 last_sequence_number: highest_seq,
1222 jitter,
1223 last_sender_report: (stats.last_sr_timestamp.load(Ordering::Relaxed) >> 16) as u32,
1224 delay: 0,
1225 };
1226
1227 let rr = ReceiverReport {
1228 ssrc,
1229 reports: vec![report],
1230 profile_extensions: Bytes::new(),
1231 };
1232 pkts.push(Box::new(rr) as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>);
1233 }
1234
1235 let rtcp_data = webrtc::rtcp::packet::marshal(&pkts)?;
1236 let remote_rtcp_addr = inner.lock().unwrap().remote_rtcp_addr.clone();
1237 match remote_rtcp_addr{
1238 Some(ref addr) => {
1239 if let Err(e) = rtcp_socket.send_raw(&rtcp_data, addr).await {
1240 warn!(track_id, "Failed to send RTCP report: {}", e);
1241 }
1242 }
1243 None => {}
1244 }
1245 }
1246 }
1247 }
1248 Ok(())
1249 }
1250
1251 async fn try_ice_connectivity_check(&self) {
1252 let remote_addr = self.inner.lock().unwrap().remote_addr.clone();
1253 let remote_rtcp_addr = self.inner.lock().unwrap().remote_rtcp_addr.clone();
1254
1255 if let Some(ref addr) = remote_addr {
1256 Self::send_ice_connectivity_check(&self.rtp_socket, addr)
1257 .await
1258 .ok();
1259 if let Some(ref rtcp_addr) = remote_rtcp_addr {
1260 if rtcp_addr != addr {
1261 Self::send_ice_connectivity_check(&self.rtcp_socket, rtcp_addr)
1262 .await
1263 .ok();
1264 }
1265 }
1266 }
1267 }
1268}
1269
1270#[async_trait]
1271impl Track for RtpTrack {
1272 fn ssrc(&self) -> u32 {
1273 self.ssrc
1274 }
1275 fn id(&self) -> &TrackId {
1276 &self.track_id
1277 }
1278 fn config(&self) -> &TrackConfig {
1279 &self.config
1280 }
1281 fn processor_chain(&mut self) -> &mut ProcessorChain {
1282 &mut self.processor_chain
1283 }
1284
1285 async fn handshake(&mut self, offer: String, _timeout: Option<Duration>) -> Result<String> {
1286 self.set_remote_description(&offer)?;
1287 self.local_description()
1288 }
1289
1290 async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
1291 self.set_remote_description(&answer).ok();
1292
1293 if self.ice_connectivity_check {
1294 self.try_ice_connectivity_check().await;
1295 }
1296 Ok(())
1297 }
1298
1299 async fn start(
1300 &self,
1301 event_sender: EventSender,
1302 packet_sender: TrackPacketSender,
1303 ) -> Result<()> {
1304 let track_id = self.track_id.clone();
1305 let rtcp_socket = self.rtcp_socket.clone();
1306 let ssrc = self.ssrc;
1307 let rtp_socket = self.rtp_socket.clone();
1308 let processor_chain = self.processor_chain.clone();
1309 let token = self.cancel_token.clone();
1310 let ssrc_cname = self.ssrc_cname.clone();
1311 let start_time = crate::media::get_timestamp();
1312 let ptime = self.config.ptime;
1313
1314 if self.ice_connectivity_check {
1316 self.try_ice_connectivity_check().await;
1317 }
1318
1319 let inner = self.inner.clone();
1320
1321 tokio::spawn(async move {
1322 select! {
1323 _ = token.cancelled() => {
1324 debug!(track_id, "RTP processor task cancelled");
1325 },
1326 _ = Self::send_rtcp_reports(inner.clone(),track_id.clone(), token.clone(), &rtcp_socket, ssrc, ssrc_cname) => {
1327 }
1328 _ = Self::recv_rtp_packets(
1329 inner.clone(),
1330 ptime,
1331 rtp_socket,
1332 track_id.clone(),
1333 processor_chain,
1334 packet_sender,
1335 rtcp_socket.clone(),
1336 ssrc,
1337 ) => {
1338 }
1339 };
1340 let remote_rtcp_addr = inner.lock().unwrap().remote_rtcp_addr.clone();
1341 match remote_rtcp_addr {
1343 Some(ref addr) => {
1344 let pkts = vec![Box::new(Goodbye {
1345 sources: vec![ssrc],
1346 reason: "end of call".into(),
1347 })
1348 as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>];
1349 if let Ok(data) = webrtc::rtcp::packet::marshal(&pkts) {
1350 if let Err(e) = rtcp_socket.send_raw(&data, addr).await {
1351 warn!(track_id, "Failed to send RTCP goodbye packet: {}", e);
1352 }
1353 }
1354 }
1355 None => {}
1356 }
1357 info!(track_id, "RTP processor completed");
1358 event_sender
1359 .send(SessionEvent::TrackEnd {
1360 track_id,
1361 timestamp: crate::media::get_timestamp(),
1362 duration: crate::media::get_timestamp() - start_time,
1363 ssrc,
1364 play_id: None,
1365 })
1366 .ok();
1367 });
1368
1369 Ok(())
1370 }
1371
1372 async fn stop(&self) -> Result<()> {
1373 self.cancel_token.cancel();
1374 Ok(())
1375 }
1376
1377 async fn send_packet(&self, packet: &AudioFrame) -> Result<()> {
1378 let remote_addr = match self.inner.lock().unwrap().remote_addr.clone() {
1379 Some(addr) => addr,
1380 None => return Ok(()),
1381 };
1382 let stats = self.inner.lock().unwrap().stats.clone();
1383
1384 let (payload_type, payload) = self
1385 .encoder
1386 .encode(self.inner.lock().unwrap().payload_type, packet.clone());
1387 if payload.is_empty() {
1388 return Ok(());
1389 }
1390
1391 let clock_rate = match payload_type {
1392 9 => 8000, 111 => 48000, _ => 8000,
1395 };
1396
1397 let now = crate::media::get_timestamp();
1398 let last_update = stats.last_timestamp_update.load(Ordering::Relaxed);
1399 let mut skipped_packets: u32 = 0;
1400
1401 if last_update > 0 {
1402 let frame_duration_ms = self.config.ptime.as_millis() as u64;
1403 if frame_duration_ms > 0 {
1404 let delta_ms = now.saturating_sub(last_update);
1405 let delta_frames = delta_ms / frame_duration_ms;
1406 let prospective_skip = delta_frames.saturating_sub(1);
1407
1408 if prospective_skip >= RTP_RESYNC_MIN_SKIP_PACKETS as u64 {
1409 let last_resync = stats.last_resync_ts.load(Ordering::Relaxed);
1410 let cooldown_ms = frame_duration_ms.saturating_mul(RTP_RESYNC_COOLDOWN_FRAMES);
1411 if last_resync == 0 || now.saturating_sub(last_resync) >= cooldown_ms {
1412 skipped_packets = prospective_skip.min(u32::MAX as u64) as u32;
1413 debug!(
1414 track_id = self.track_id,
1415 delta_ms, skipped_packets, "Resyncing RTP timestamp"
1416 );
1417 for _ in 0..skipped_packets {
1418 self.sequencer.next_sequence_number();
1419 }
1420 stats.last_resync_ts.store(now, Ordering::Relaxed);
1421 }
1422 }
1423 }
1424 }
1425
1426 stats.last_timestamp_update.store(now, Ordering::Relaxed);
1427
1428 let samples_per_packet = (clock_rate as f64 * self.config.ptime.as_secs_f64()) as u32;
1429 let packets = match self
1430 .inner
1431 .lock()
1432 .unwrap()
1433 .packetizer
1434 .lock()
1435 .unwrap()
1436 .as_mut()
1437 {
1438 Some(p) => {
1439 if skipped_packets > 0 {
1440 let skip_samples = (skipped_packets as u64)
1441 .saturating_mul(samples_per_packet as u64)
1442 .min(u32::MAX as u64) as u32;
1443 p.skip_samples(skip_samples);
1444 }
1445 p.packetize(&Bytes::from_owner(payload), samples_per_packet)?
1446 }
1447 None => return Err(anyhow::anyhow!("Packetizer not set")),
1448 };
1449 for mut packet in packets {
1450 packet.header.marker = false;
1451 packet.header.payload_type = payload_type;
1452 match packet.marshal() {
1453 Ok(ref rtp_data) => match self.rtp_socket.send_raw(rtp_data, &remote_addr).await {
1454 Ok(_) => {
1455 stats.update_send_stats(rtp_data.len() as u32, samples_per_packet);
1456 }
1457 Err(e) => {
1458 warn!(track_id = self.track_id, "Failed to send RTP packet: {}", e);
1459 }
1460 },
1461 Err(e) => {
1462 warn!(
1463 track_id = self.track_id,
1464 "Failed to build RTP packet: {:?}", e
1465 );
1466 return Err(anyhow::anyhow!("Failed to build RTP packet"));
1467 }
1468 }
1469 }
1470 Ok(())
1471 }
1472}
1473
1474#[cfg(test)]
1475mod tests {
1476 use super::*;
1477
1478 #[test]
1479 fn test_rtp_track_stats_new() {
1480 let stats = RtpTrackStats::new();
1481 assert_eq!(stats.packet_count.load(Ordering::Relaxed), 0);
1482 assert_eq!(stats.octet_count.load(Ordering::Relaxed), 0);
1483 assert_eq!(stats.received_packets.load(Ordering::Relaxed), 0);
1484 assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 0);
1485 assert_eq!(stats.jitter.load(Ordering::Relaxed), 0);
1486 }
1487
1488 #[test]
1489 fn test_update_send_stats() {
1490 let stats = RtpTrackStats::new();
1491 stats.update_send_stats(1200, 160);
1492
1493 assert_eq!(stats.packet_count.load(Ordering::Relaxed), 1);
1494 assert_eq!(stats.octet_count.load(Ordering::Relaxed), 1200);
1495 assert_eq!(stats.timestamp.load(Ordering::Relaxed), 160);
1496
1497 stats.update_send_stats(800, 160);
1499 assert_eq!(stats.packet_count.load(Ordering::Relaxed), 2);
1500 assert_eq!(stats.octet_count.load(Ordering::Relaxed), 2000);
1501 assert_eq!(stats.timestamp.load(Ordering::Relaxed), 320);
1502 }
1503
1504 #[test]
1505 fn test_update_receive_stats() {
1506 let stats = RtpTrackStats::new();
1507
1508 stats.update_receive_stats(1000, 160);
1510 assert_eq!(stats.received_packets.load(Ordering::Relaxed), 1);
1511 assert_eq!(stats.received_octets.load(Ordering::Relaxed), 160);
1512 assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1000);
1513 assert_eq!(stats.base_seq.load(Ordering::Relaxed), 1000);
1514 assert_eq!(stats.last_receive_seq.load(Ordering::Relaxed), 1000);
1515 assert_eq!(stats.expected_packets.load(Ordering::Relaxed), 1);
1516 assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 0);
1517
1518 stats.update_receive_stats(1002, 160);
1520 assert_eq!(stats.received_packets.load(Ordering::Relaxed), 2);
1521 assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1002);
1522 assert_eq!(stats.last_receive_seq.load(Ordering::Relaxed), 1002);
1523 assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 1);
1524 assert_eq!(stats.expected_packets.load(Ordering::Relaxed), 3);
1525 }
1526
1527 #[test]
1528 fn test_get_fraction_lost() {
1529 let stats = RtpTrackStats::new();
1530
1531 assert_eq!(stats.get_fraction_lost(), 0);
1533
1534 stats.expected_packets.store(100, Ordering::Relaxed);
1536 stats.lost_packets.store(5, Ordering::Relaxed);
1537
1538 let fraction_lost = stats.get_fraction_lost();
1539 assert_eq!(fraction_lost, 12); stats.lost_packets.store(100, Ordering::Relaxed);
1543 assert_eq!(stats.get_fraction_lost(), 255); }
1545
1546 #[test]
1547 fn test_store_sr_info() {
1548 let stats = RtpTrackStats::new();
1549 stats.store_sr_info(123456, 789012);
1550
1551 assert_eq!(stats.last_sr_timestamp.load(Ordering::Relaxed), 123456);
1552 assert_eq!(stats.last_sr_ntp.load(Ordering::Relaxed), 789012);
1553 }
1554
1555 #[tokio::test]
1556 async fn test_parse_pjsip_sdp() {
1557 let sdp = r#"v=0
1558o=- 3954304612 3954304613 IN IP4 192.168.1.202
1559s=pjmedia
1560b=AS:117
1561t=0 0
1562a=X-nat:3
1563m=audio 4002 RTP/AVP 9 101
1564c=IN IP4 192.168.1.202
1565b=TIAS:96000
1566a=rtcp:4003 IN IP4 192.168.1.202
1567a=sendrecv
1568a=rtpmap:9 G722/8000
1569a=ssrc:1089147397 cname:61753255553b9c6f
1570a=rtpmap:101 telephone-event/8000
1571a=fmtp:101 0-16"#;
1572 let rtp_track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1573 .build()
1574 .await
1575 .expect("Failed to build rtp track");
1576 rtp_track
1577 .set_remote_description(sdp)
1578 .expect("Failed to set remote description");
1579 let inner = rtp_track.inner.lock().unwrap();
1580 assert_eq!(inner.payload_type, 9);
1581 assert!(!inner.rtcp_mux); }
1583
1584 #[tokio::test]
1585 async fn test_parse_rtcp_mux() {
1586 let answer = r#"v=0
1587o=- 723884243 723884244 IN IP4 11.22.33.44
1588s=-
1589c=IN IP4 11.22.33.44
1590t=0 0
1591m=audio 10638 RTP/AVP 8 101
1592a=rtpmap:8 PCMA/8000
1593a=rtpmap:101 telephone-event/8000
1594a=fmtp:101 0-15
1595a=sendrecv
1596a=rtcp-mux"#;
1597 let mut reader = Cursor::new(answer);
1598 let sdp = SessionDescription::unmarshal(&mut reader).expect("Failed to parse SDP");
1599 let peer_media = select_peer_media(&sdp, "audio").expect("Failed to select_peer_media");
1600 assert!(peer_media.rtcp_mux);
1601 assert_eq!(peer_media.rtcp_port, 10638);
1602 }
1603
1604 #[tokio::test]
1605 async fn test_parse_linphone_candidate() {
1606 let answer = r#"v=0
1607o=mpi 2590 792 IN IP4 192.168.3.181
1608s=Talk
1609c=IN IP4 192.168.3.181
1610t=0 0
1611a=ice-pwd:96adb77560869c783656fe0a
1612a=ice-ufrag:409dfd53
1613a=rtcp-xr:rcvr-rtt=all:10000 stat-summary=loss,dup,jitt,TTL voip-metrics
1614a=record:off
1615m=audio 61794 RTP/AVP 8 101
1616c=IN IP4 115.205.103.101
1617a=rtpmap:101 telephone-event/8000
1618a=rtcp:50735
1619a=candidate:1 1 UDP 2130706303 192.168.3.181 61794 typ host
1620a=candidate:1 2 UDP 2130706302 192.168.3.181 50735 typ host
1621a=candidate:2 1 UDP 1694498687 115.205.103.101 61794 typ srflx raddr 192.168.3.181 rport 61794
1622a=candidate:2 2 UDP 1694498686 115.205.103.101 50735 typ srflx raddr 192.168.3.181 rport 50735
1623a=rtcp-fb:* trr-int 5000
1624a=rtcp-fb:* ccm tmmbr"#;
1625 let mut reader = Cursor::new(answer);
1626 let sdp = SessionDescription::unmarshal(&mut reader).expect("Failed to parse SDP");
1627 let peer_media = select_peer_media(&sdp, "audio").expect("Failed to select_peer_media");
1628 assert_eq!(peer_media.rtp_addr, "192.168.3.181");
1629 }
1630
1631 #[tokio::test]
1632 async fn test_rtp_track_builder() {
1633 let track_id = "test_track".to_string();
1634 let config = TrackConfig::default();
1635
1636 let track = RtpTrackBuilder::new(track_id.clone(), config)
1637 .with_rtp_start_port(20000)
1638 .with_rtp_end_port(20100)
1639 .with_session_name("test_session".to_string())
1640 .build()
1641 .await
1642 .expect("Failed to build track");
1643
1644 assert_eq!(track.track_id, track_id);
1645 assert_ne!(track.ssrc, 0); assert_eq!(track.ssrc_cname, "test_session");
1648 let inner = track.inner.lock().unwrap();
1649 assert!(inner.rtcp_mux);
1650 }
1651
1652 #[tokio::test]
1653 async fn test_local_description_generation() {
1654 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1655 .build()
1656 .await
1657 .expect("Failed to build track");
1658
1659 let local_desc = track
1660 .local_description()
1661 .expect("Failed to generate local description");
1662
1663 assert!(local_desc.contains("m=audio"));
1665 assert!(local_desc.contains("RTP/AVP"));
1666 assert!(local_desc.contains("a=rtcp-mux")); assert!(local_desc.contains("a=sendrecv"));
1668 assert!(local_desc.contains(&format!("a=ssrc:{}", track.ssrc)));
1669 }
1670
1671 #[tokio::test]
1672 async fn test_double_set_remote_description() {
1673 let sdp = r#"v=0
1674o=- 123 124 IN IP4 192.168.1.1
1675s=-
1676c=IN IP4 192.168.1.1
1677t=0 0
1678m=audio 5004 RTP/AVP 0
1679a=rtpmap:0 PCMU/8000"#;
1680
1681 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1682 .build()
1683 .await
1684 .expect("Failed to build track");
1685
1686 assert!(track.set_remote_description(sdp).is_ok());
1688 assert!(track.remote_description().is_some());
1689
1690 assert!(track.set_remote_description(sdp).is_ok());
1692 }
1693
1694 #[tokio::test]
1695 async fn test_invalid_sdp() {
1696 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1697 .build()
1698 .await
1699 .expect("Failed to build track");
1700
1701 let invalid_sdp = r#"v=0
1703o=- 123 124 IN IP4 192.168.1.1
1704s=-
1705c=IN IP4 192.168.1.1
1706t=0 0"#;
1707
1708 assert!(track.set_remote_description(invalid_sdp).is_err());
1709 }
1710
1711 #[tokio::test]
1712 async fn test_dtmf_digit_mapping() {
1713 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1714 .build()
1715 .await
1716 .expect("Failed to build track");
1717
1718 let valid_digits = [
1720 "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "*", "#", "A", "B", "C", "D",
1721 ];
1722
1723 for digit in &valid_digits {
1724 let result = track.send_dtmf(digit, Some(100)).await;
1727 assert!(result.is_err());
1728 let error_msg = result.unwrap_err().to_string();
1729 assert!(error_msg.contains("Remote address not set"));
1730 }
1731
1732 let result = track.send_dtmf("X", Some(100)).await;
1734 assert!(result.is_err());
1735 let error_msg = result.unwrap_err().to_string();
1736 assert!(error_msg.contains("Invalid DTMF digit"));
1737 }
1738
1739 #[test]
1740 fn test_rtcp_packet_type_detection() {
1741 assert!(200 >= 200 && 200 <= 207); assert!(201 >= 200 && 201 <= 207); assert!(202 >= 200 && 202 <= 207); assert!(203 >= 200 && 203 <= 207); assert!(204 >= 200 && 204 <= 207); let rtp_byte = 0b10001001; let version = (rtp_byte >> 6) & 0x03;
1751 let pt = rtp_byte & 0x7F;
1752
1753 assert_eq!(version, 2);
1754 assert_eq!(pt, 9);
1755 }
1756
1757 #[test]
1758 fn test_stun_magic_cookie_detection() {
1759 let stun_magic_cookie = STUN_MAGIC_COOKIE;
1760 let bytes = stun_magic_cookie.to_be_bytes();
1761 let reconstructed = u32::from_be_bytes(bytes);
1762
1763 assert_eq!(reconstructed, stun_magic_cookie);
1764 }
1765
1766 #[tokio::test]
1767 async fn test_track_ssrc_and_id() {
1768 let track_id = "unique_track_123".to_string();
1769 let custom_ssrc = 0x12345678;
1770
1771 let track = RtpTrackBuilder::new(track_id.clone(), TrackConfig::default())
1772 .with_ssrc(custom_ssrc)
1773 .build()
1774 .await
1775 .expect("Failed to build track");
1776
1777 let builder =
1779 RtpTrackBuilder::new(track_id.clone(), TrackConfig::default()).with_ssrc(custom_ssrc);
1780 assert_eq!(builder.ssrc, custom_ssrc);
1781 assert_eq!(track.id(), &track_id);
1782 }
1783
1784 #[test]
1785 fn test_codec_type_payload_mapping() {
1786 assert_eq!(CodecType::PCMU.payload_type(), 0);
1788 assert_eq!(CodecType::G722.payload_type(), 9);
1789 assert_eq!(CodecType::PCMA.payload_type(), 8);
1790 assert_eq!(CodecType::TelephoneEvent.payload_type(), 101);
1791 }
1792
1793 #[tokio::test]
1794 async fn test_stats_initialization() {
1795 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1796 .build()
1797 .await
1798 .expect("Failed to build track");
1799 let inner = track.inner.lock().unwrap();
1800 assert_eq!(inner.stats.packet_count.load(Ordering::Relaxed), 0);
1802 assert_eq!(inner.stats.octet_count.load(Ordering::Relaxed), 0);
1803 assert_eq!(inner.stats.received_packets.load(Ordering::Relaxed), 0);
1804 assert_eq!(inner.stats.lost_packets.load(Ordering::Relaxed), 0);
1805 assert_eq!(inner.stats.highest_seq_num.load(Ordering::Relaxed), 0);
1806 assert_eq!(inner.stats.jitter.load(Ordering::Relaxed), 0);
1807 assert_eq!(inner.stats.last_sr_timestamp.load(Ordering::Relaxed), 0);
1808 assert_eq!(inner.stats.last_sr_ntp.load(Ordering::Relaxed), 0);
1809 assert_eq!(inner.stats.base_seq.load(Ordering::Relaxed), 0);
1810 assert_eq!(inner.stats.last_receive_seq.load(Ordering::Relaxed), 0);
1811 assert_eq!(inner.stats.last_resync_ts.load(Ordering::Relaxed), 0);
1812 }
1813
1814 #[test]
1815 fn test_sequence_number_gap_calculation() {
1816 let stats = RtpTrackStats::new();
1817
1818 stats.update_receive_stats(1000, 160); stats.update_receive_stats(1002, 160); stats.update_receive_stats(1003, 160); stats.update_receive_stats(1005, 160); assert_eq!(stats.received_packets.load(Ordering::Relaxed), 4);
1825 assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1005);
1826 assert!(stats.lost_packets.load(Ordering::Relaxed) > 0);
1828 }
1829
1830 #[test]
1831 fn test_jitter_calculation() {
1832 let stats = RtpTrackStats::new();
1833
1834 stats.update_receive_stats(1000, 160);
1836 let _initial_jitter = stats.jitter.load(Ordering::Relaxed);
1837
1838 stats.update_receive_stats(1001, 160);
1839 let updated_jitter = stats.jitter.load(Ordering::Relaxed);
1840
1841 assert!(updated_jitter < 1000); }
1845
1846 #[test]
1847 fn test_builder_with_custom_ssrc() {
1848 let custom_ssrc = 0x12345678u32;
1849 let builder =
1850 RtpTrackBuilder::new("test".to_string(), TrackConfig::default()).with_ssrc(custom_ssrc);
1851
1852 assert_eq!(builder.ssrc, custom_ssrc);
1854 assert_eq!(builder.ssrc_cname, format!("rustpbx-{}", custom_ssrc));
1855 }
1856
1857 #[test]
1858 fn test_builder_configuration() {
1859 let builder = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1860 .with_rtp_start_port(10000)
1861 .with_rtp_end_port(20000)
1862 .with_rtp_alloc_count(100)
1863 .with_rtcp_mux(false)
1864 .with_session_name("custom_session".to_string());
1865
1866 assert_eq!(builder.rtp_start_port, 10000);
1867 assert_eq!(builder.rtp_end_port, 20000);
1868 assert_eq!(builder.rtp_alloc_count, 100);
1869 assert!(!builder.rtcp_mux);
1870 assert_eq!(builder.ssrc_cname, "custom_session");
1871 }
1872
1873 #[tokio::test]
1874 async fn test_ice_connectivity_check_enabled_by_default() {
1875 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1876 .build()
1877 .await
1878 .expect("Failed to build track");
1879
1880 assert!(track.ice_connectivity_check); }
1882
1883 #[tokio::test]
1884 async fn test_ice_connectivity_check_can_be_disabled() {
1885 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1886 .with_ice_connectivity_check(false)
1887 .build()
1888 .await
1889 .expect("Failed to build track");
1890
1891 assert!(!track.ice_connectivity_check);
1892 }
1893
1894 #[tokio::test]
1895 async fn test_maybe_update_remote_addr_private_peer() {
1896 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1897 .build()
1898 .await
1899 .expect("Failed to build track");
1900 let inner = track.inner.clone();
1901
1902 let private_addr = SipAddr {
1903 addr: HostWithPort {
1904 host: "192.168.0.10".parse().expect("host"),
1905 port: Some(4000.into()),
1906 },
1907 r#type: Some(rsip::transport::Transport::Udp),
1908 };
1909
1910 let public_addr = SipAddr {
1911 addr: HostWithPort {
1912 host: "203.0.113.5".parse().expect("host"),
1913 port: Some(5004.into()),
1914 },
1915 r#type: Some(rsip::transport::Transport::Udp),
1916 };
1917
1918 {
1919 let mut guard = inner.lock().expect("lock");
1920 guard.remote_addr = Some(private_addr.clone());
1921 guard.remote_rtcp_addr = Some(private_addr.clone());
1922 guard.rtcp_mux = true;
1923 }
1924
1925 let updated = RtpTrack::maybe_update_remote_addr(
1926 &inner,
1927 &public_addr,
1928 false,
1929 &track.track_id,
1930 "test",
1931 );
1932
1933 assert!(updated);
1934 let guard = inner.lock().expect("lock");
1935 assert_eq!(
1936 guard
1937 .remote_addr
1938 .as_ref()
1939 .expect("remote")
1940 .addr
1941 .host
1942 .to_string(),
1943 "203.0.113.5"
1944 );
1945 assert_eq!(
1946 guard
1947 .remote_rtcp_addr
1948 .as_ref()
1949 .expect("rtcp")
1950 .addr
1951 .host
1952 .to_string(),
1953 "203.0.113.5"
1954 );
1955 }
1956
1957 #[test]
1958 fn test_stun_packet_structure() {
1959 assert_eq!(STUN_BINDING_REQUEST, 0x0001);
1961 assert_eq!(STUN_MAGIC_COOKIE, 0x2112A442);
1962 assert_eq!(STUN_TRANSACTION_ID_SIZE, 12);
1963
1964 let mut packet = vec![0u8; 20];
1966 packet[0..2].copy_from_slice(&STUN_BINDING_REQUEST.to_be_bytes());
1967 packet[4..8].copy_from_slice(&STUN_MAGIC_COOKIE.to_be_bytes());
1968
1969 let msg_type = u16::from_be_bytes([packet[0], packet[1]]);
1971 assert_eq!(msg_type, STUN_BINDING_REQUEST);
1972
1973 let magic = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]);
1975 assert_eq!(magic, STUN_MAGIC_COOKIE);
1976 }
1977
1978 #[tokio::test]
1979 async fn test_ice_connectivity_check_builder_method() {
1980 let builder_enabled = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1981 .with_ice_connectivity_check(true);
1982 assert!(builder_enabled.ice_connectivity_check);
1983
1984 let builder_disabled = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1985 .with_ice_connectivity_check(false);
1986 assert!(!builder_disabled.ice_connectivity_check);
1987 }
1988
1989 #[test]
1990 fn test_ice_connectivity_terminology() {
1991 assert_eq!(STUN_BINDING_REQUEST, 0x0001); assert_eq!(STUN_MAGIC_COOKIE, 0x2112A442); }
2004}