1use super::track_codec::TrackCodec;
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::Samples,
6 media::TrackId,
7 media::{
8 codecs::CodecType,
9 jitter::JitterBuffer,
10 negotiate::select_peer_media,
11 processor::ProcessorChain,
12 track::{Track, TrackConfig, TrackPacketSender},
13 },
14};
15use anyhow::Result;
16use async_trait::async_trait;
17use bytes::Bytes;
18use rsip::HostWithPort;
19use rsipstack::transport::{SipAddr, udp::UdpConnection};
20use std::{
21 io::Cursor,
22 net::{IpAddr, SocketAddr},
23 sync::{
24 Arc, Mutex,
25 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
26 },
27 time::Duration,
28};
29use tokio::{select, time::Instant, time::interval_at};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, info, warn};
32use webrtc::{
33 rtcp::{
34 goodbye::Goodbye,
35 receiver_report::ReceiverReport,
36 reception_report::ReceptionReport,
37 sender_report::SenderReport,
38 source_description::{
39 SdesType, SourceDescription, SourceDescriptionChunk, SourceDescriptionItem,
40 },
41 },
42 rtp::{
43 codecs::g7xx::G7xxPayloader,
44 packet::Packet,
45 packetizer::{Packetizer, new_packetizer},
46 sequence::{Sequencer, new_random_sequencer},
47 },
48 sdp::{
49 MediaDescription, SessionDescription,
50 description::{
51 common::{Address, Attribute, ConnectionInformation},
52 media::{MediaName, RangedPort},
53 session::{
54 ATTR_KEY_RTCPMUX, ATTR_KEY_SEND_ONLY, ATTR_KEY_SEND_RECV, ATTR_KEY_SSRC, Origin,
55 TimeDescription, Timing,
56 },
57 },
58 },
59 util::{Marshal, Unmarshal},
60};
61const RTP_MTU: usize = 1500; const RTP_OUTBOUND_MTU: usize = 1200; const RTCP_SR_INTERVAL_MS: u64 = 5000; const DTMF_EVENT_DURATION_MS: u64 = 160; const DTMF_EVENT_VOLUME: u8 = 10; const RTP_RESYNC_MIN_SKIP_PACKETS: u32 = 3; const RTP_RESYNC_COOLDOWN_FRAMES: u64 = 3; const STUN_BINDING_REQUEST: u16 = 0x0001;
71const STUN_BINDING_RESPONSE: u16 = 0x0101;
72const STUN_MAGIC_COOKIE: u32 = 0x2112A442;
73const STUN_TRANSACTION_ID_SIZE: usize = 12;
74
75struct RtpTrackStats {
76 timestamp: Arc<AtomicU32>,
77 packet_count: Arc<AtomicU32>,
78 octet_count: Arc<AtomicU32>,
79 last_timestamp_update: Arc<AtomicU64>,
80 last_resync_ts: Arc<AtomicU64>,
81 received_packets: Arc<AtomicU32>,
82 received_octets: Arc<AtomicU32>,
83 expected_packets: Arc<AtomicU32>,
84 lost_packets: Arc<AtomicU32>,
85 highest_seq_num: Arc<AtomicU32>,
86 base_seq: Arc<AtomicU32>,
87 last_receive_seq: Arc<AtomicU32>,
88 jitter: Arc<AtomicU32>,
89 last_sr_timestamp: Arc<AtomicU64>,
90 last_sr_ntp: Arc<AtomicU64>,
91}
92
93impl RtpTrackStats {
94 fn new() -> Self {
95 Self {
96 timestamp: Arc::new(AtomicU32::new(0)),
97 packet_count: Arc::new(AtomicU32::new(0)),
98 octet_count: Arc::new(AtomicU32::new(0)),
99 last_timestamp_update: Arc::new(AtomicU64::new(0)),
100 last_resync_ts: Arc::new(AtomicU64::new(0)),
101 received_packets: Arc::new(AtomicU32::new(0)),
102 received_octets: Arc::new(AtomicU32::new(0)),
103 expected_packets: Arc::new(AtomicU32::new(0)),
104 lost_packets: Arc::new(AtomicU32::new(0)),
105 highest_seq_num: Arc::new(AtomicU32::new(0)),
106 base_seq: Arc::new(AtomicU32::new(0)),
107 last_receive_seq: Arc::new(AtomicU32::new(0)),
108 jitter: Arc::new(AtomicU32::new(0)),
109 last_sr_timestamp: Arc::new(AtomicU64::new(0)),
110 last_sr_ntp: Arc::new(AtomicU64::new(0)),
111 }
112 }
113
114 fn update_send_stats(&self, packet_len: u32, samples_per_packet: u32) {
115 self.packet_count.fetch_add(1, Ordering::Relaxed);
116 self.octet_count.fetch_add(packet_len, Ordering::Relaxed);
117 self.timestamp
118 .fetch_add(samples_per_packet, Ordering::Relaxed);
119 }
120
121 fn update_receive_stats(&self, seq_num: u32, payload_len: u32) {
122 let prev_received = self.received_packets.fetch_add(1, Ordering::Relaxed);
123 let received = prev_received + 1;
124 self.received_octets
125 .fetch_add(payload_len, Ordering::Relaxed);
126
127 if prev_received == 0 {
128 self.base_seq.store(seq_num, Ordering::Relaxed);
129 self.last_receive_seq.store(seq_num, Ordering::Relaxed);
130 self.highest_seq_num.store(seq_num, Ordering::Relaxed);
131 self.lost_packets.store(0, Ordering::Relaxed);
132 self.expected_packets.store(received, Ordering::Relaxed);
133 } else {
134 let last_seq = self.last_receive_seq.load(Ordering::Relaxed);
135 let gap = (seq_num as u16).wrapping_sub(last_seq as u16) as u32;
136
137 if gap > 0 && gap < 0x8000 {
138 if gap > 1 {
139 self.lost_packets.fetch_add(gap - 1, Ordering::Relaxed);
140 }
141 self.last_receive_seq.store(seq_num, Ordering::Relaxed);
142 self.highest_seq_num.store(seq_num, Ordering::Relaxed);
143 }
144
145 let lost = self.lost_packets.load(Ordering::Relaxed);
146 self.expected_packets
147 .store(received + lost, Ordering::Relaxed);
148 }
149
150 let current_jitter = self.jitter.load(Ordering::Relaxed);
151 let new_jitter = (current_jitter + (seq_num % 100)) / 2;
152 self.jitter.store(new_jitter, Ordering::Relaxed);
153 }
154
155 fn store_sr_info(&self, rtp_time: u64, ntp_time: u64) {
156 self.last_sr_timestamp.store(rtp_time, Ordering::Relaxed);
157 self.last_sr_ntp.store(ntp_time, Ordering::Relaxed);
158 }
159
160 fn get_fraction_lost(&self) -> u8 {
161 let expected_packets = self.expected_packets.load(Ordering::Relaxed);
162 let lost_packets = self.lost_packets.load(Ordering::Relaxed);
163
164 if expected_packets > 0 {
165 ((lost_packets * 256) / expected_packets).min(255) as u8
166 } else {
167 0
168 }
169 }
170}
171
172pub struct RtpTrackBuilder {
173 cancel_token: Option<CancellationToken>,
174 track_id: TrackId,
175 config: TrackConfig,
176 local_addr: Option<IpAddr>,
177 external_addr: Option<IpAddr>,
178 rtp_socket: Option<UdpConnection>,
179 rtcp_socket: Option<UdpConnection>,
180 rtcp_mux: bool,
181 rtp_start_port: u16,
182 rtp_end_port: u16,
183 rtp_alloc_count: u32,
184 enabled_codecs: Vec<CodecType>,
185 ssrc_cname: String,
186 ssrc: u32,
187 ice_connectivity_check: bool,
188}
189pub struct RtpTrackInner {
190 dtmf_payload_type: u8,
191 payload_type: u8,
192 remote_description: Option<String>,
193 packetizer: Mutex<Option<Box<dyn Packetizer + Send + Sync>>>,
194 stats: Arc<RtpTrackStats>,
195 rtcp_mux: bool,
196 remote_addr: Option<SipAddr>,
197 remote_rtcp_addr: Option<SipAddr>,
198 enabled_codecs: Vec<CodecType>,
199 rtp_map: Vec<(u8, (CodecType, u32, u16))>,
200}
201
202pub struct RtpTrack {
203 ssrc: u32,
204 ssrc_cname: String,
205 track_id: TrackId,
206 config: TrackConfig,
207 cancel_token: CancellationToken,
208 processor_chain: ProcessorChain,
209 rtp_socket: UdpConnection,
210 rtcp_socket: UdpConnection,
211 encoder: TrackCodec,
212 sequencer: Box<dyn Sequencer + Send + Sync>,
213 sendrecv: AtomicBool,
214 ice_connectivity_check: bool,
215 inner: Arc<Mutex<RtpTrackInner>>,
216}
217
218enum PacketKind {
219 Rtp,
220 Rtcp,
221 Stun(u16),
222 Ignore,
223}
224impl RtpTrackBuilder {
225 pub fn new(track_id: TrackId, config: TrackConfig) -> Self {
226 let ssrc = rand::random::<u32>();
227 Self {
228 track_id,
229 config,
230 local_addr: None,
231 external_addr: None,
232 cancel_token: None,
233 rtp_socket: None,
234 rtcp_socket: None,
235 rtcp_mux: true,
236 rtp_start_port: 12000,
237 rtp_end_port: u16::MAX - 1,
238 rtp_alloc_count: 500,
239 enabled_codecs: vec![
240 #[cfg(feature = "opus")]
241 CodecType::Opus,
242 #[cfg(feature = "g729")]
243 CodecType::G729,
244 CodecType::G722,
245 CodecType::PCMU,
246 CodecType::PCMA,
247 CodecType::TelephoneEvent,
248 ],
249 ssrc_cname: format!("rustpbx-{}", ssrc),
250 ssrc,
251 ice_connectivity_check: true, }
253 }
254
255 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
256 self.ssrc = ssrc;
257 self.ssrc_cname = format!("rustpbx-{}", ssrc);
258 self
259 }
260
261 pub fn with_rtp_start_port(mut self, rtp_start_port: u16) -> Self {
262 self.rtp_start_port = rtp_start_port;
263 self
264 }
265 pub fn with_rtp_end_port(mut self, rtp_end_port: u16) -> Self {
266 self.rtp_end_port = rtp_end_port;
267 self
268 }
269 pub fn with_rtp_alloc_count(mut self, rtp_alloc_count: u32) -> Self {
270 self.rtp_alloc_count = rtp_alloc_count;
271 self
272 }
273 pub fn with_local_addr(mut self, local_addr: IpAddr) -> Self {
274 self.local_addr = Some(local_addr);
275 self
276 }
277
278 pub fn with_external_addr(mut self, external_addr: IpAddr) -> Self {
279 self.external_addr = Some(external_addr);
280 self
281 }
282
283 pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
284 self.cancel_token = Some(cancel_token);
285 self
286 }
287
288 pub fn with_rtp_socket(mut self, rtp_socket: UdpConnection) -> Self {
289 self.rtp_socket = Some(rtp_socket);
290 self
291 }
292 pub fn with_rtcp_socket(mut self, rtcp_socket: UdpConnection) -> Self {
293 self.rtcp_socket = Some(rtcp_socket);
294 self
295 }
296 pub fn with_rtcp_mux(mut self, rtcp_mux: bool) -> Self {
297 self.rtcp_mux = rtcp_mux;
298 self
299 }
300
301 pub fn with_enabled_codecs(mut self, enabled_codecs: Vec<CodecType>) -> Self {
302 self.enabled_codecs = enabled_codecs;
303 self
304 }
305 pub fn with_session_name(mut self, session_name: String) -> Self {
306 self.ssrc_cname = session_name;
307 self
308 }
309
310 pub fn with_ice_connectivity_check(mut self, enabled: bool) -> Self {
311 self.ice_connectivity_check = enabled;
312 self
313 }
314 pub async fn build_rtp_rtcp_conn(&self) -> Result<(UdpConnection, UdpConnection)> {
315 let addr = match self.local_addr {
316 Some(addr) => addr,
317 None => crate::net_tool::get_first_non_loopback_interface()?,
318 };
319 let mut rtp_conn = None;
320 let mut rtcp_conn = None;
321
322 for _ in 0..self.rtp_alloc_count {
323 let port = rand::random_range::<u16, _>(self.rtp_start_port..=self.rtp_end_port);
324 if port % 2 != 0 {
325 continue;
326 }
327 if let Ok(c) = UdpConnection::create_connection(
328 format!("{:?}:{}", addr, port).parse()?,
329 None,
330 self.cancel_token.clone(),
331 )
332 .await
333 {
334 if !self.rtcp_mux {
335 rtcp_conn = match UdpConnection::create_connection(
337 format!("{:?}:{}", addr, port + 1).parse()?,
338 None,
339 self.cancel_token.clone(),
340 )
341 .await
342 {
343 Ok(c) => Some(c),
344 Err(_) => {
345 continue;
346 }
347 };
348 } else {
349 rtcp_conn = Some(c.clone());
350 }
351 rtp_conn = Some(c);
352 break;
353 }
354 }
355
356 let mut rtp_conn = match rtp_conn {
357 Some(c) => c,
358 None => return Err(anyhow::anyhow!("failed to bind RTP socket")),
359 };
360 let mut rtcp_conn = match rtcp_conn {
361 Some(c) => c,
362 None => return Err(anyhow::anyhow!("failed to bind RTCP socket")),
363 };
364
365 if let Some(addr) = self.external_addr {
366 rtp_conn.external = Some(
367 SocketAddr::new(
368 addr,
369 *rtp_conn
370 .get_addr()
371 .addr
372 .port
373 .clone()
374 .unwrap_or_default()
375 .value(),
376 )
377 .into(),
378 );
379 rtcp_conn.external = Some(
380 SocketAddr::new(
381 addr,
382 *rtcp_conn
383 .get_addr()
384 .addr
385 .port
386 .clone()
387 .unwrap_or_default()
388 .value(),
389 )
390 .into(),
391 );
392 }
393 Ok((rtp_conn, rtcp_conn))
394 }
395
396 pub async fn build(mut self) -> Result<RtpTrack> {
397 let mut rtp_socket = self.rtp_socket.take();
398 let mut rtcp_socket = self.rtcp_socket.take();
399
400 if rtp_socket.is_none() || rtcp_socket.is_none() {
401 let (rtp_conn, rtcp_conn) = self.build_rtp_rtcp_conn().await?;
402 rtp_socket = Some(rtp_conn);
403 rtcp_socket = Some(rtcp_conn);
404 }
405 let cancel_token = self
406 .cancel_token
407 .unwrap_or_else(|| CancellationToken::new());
408 let processor_chain = ProcessorChain::new(self.config.samplerate);
409 let ssrc = if self.ssrc != 0 {
410 self.ssrc
411 } else {
412 loop {
413 let i = rand::random::<u32>();
414 if i % 2 == 0 {
415 break i;
416 }
417 }
418 };
419 let inner = RtpTrackInner {
420 dtmf_payload_type: 101, payload_type: 0, remote_description: None,
423 packetizer: Mutex::new(None),
424 stats: Arc::new(RtpTrackStats::new()),
425 rtcp_mux: self.rtcp_mux,
426 remote_addr: None,
427 remote_rtcp_addr: None,
428 enabled_codecs: self.enabled_codecs.clone(),
429 rtp_map: vec![],
430 };
431 let track = RtpTrack {
432 ssrc,
433 ssrc_cname: self.ssrc_cname.clone(),
434 track_id: self.track_id,
435 config: self.config,
436 cancel_token,
437 processor_chain,
438 rtp_socket: rtp_socket.unwrap(),
439 rtcp_socket: rtcp_socket.unwrap(),
440 encoder: TrackCodec::new(),
441 sequencer: Box::new(new_random_sequencer()),
442 sendrecv: AtomicBool::new(true),
443 ice_connectivity_check: self.ice_connectivity_check,
444 inner: Arc::new(Mutex::new(inner)),
445 };
446 Ok(track)
447 }
448}
449
450impl RtpTrack {
451 pub fn id(&self) -> &str {
452 &self.track_id
453 }
454
455 pub fn ssrc(&self) -> u32 {
456 self.ssrc
457 }
458
459 pub fn remote_description(&self) -> Option<String> {
460 self.inner.lock().unwrap().remote_description.clone()
461 }
462
463 pub fn set_rtp_map(&self, rtp_map: Vec<(u8, (CodecType, u32, u16))>) {
464 if let Ok(mut inner) = self.inner.lock() {
465 inner.rtp_map = rtp_map;
466 }
467 }
468
469 pub fn set_remote_description(&self, answer: &str) -> Result<()> {
470 let mut inner = self.inner.lock().unwrap();
471 let mut reader = Cursor::new(answer);
472 let sdp = SessionDescription::unmarshal(&mut reader)?;
473 let peer_media = match select_peer_media(&sdp, "audio") {
474 Some(peer_media) => peer_media,
475 None => return Err(anyhow::anyhow!("no audio media in answer SDP")),
476 };
477
478 inner.rtp_map = peer_media.rtp_map.clone();
479
480 if peer_media.codecs.is_empty() {
481 return Err(anyhow::anyhow!("no audio codecs in answer SDP"));
482 }
483
484 if peer_media.rtp_addr.is_empty() {
485 return Err(anyhow::anyhow!("no rtp addr in answer SDP"));
486 }
487
488 inner.remote_description.replace(answer.to_string());
489
490 let remote_addr = SipAddr {
491 addr: HostWithPort {
492 host: peer_media.rtp_addr.parse()?,
493 port: Some(peer_media.rtp_port.into()),
494 },
495 r#type: Some(rsip::transport::Transport::Udp),
496 };
497 let remote_rtcp_addr = SipAddr {
498 addr: HostWithPort {
499 host: peer_media.rtcp_addr.parse()?,
500 port: Some(peer_media.rtcp_port.into()),
501 },
502 r#type: Some(rsip::transport::Transport::Udp),
503 };
504 let codec_type = peer_media.codecs[0];
505 info!(
506 track_id = self.track_id,
507 rtcp_mux = peer_media.rtcp_mux,
508 %remote_addr,
509 %remote_rtcp_addr,
510 ?codec_type,
511 ssrc = self.ssrc,
512 "set remote description"
513 );
514
515 inner.payload_type = codec_type.payload_type();
516 inner.enabled_codecs = vec![codec_type];
517 for (payload_type, (codec, clock_rate, _)) in peer_media.rtp_map.iter() {
518 if *codec == codec_type {
519 inner.payload_type = *payload_type;
520 }
521
522 if codec == &CodecType::TelephoneEvent && clock_rate == &codec_type.clock_rate() {
523 inner.dtmf_payload_type = *payload_type;
524 }
525 }
526
527 inner.remote_addr.replace(remote_addr);
528 inner.remote_rtcp_addr.replace(remote_rtcp_addr);
529 inner.rtcp_mux = peer_media.rtcp_mux;
530
531 let payloader = match codec_type {
532 #[cfg(feature = "opus")]
533 CodecType::Opus => Box::<webrtc::rtp::codecs::opus::OpusPayloader>::default()
534 as Box<dyn webrtc::rtp::packetizer::Payloader + Send + Sync>,
535 _ => Box::<G7xxPayloader>::default()
536 as Box<dyn webrtc::rtp::packetizer::Payloader + Send + Sync>,
537 };
538
539 inner
540 .packetizer
541 .lock()
542 .unwrap()
543 .replace(Box::new(new_packetizer(
544 RTP_OUTBOUND_MTU,
545 inner.payload_type,
546 self.ssrc,
547 payloader,
548 self.sequencer.clone(),
549 codec_type.clock_rate(),
550 )));
551 Ok(())
552 }
553
554 pub fn local_description(&self) -> Result<String> {
555 let socketaddr: SocketAddr = self.rtp_socket.get_addr().addr.to_owned().try_into()?;
556 let mut sdp = SessionDescription::default();
557
558 sdp.version = 0;
560 sdp.origin = Origin {
561 username: "-".to_string(),
562 session_id: 0,
563 session_version: 0,
564 network_type: "IN".to_string(),
565 address_type: "IP4".to_string(),
566 unicast_address: socketaddr.ip().to_string(),
567 };
568 sdp.session_name = "-".to_string();
569 sdp.connection_information = Some(ConnectionInformation {
570 address_type: "IP4".to_string(),
571 network_type: "IN".to_string(),
572 address: Some(Address {
573 address: socketaddr.ip().to_string(),
574 ttl: None,
575 range: None,
576 }),
577 });
578 sdp.time_descriptions.push(TimeDescription {
579 timing: Timing {
580 start_time: 0,
581 stop_time: 0,
582 },
583 repeat_times: vec![],
584 });
585
586 let mut media = MediaDescription::default();
588 media.media_name = MediaName {
589 media: "audio".to_string(),
590 port: RangedPort {
591 value: socketaddr.port() as isize,
592 range: None,
593 },
594 protos: vec!["RTP".to_string(), "AVP".to_string()],
595 formats: vec![],
596 };
597 let inner = self.inner.lock().unwrap();
598 for codec in inner.enabled_codecs.iter() {
599 if codec == &CodecType::TelephoneEvent {
600 continue;
601 }
602 let mut payload_type = codec.payload_type();
604 for (payload_typ, (rtp_map_codec, _, _)) in inner.rtp_map.iter() {
605 if *rtp_map_codec == *codec {
606 payload_type = *payload_typ;
607 break;
608 }
609 }
610
611 media.media_name.formats.push(payload_type.to_string());
612 media.attributes.push(Attribute {
613 key: "rtpmap".to_string(),
614 value: Some(format!("{} {}", payload_type, codec.rtpmap())),
615 });
616 if let Some(fmtp) = codec.fmtp() {
617 media.attributes.push(Attribute {
618 key: "fmtp".to_string(),
619 value: Some(format!("{} {}", payload_type, fmtp)),
620 });
621 }
622 }
623
624 let has_8khz_codec = inner.enabled_codecs.iter().any(|c| c.clock_rate() == 8000);
627 let has_48khz_codec = inner.enabled_codecs.iter().any(|c| c.clock_rate() == 48000);
628
629 if has_8khz_codec {
630 let mut payload_type = 101;
632 for (typ, (codec, clock_rate, _)) in inner.rtp_map.iter() {
633 if *codec == CodecType::TelephoneEvent && *clock_rate == 8000 {
634 payload_type = *typ;
635 break;
636 }
637 }
638 media.media_name.formats.push(payload_type.to_string());
639 media.attributes.push(Attribute {
640 key: "rtpmap".to_string(),
641 value: Some(format!("{} telephone-event/8000", payload_type)),
642 });
643 media.attributes.push(Attribute {
644 key: "fmtp".to_string(),
645 value: Some(format!("{} 0-16", payload_type)),
646 });
647 }
648
649 if has_48khz_codec {
650 let mut payload_type = 97;
651 for (typ, (codec, clock_rate, _)) in inner.rtp_map.iter() {
652 if *codec == CodecType::TelephoneEvent && *clock_rate == 48000 {
653 payload_type = *typ;
654 break;
655 }
656 }
657
658 media.media_name.formats.push(payload_type.to_string());
659 media.attributes.push(Attribute {
660 key: "rtpmap".to_string(),
661 value: Some(format!("{} telephone-event/48000", payload_type)),
662 });
663 media.attributes.push(Attribute {
664 key: "fmtp".to_string(),
665 value: Some(format!("{} 0-16", payload_type)),
666 });
667 }
668
669 if inner.rtcp_mux {
671 media.attributes.push(Attribute {
672 key: ATTR_KEY_RTCPMUX.to_string(),
673 value: None,
674 });
675 }
676 media.attributes.push(Attribute {
677 key: ATTR_KEY_SSRC.to_string(),
678 value: Some(if self.ssrc_cname.is_empty() {
679 self.ssrc.to_string()
680 } else {
681 format!("{} cname:{}", self.ssrc, self.ssrc_cname)
682 }),
683 });
684 if self.sendrecv.load(Ordering::Relaxed) {
685 media.attributes.push(Attribute {
686 key: ATTR_KEY_SEND_RECV.to_string(),
687 value: None,
688 });
689 } else {
690 media.attributes.push(Attribute {
691 key: ATTR_KEY_SEND_ONLY.to_string(),
692 value: None,
693 });
694 }
695 media.attributes.push(Attribute {
696 key: "ptime".to_string(),
697 value: Some(format!("{}", self.config.ptime.as_millis())),
698 });
699 sdp.media_descriptions.push(media);
700 Ok(sdp.marshal())
701 }
702
703 pub async fn send_dtmf(&self, digit: &str, duration_ms: Option<u64>) -> Result<()> {
705 let event_code = match digit {
707 "0" => 0,
708 "1" => 1,
709 "2" => 2,
710 "3" => 3,
711 "4" => 4,
712 "5" => 5,
713 "6" => 6,
714 "7" => 7,
715 "8" => 8,
716 "9" => 9,
717 "*" => 10,
718 "#" => 11,
719 "A" => 12,
720 "B" => 13,
721 "C" => 14,
722 "D" => 15,
723 _ => return Err(anyhow::anyhow!("Invalid DTMF digit")),
724 };
725 let inner = self.inner.lock().unwrap();
726 let socket = &self.rtp_socket;
727 let remote_addr = match inner.remote_addr.as_ref() {
728 Some(addr) => addr.clone(),
729 None => return Err(anyhow::anyhow!("Remote address not set")),
730 };
731
732 let duration = duration_ms.unwrap_or(DTMF_EVENT_DURATION_MS);
734
735 let num_packets = (duration as f64 / self.config.ptime.as_millis() as f64).ceil() as u32;
738
739 let samples_per_packet =
741 (self.config.samplerate as f64 * self.config.ptime.as_secs_f64()) as u32;
742
743 let now = crate::media::get_timestamp();
744 inner
745 .stats
746 .last_timestamp_update
747 .store(now, Ordering::Relaxed);
748
749 for i in 0..num_packets {
751 let is_end = i == num_packets - 1;
752 let event_duration = i * (self.config.ptime.as_millis() as u32 * 8); let mut payload = vec![0u8; 4];
757 payload[0] = event_code;
758 payload[1] = DTMF_EVENT_VOLUME & 0x3F; if is_end {
760 payload[1] |= 0x80; }
762
763 payload[2] = ((event_duration >> 8) & 0xFF) as u8;
765 payload[3] = (event_duration & 0xFF) as u8;
766
767 let packets = match inner.packetizer.lock().unwrap().as_mut() {
768 Some(p) => p.packetize(&Bytes::from_owner(payload), samples_per_packet)?,
769 None => return Err(anyhow::anyhow!("Packetizer not set")),
770 };
771 for mut packet in packets {
772 packet.header.payload_type = inner.dtmf_payload_type;
773 packet.header.marker = false;
774
775 match packet.marshal() {
776 Ok(ref rtp_data) => {
777 match socket.send_raw(rtp_data, &remote_addr).await {
778 Ok(_) => {}
779 Err(e) => {
780 warn!("Failed to send DTMF RTP packet: {}", e);
781 }
782 }
783
784 inner.stats.packet_count.fetch_add(1, Ordering::Relaxed);
786 inner
787 .stats
788 .octet_count
789 .fetch_add(rtp_data.len() as u32, Ordering::Relaxed);
790
791 if !is_end {
793 tokio::time::sleep(self.config.ptime).await;
794 }
795 }
796 Err(e) => {
797 warn!("Failed to create DTMF RTP packet: {:?}", e);
798 continue;
799 }
800 }
801 }
802 }
803
804 inner
806 .stats
807 .timestamp
808 .fetch_add(samples_per_packet * num_packets, Ordering::Relaxed);
809
810 Ok(())
811 }
812
813 async fn send_ice_connectivity_check(
815 socket: &UdpConnection,
816 remote_addr: &SipAddr,
817 ) -> Result<()> {
818 let mut stun_packet = vec![0u8; 20]; stun_packet[0..2].copy_from_slice(&STUN_BINDING_REQUEST.to_be_bytes());
820 stun_packet[2..4].copy_from_slice(&0u16.to_be_bytes());
821 stun_packet[4..8].copy_from_slice(&STUN_MAGIC_COOKIE.to_be_bytes());
822 let transaction_id: [u8; STUN_TRANSACTION_ID_SIZE] = rand::random();
823 stun_packet[8..20].copy_from_slice(&transaction_id);
824
825 socket.send_raw(&stun_packet, remote_addr).await.ok();
826 Ok(())
827 }
828
829 async fn handle_rtcp_packet(
830 track_id: &TrackId,
831 buf: &[u8],
832 n: usize,
833 stats: &Arc<RtpTrackStats>,
834 ssrc: u32,
835 ) -> Result<()> {
836 use webrtc::rtcp::packet::unmarshal;
837
838 let mut buf_slice = &buf[0..n];
839 let packets = match unmarshal(&mut buf_slice) {
840 Ok(packets) => packets,
841 Err(e) => {
842 warn!(track_id, "Failed to parse RTCP packet: {:?}", e);
843 return Ok(());
844 }
845 };
846
847 for packet in packets {
848 if let Some(sr) = packet.as_any().downcast_ref::<SenderReport>() {
849 stats.store_sr_info(sr.rtp_time as u64, sr.ntp_time);
850 info!(
851 track_id,
852 ssrc = sr.ssrc,
853 packet_count = sr.packet_count,
854 octet_count = sr.octet_count,
855 rtp_time = sr.rtp_time,
856 "Received SR"
857 );
858 } else if let Some(rr) = packet.as_any().downcast_ref::<ReceiverReport>() {
859 for report in &rr.reports {
860 if report.ssrc == ssrc {
861 let packet_loss = report.fraction_lost;
862 let total_lost = report.total_lost;
863 let jitter = report.jitter;
864
865 info!(
866 track_id,
867 ssrc = report.ssrc,
868 fraction_lost = packet_loss,
869 total_lost = total_lost,
870 jitter = jitter,
871 last_sequence_number = report.last_sequence_number,
872 "Received RR for our stream"
873 );
874
875 if packet_loss > 50 {
876 warn!(track_id, "High packet loss detected: {}%", packet_loss);
877 }
878 }
879 }
880 } else if let Some(_) = packet.as_any().downcast_ref::<SourceDescription>() {
881 } else {
882 debug!(
883 track_id,
884 packet_type = %packet.header().packet_type,
885 "Received other RTCP packet type"
886 );
887 }
888 }
889
890 Ok(())
891 }
892
893 async fn classify_packet(
894 track_id: &TrackId,
895 buf: &[u8],
896 n: usize,
897 stats: &Arc<RtpTrackStats>,
898 ssrc: u32,
899 ) -> PacketKind {
900 if n >= 20 {
902 let msg_type = u16::from_be_bytes([buf[0], buf[1]]);
903 let msg_length = u16::from_be_bytes([buf[2], buf[3]]);
904 let magic_cookie = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
905
906 if magic_cookie == STUN_MAGIC_COOKIE
907 || ((msg_type & 0xC000) == 0x0000 && (msg_length as usize + 20) <= n)
908 {
909 debug!(
910 track_id = track_id.as_str(),
911 "Received STUN packet with message type: 0x{:04X}, length: {}", msg_type, n
912 );
913 return PacketKind::Stun(msg_type);
914 }
915 }
916
917 let version = (buf[0] >> 6) & 0x03;
919 let rtcp_pt = buf[1];
920 if version == 2 && rtcp_pt >= 200 && rtcp_pt <= 207 {
921 if let Err(e) = Self::handle_rtcp_packet(track_id, buf, n, stats, ssrc).await {
922 warn!(
923 track_id = track_id.as_str(),
924 "Failed to handle RTCP packet: {:?}", e
925 );
926 }
927 return PacketKind::Rtcp;
928 }
929
930 let rtp_pt = buf[1] & 0x7F;
932 if version != 2 {
933 info!(
934 track_id = track_id.as_str(),
935 "Received packet with invalid RTP version: {}, skipping", version
936 );
937 return PacketKind::Ignore;
938 }
939
940 if rtp_pt >= 128 {
941 debug!(
942 track_id = track_id.as_str(),
943 "Received packet with invalid RTP payload type: {}, might be unrecognized protocol",
944 rtp_pt
945 );
946 return PacketKind::Ignore;
947 }
948
949 PacketKind::Rtp
950 }
951
952 async fn recv_rtp_packets(
953 inner: Arc<Mutex<RtpTrackInner>>,
954 ptime: Duration,
955 rtp_socket: UdpConnection,
956 track_id: TrackId,
957 processor_chain: ProcessorChain,
958 packet_sender: TrackPacketSender,
959 _rtcp_socket: UdpConnection,
960 ssrc: u32,
961 ) -> Result<()> {
962 let mut buf = vec![0u8; RTP_MTU];
963 let mut send_ticker = tokio::time::interval(ptime);
964 let mut jitter = JitterBuffer::new();
965 let stats = inner.lock().unwrap().stats.clone();
966
967 loop {
968 select! {
969 Ok((n, src_addr)) = rtp_socket.recv_raw(&mut buf) => {
970 if n == 0 {
971 continue;
972 }
973
974
975 let packet_kind = Self::classify_packet(&track_id, &buf, n, &stats, ssrc).await;
976 match packet_kind {
977 PacketKind::Stun(msg_type) => {
978 let force = msg_type == STUN_BINDING_RESPONSE;
979 Self::maybe_update_remote_addr(&inner, &src_addr, force, &track_id, "stun");
980 continue;
981 }
982 PacketKind::Rtcp => {
983 Self::maybe_update_remote_addr(&inner, &src_addr, false, &track_id, "rtcp");
984 continue;
985 }
986 PacketKind::Ignore => {
987 continue;
988 }
989 PacketKind::Rtp => {
990 Self::maybe_update_remote_addr(&inner, &src_addr, false, &track_id, "rtp-private");
991 }
992 }
993 let packet = match Packet::unmarshal(&mut &buf[0..n]) {
994 Ok(packet) => packet,
995 Err(e) => {
996 info!(track_id, "Error creating RTP reader: {:?}", e);
997 continue;
998 }
999 };
1000
1001 let seq_num = packet.header.sequence_number as u32;
1002 let payload_len = packet.payload.len() as u32;
1003 stats.update_receive_stats(seq_num, payload_len);
1004
1005 let payload_type = packet.header.payload_type;
1006 let payload = packet.payload.to_vec();
1007 let sample_rate = match payload_type {
1008 9 => 16000, 111 => 48000, _ => 8000,
1011 };
1012
1013 let frame = AudioFrame {
1014 track_id: track_id.clone(),
1015 samples: Samples::RTP {
1016 payload_type,
1017 payload,
1018 sequence_number: packet.header.sequence_number.into(),
1019 },
1020 timestamp: crate::media::get_timestamp(),
1021 sample_rate,
1022 };
1023
1024 jitter.push(frame);
1025 }
1026 _ = send_ticker.tick() => {
1027 let mut frame = match jitter.pop() {
1028 Some(f) => f,
1029 None => continue,
1030 };
1031
1032 if let Err(e) = processor_chain.process_frame(&mut frame) {
1033 warn!(track_id, "Failed to process frame: {}", e);
1034 break;
1035 }
1036 match packet_sender.send(frame) {
1037 Ok(_) => {}
1038 Err(e) => {
1039 warn!(track_id, "Error sending audio frame: {}", e);
1040 break;
1041 }
1042 }
1043 }
1044 }
1045 }
1046 Ok(())
1047 }
1048
1049 fn maybe_update_remote_addr(
1050 inner: &Arc<Mutex<RtpTrackInner>>,
1051 src_addr: &SipAddr,
1052 force: bool,
1053 track_id: &TrackId,
1054 reason: &'static str,
1055 ) -> bool {
1056 let mut guard = inner.lock().unwrap();
1057 let src_ip = Self::sip_addr_ip(src_addr);
1058
1059 let should_update = if force {
1060 true
1061 } else {
1062 match (guard.remote_addr.as_ref(), src_ip) {
1063 (Some(remote), Some(src_ip)) => match Self::sip_addr_ip(remote) {
1064 Some(remote_ip) => remote_ip != src_ip && Self::is_private_ip(&remote_ip),
1065 None => false,
1066 },
1067 (None, _) => true,
1068 _ => false,
1069 }
1070 };
1071
1072 if should_update {
1073 let old = guard.remote_addr.replace(src_addr.clone());
1074 if guard.rtcp_mux {
1075 guard.remote_rtcp_addr = Some(src_addr.clone());
1076 } else if let Some(rtcp_addr) = guard.remote_rtcp_addr.as_mut() {
1077 rtcp_addr.addr.host = src_addr.addr.host.clone();
1078 }
1079 info!(
1080 track_id = track_id.as_str(),
1081 ?old,
1082 ?src_addr,
1083 reason = reason,
1084 "Updating remote RTP address"
1085 );
1086 return true;
1087 }
1088 false
1089 }
1090
1091 fn sip_addr_ip(addr: &SipAddr) -> Option<IpAddr> {
1092 addr.addr.host.to_string().parse().ok()
1093 }
1094
1095 fn is_private_ip(ip: &IpAddr) -> bool {
1096 match ip {
1097 IpAddr::V4(v4) => {
1098 v4.is_private()
1099 || v4.is_loopback()
1100 || v4.is_link_local()
1101 || v4.is_broadcast()
1102 || v4.is_documentation()
1103 || v4.is_unspecified()
1104 }
1105 IpAddr::V6(v6) => {
1106 v6.is_unique_local()
1107 || v6.is_loopback()
1108 || v6.is_unspecified()
1109 || v6.is_unicast_link_local()
1110 }
1111 }
1112 }
1113
1114 async fn send_rtcp_reports(
1116 inner: Arc<Mutex<RtpTrackInner>>,
1117 track_id: TrackId,
1118 token: CancellationToken,
1119 rtcp_socket: &UdpConnection,
1120 ssrc: u32,
1121 ssrc_cname: String,
1122 ) -> Result<()> {
1123 let mut interval = interval_at(
1124 Instant::now() + Duration::from_millis(RTCP_SR_INTERVAL_MS),
1125 Duration::from_millis(RTCP_SR_INTERVAL_MS),
1126 );
1127 let stats = inner.lock().unwrap().stats.clone();
1128 let mut last_sent_octets = stats.octet_count.load(Ordering::Relaxed);
1129 let mut last_recv_octets = stats.received_octets.load(Ordering::Relaxed);
1130 let mut last_rate_instant = Instant::now();
1131 loop {
1132 select! {
1133 _ = token.cancelled() => {
1134 info!(track_id, "RTCP reports task cancelled");
1135 break;
1136 }
1137 _ = interval.tick() => {
1138 let packet_count = stats.packet_count.load(Ordering::Relaxed);
1140 let octet_count = stats.octet_count.load(Ordering::Relaxed);
1141 let rtp_timestamp = stats.timestamp.load(Ordering::Relaxed);
1142
1143 let sent_octets = octet_count;
1144 let recv_octets = stats.received_octets.load(Ordering::Relaxed);
1145 let now = Instant::now();
1146 let elapsed = now.saturating_duration_since(last_rate_instant).as_secs_f64();
1147 if elapsed > 0.0 {
1148 let delta_sent = if sent_octets >= last_sent_octets {
1149 (sent_octets - last_sent_octets) as u64
1150 } else {
1151 (u32::MAX as u64 - last_sent_octets as u64) + sent_octets as u64 + 1
1152 };
1153 let delta_recv = if recv_octets >= last_recv_octets {
1154 (recv_octets - last_recv_octets) as u64
1155 } else {
1156 (u32::MAX as u64 - last_recv_octets as u64) + recv_octets as u64 + 1
1157 };
1158
1159 let send_bps = (delta_sent as f64 * 8.0) / elapsed;
1160 let recv_bps = (delta_recv as f64 * 8.0) / elapsed;
1161 let received_packets = stats.received_packets.load(Ordering::Relaxed);
1162 let lost_packets = stats.lost_packets.load(Ordering::Relaxed);
1163 let expected_packets = stats.expected_packets.load(Ordering::Relaxed);
1164 let fraction_lost = stats.get_fraction_lost();
1165 let loss_pct = (fraction_lost as f64) * 100.0 / 256.0;
1166 let jitter = stats.jitter.load(Ordering::Relaxed);
1167
1168 info!(
1169 track_id = track_id.as_str(),
1170 send_kbps = send_bps / 1000.0,
1171 recv_kbps = recv_bps / 1000.0,
1172 sent_packets = packet_count,
1173 recv_packets = received_packets,
1174 expected_packets,
1175 lost_packets,
1176 loss_pct,
1177 jitter,
1178 "RTP throughput"
1179 );
1180
1181 last_rate_instant = now;
1182 last_sent_octets = sent_octets;
1183 last_recv_octets = recv_octets;
1184 }
1185
1186 let mut pkts = vec![Box::new(SenderReport {
1187 ssrc,
1188 ntp_time: Instant::now().elapsed().as_secs() as u64,
1189 rtp_time: rtp_timestamp,
1190 packet_count,
1191 octet_count,
1192 profile_extensions: Bytes::new(),
1193 reports: vec![],
1194 })
1195 as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>];
1196
1197 if !ssrc_cname.is_empty() {
1198 pkts.push(Box::new(SourceDescription {
1199 chunks: vec![SourceDescriptionChunk {
1200 source: ssrc,
1201 items: vec![SourceDescriptionItem {
1202 sdes_type: SdesType::SdesCname,
1203 text: ssrc_cname.clone().into(),
1204 }],
1205 }],
1206 })
1207 as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>);
1208 }
1209
1210 let received_packets = stats.received_packets.load(Ordering::Relaxed);
1211 let lost_packets = stats.lost_packets.load(Ordering::Relaxed);
1212 let highest_seq = stats.highest_seq_num.load(Ordering::Relaxed);
1213 let jitter = stats.jitter.load(Ordering::Relaxed);
1214 let fraction_lost = stats.get_fraction_lost();
1215
1216 if received_packets > 0 || lost_packets > 0 {
1217 let remote_ssrc = ssrc + 1;
1218 let report = ReceptionReport {
1219 ssrc: remote_ssrc,
1220 fraction_lost,
1221 total_lost: lost_packets,
1222 last_sequence_number: highest_seq,
1223 jitter,
1224 last_sender_report: (stats.last_sr_timestamp.load(Ordering::Relaxed) >> 16) as u32,
1225 delay: 0,
1226 };
1227
1228 let rr = ReceiverReport {
1229 ssrc,
1230 reports: vec![report],
1231 profile_extensions: Bytes::new(),
1232 };
1233 pkts.push(Box::new(rr) as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>);
1234 }
1235
1236 let rtcp_data = webrtc::rtcp::packet::marshal(&pkts)?;
1237 let remote_rtcp_addr = inner.lock().unwrap().remote_rtcp_addr.clone();
1238 match remote_rtcp_addr{
1239 Some(ref addr) => {
1240 if let Err(e) = rtcp_socket.send_raw(&rtcp_data, addr).await {
1241 warn!(track_id, "Failed to send RTCP report: {}", e);
1242 }
1243 }
1244 None => {}
1245 }
1246 }
1247 }
1248 }
1249 Ok(())
1250 }
1251
1252 async fn try_ice_connectivity_check(&self) {
1253 let remote_addr = self.inner.lock().unwrap().remote_addr.clone();
1254 let remote_rtcp_addr = self.inner.lock().unwrap().remote_rtcp_addr.clone();
1255
1256 if let Some(ref addr) = remote_addr {
1257 Self::send_ice_connectivity_check(&self.rtp_socket, addr)
1258 .await
1259 .ok();
1260 if let Some(ref rtcp_addr) = remote_rtcp_addr {
1261 if rtcp_addr != addr {
1262 Self::send_ice_connectivity_check(&self.rtcp_socket, rtcp_addr)
1263 .await
1264 .ok();
1265 }
1266 }
1267 }
1268 }
1269}
1270
1271#[async_trait]
1272impl Track for RtpTrack {
1273 fn ssrc(&self) -> u32 {
1274 self.ssrc
1275 }
1276 fn id(&self) -> &TrackId {
1277 &self.track_id
1278 }
1279 fn config(&self) -> &TrackConfig {
1280 &self.config
1281 }
1282 fn processor_chain(&mut self) -> &mut ProcessorChain {
1283 &mut self.processor_chain
1284 }
1285
1286 async fn handshake(&mut self, offer: String, _timeout: Option<Duration>) -> Result<String> {
1287 self.set_remote_description(&offer)?;
1288 self.local_description()
1289 }
1290
1291 async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
1292 self.set_remote_description(&answer).ok();
1293
1294 if self.ice_connectivity_check {
1295 self.try_ice_connectivity_check().await;
1296 }
1297 Ok(())
1298 }
1299
1300 async fn start(
1301 &self,
1302 event_sender: EventSender,
1303 packet_sender: TrackPacketSender,
1304 ) -> Result<()> {
1305 let track_id = self.track_id.clone();
1306 let rtcp_socket = self.rtcp_socket.clone();
1307 let ssrc = self.ssrc;
1308 let rtp_socket = self.rtp_socket.clone();
1309 let processor_chain = self.processor_chain.clone();
1310 let token = self.cancel_token.clone();
1311 let ssrc_cname = self.ssrc_cname.clone();
1312 let start_time = crate::media::get_timestamp();
1313 let ptime = self.config.ptime;
1314
1315 if self.ice_connectivity_check {
1317 self.try_ice_connectivity_check().await;
1318 }
1319
1320 let inner = self.inner.clone();
1321
1322 tokio::spawn(async move {
1323 select! {
1324 _ = token.cancelled() => {
1325 debug!(track_id, "RTP processor task cancelled");
1326 },
1327 _ = Self::send_rtcp_reports(inner.clone(),track_id.clone(), token.clone(), &rtcp_socket, ssrc, ssrc_cname) => {
1328 }
1329 _ = Self::recv_rtp_packets(
1330 inner.clone(),
1331 ptime,
1332 rtp_socket,
1333 track_id.clone(),
1334 processor_chain,
1335 packet_sender,
1336 rtcp_socket.clone(),
1337 ssrc,
1338 ) => {
1339 }
1340 };
1341 let remote_rtcp_addr = inner.lock().unwrap().remote_rtcp_addr.clone();
1342 match remote_rtcp_addr {
1344 Some(ref addr) => {
1345 let pkts = vec![Box::new(Goodbye {
1346 sources: vec![ssrc],
1347 reason: "end of call".into(),
1348 })
1349 as Box<dyn webrtc::rtcp::packet::Packet + Send + Sync>];
1350 if let Ok(data) = webrtc::rtcp::packet::marshal(&pkts) {
1351 if let Err(e) = rtcp_socket.send_raw(&data, addr).await {
1352 warn!(track_id, "Failed to send RTCP goodbye packet: {}", e);
1353 }
1354 }
1355 }
1356 None => {}
1357 }
1358 info!(track_id, "RTP processor completed");
1359 event_sender
1360 .send(SessionEvent::TrackEnd {
1361 track_id,
1362 timestamp: crate::media::get_timestamp(),
1363 duration: crate::media::get_timestamp() - start_time,
1364 ssrc,
1365 play_id: None,
1366 })
1367 .ok();
1368 });
1369
1370 Ok(())
1371 }
1372
1373 async fn stop(&self) -> Result<()> {
1374 self.cancel_token.cancel();
1375 Ok(())
1376 }
1377
1378 async fn send_packet(&self, packet: &AudioFrame) -> Result<()> {
1379 let remote_addr = match self.inner.lock().unwrap().remote_addr.clone() {
1380 Some(addr) => addr,
1381 None => return Ok(()),
1382 };
1383 let stats = self.inner.lock().unwrap().stats.clone();
1384
1385 let (payload_type, payload) = self
1386 .encoder
1387 .encode(self.inner.lock().unwrap().payload_type, packet.clone());
1388 if payload.is_empty() {
1389 return Ok(());
1390 }
1391
1392 let clock_rate = match payload_type {
1393 9 => 8000, 111 => 48000, _ => 8000,
1396 };
1397
1398 let now = crate::media::get_timestamp();
1399 let last_update = stats.last_timestamp_update.load(Ordering::Relaxed);
1400 let mut skipped_packets: u32 = 0;
1401
1402 if last_update > 0 {
1403 let frame_duration_ms = self.config.ptime.as_millis() as u64;
1404 if frame_duration_ms > 0 {
1405 let delta_ms = now.saturating_sub(last_update);
1406 let delta_frames = delta_ms / frame_duration_ms;
1407 let prospective_skip = delta_frames.saturating_sub(1);
1408
1409 if prospective_skip >= RTP_RESYNC_MIN_SKIP_PACKETS as u64 {
1410 let last_resync = stats.last_resync_ts.load(Ordering::Relaxed);
1411 let cooldown_ms = frame_duration_ms.saturating_mul(RTP_RESYNC_COOLDOWN_FRAMES);
1412 if last_resync == 0 || now.saturating_sub(last_resync) >= cooldown_ms {
1413 skipped_packets = prospective_skip.min(u32::MAX as u64) as u32;
1414 debug!(
1415 track_id = self.track_id,
1416 delta_ms, skipped_packets, "Resyncing RTP timestamp"
1417 );
1418 for _ in 0..skipped_packets {
1419 self.sequencer.next_sequence_number();
1420 }
1421 stats.last_resync_ts.store(now, Ordering::Relaxed);
1422 }
1423 }
1424 }
1425 }
1426
1427 stats.last_timestamp_update.store(now, Ordering::Relaxed);
1428
1429 let samples_per_packet = (clock_rate as f64 * self.config.ptime.as_secs_f64()) as u32;
1430 let packets = match self
1431 .inner
1432 .lock()
1433 .unwrap()
1434 .packetizer
1435 .lock()
1436 .unwrap()
1437 .as_mut()
1438 {
1439 Some(p) => {
1440 if skipped_packets > 0 {
1441 let skip_samples = (skipped_packets as u64)
1442 .saturating_mul(samples_per_packet as u64)
1443 .min(u32::MAX as u64) as u32;
1444 p.skip_samples(skip_samples);
1445 }
1446 p.packetize(&Bytes::from_owner(payload), samples_per_packet)?
1447 }
1448 None => return Err(anyhow::anyhow!("Packetizer not set")),
1449 };
1450 for mut packet in packets {
1451 packet.header.marker = false;
1452 packet.header.payload_type = payload_type;
1453 match packet.marshal() {
1454 Ok(ref rtp_data) => match self.rtp_socket.send_raw(rtp_data, &remote_addr).await {
1455 Ok(_) => {
1456 stats.update_send_stats(rtp_data.len() as u32, samples_per_packet);
1457 }
1458 Err(e) => {
1459 warn!(track_id = self.track_id, "Failed to send RTP packet: {}", e);
1460 }
1461 },
1462 Err(e) => {
1463 warn!(
1464 track_id = self.track_id,
1465 "Failed to build RTP packet: {:?}", e
1466 );
1467 return Err(anyhow::anyhow!("Failed to build RTP packet"));
1468 }
1469 }
1470 }
1471 Ok(())
1472 }
1473}
1474
1475#[cfg(test)]
1476mod tests {
1477 use super::*;
1478
1479 #[test]
1480 fn test_rtp_track_stats_new() {
1481 let stats = RtpTrackStats::new();
1482 assert_eq!(stats.packet_count.load(Ordering::Relaxed), 0);
1483 assert_eq!(stats.octet_count.load(Ordering::Relaxed), 0);
1484 assert_eq!(stats.received_packets.load(Ordering::Relaxed), 0);
1485 assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 0);
1486 assert_eq!(stats.jitter.load(Ordering::Relaxed), 0);
1487 }
1488
1489 #[test]
1490 fn test_update_send_stats() {
1491 let stats = RtpTrackStats::new();
1492 stats.update_send_stats(1200, 160);
1493
1494 assert_eq!(stats.packet_count.load(Ordering::Relaxed), 1);
1495 assert_eq!(stats.octet_count.load(Ordering::Relaxed), 1200);
1496 assert_eq!(stats.timestamp.load(Ordering::Relaxed), 160);
1497
1498 stats.update_send_stats(800, 160);
1500 assert_eq!(stats.packet_count.load(Ordering::Relaxed), 2);
1501 assert_eq!(stats.octet_count.load(Ordering::Relaxed), 2000);
1502 assert_eq!(stats.timestamp.load(Ordering::Relaxed), 320);
1503 }
1504
1505 #[test]
1506 fn test_update_receive_stats() {
1507 let stats = RtpTrackStats::new();
1508
1509 stats.update_receive_stats(1000, 160);
1511 assert_eq!(stats.received_packets.load(Ordering::Relaxed), 1);
1512 assert_eq!(stats.received_octets.load(Ordering::Relaxed), 160);
1513 assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1000);
1514 assert_eq!(stats.base_seq.load(Ordering::Relaxed), 1000);
1515 assert_eq!(stats.last_receive_seq.load(Ordering::Relaxed), 1000);
1516 assert_eq!(stats.expected_packets.load(Ordering::Relaxed), 1);
1517 assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 0);
1518
1519 stats.update_receive_stats(1002, 160);
1521 assert_eq!(stats.received_packets.load(Ordering::Relaxed), 2);
1522 assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1002);
1523 assert_eq!(stats.last_receive_seq.load(Ordering::Relaxed), 1002);
1524 assert_eq!(stats.lost_packets.load(Ordering::Relaxed), 1);
1525 assert_eq!(stats.expected_packets.load(Ordering::Relaxed), 3);
1526 }
1527
1528 #[test]
1529 fn test_get_fraction_lost() {
1530 let stats = RtpTrackStats::new();
1531
1532 assert_eq!(stats.get_fraction_lost(), 0);
1534
1535 stats.expected_packets.store(100, Ordering::Relaxed);
1537 stats.lost_packets.store(5, Ordering::Relaxed);
1538
1539 let fraction_lost = stats.get_fraction_lost();
1540 assert_eq!(fraction_lost, 12); stats.lost_packets.store(100, Ordering::Relaxed);
1544 assert_eq!(stats.get_fraction_lost(), 255); }
1546
1547 #[test]
1548 fn test_store_sr_info() {
1549 let stats = RtpTrackStats::new();
1550 stats.store_sr_info(123456, 789012);
1551
1552 assert_eq!(stats.last_sr_timestamp.load(Ordering::Relaxed), 123456);
1553 assert_eq!(stats.last_sr_ntp.load(Ordering::Relaxed), 789012);
1554 }
1555
1556 #[tokio::test]
1557 async fn test_parse_pjsip_sdp() {
1558 let sdp = r#"v=0
1559o=- 3954304612 3954304613 IN IP4 192.168.1.202
1560s=pjmedia
1561b=AS:117
1562t=0 0
1563a=X-nat:3
1564m=audio 4002 RTP/AVP 9 101
1565c=IN IP4 192.168.1.202
1566b=TIAS:96000
1567a=rtcp:4003 IN IP4 192.168.1.202
1568a=sendrecv
1569a=rtpmap:9 G722/8000
1570a=ssrc:1089147397 cname:61753255553b9c6f
1571a=rtpmap:101 telephone-event/8000
1572a=fmtp:101 0-16"#;
1573 let rtp_track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1574 .build()
1575 .await
1576 .expect("Failed to build rtp track");
1577 rtp_track
1578 .set_remote_description(sdp)
1579 .expect("Failed to set remote description");
1580 let inner = rtp_track.inner.lock().unwrap();
1581 assert_eq!(inner.payload_type, 9);
1582 assert!(!inner.rtcp_mux); }
1584
1585 #[tokio::test]
1586 async fn test_parse_rtcp_mux() {
1587 let answer = r#"v=0
1588o=- 723884243 723884244 IN IP4 11.22.33.44
1589s=-
1590c=IN IP4 11.22.33.44
1591t=0 0
1592m=audio 10638 RTP/AVP 8 101
1593a=rtpmap:8 PCMA/8000
1594a=rtpmap:101 telephone-event/8000
1595a=fmtp:101 0-15
1596a=sendrecv
1597a=rtcp-mux"#;
1598 let mut reader = Cursor::new(answer);
1599 let sdp = SessionDescription::unmarshal(&mut reader).expect("Failed to parse SDP");
1600 let peer_media = select_peer_media(&sdp, "audio").expect("Failed to select_peer_media");
1601 assert!(peer_media.rtcp_mux);
1602 assert_eq!(peer_media.rtcp_port, 10638);
1603 }
1604
1605 #[tokio::test]
1606 async fn test_parse_linphone_candidate() {
1607 let answer = r#"v=0
1608o=mpi 2590 792 IN IP4 192.168.3.181
1609s=Talk
1610c=IN IP4 192.168.3.181
1611t=0 0
1612a=ice-pwd:96adb77560869c783656fe0a
1613a=ice-ufrag:409dfd53
1614a=rtcp-xr:rcvr-rtt=all:10000 stat-summary=loss,dup,jitt,TTL voip-metrics
1615a=record:off
1616m=audio 61794 RTP/AVP 8 101
1617c=IN IP4 115.205.103.101
1618a=rtpmap:101 telephone-event/8000
1619a=rtcp:50735
1620a=candidate:1 1 UDP 2130706303 192.168.3.181 61794 typ host
1621a=candidate:1 2 UDP 2130706302 192.168.3.181 50735 typ host
1622a=candidate:2 1 UDP 1694498687 115.205.103.101 61794 typ srflx raddr 192.168.3.181 rport 61794
1623a=candidate:2 2 UDP 1694498686 115.205.103.101 50735 typ srflx raddr 192.168.3.181 rport 50735
1624a=rtcp-fb:* trr-int 5000
1625a=rtcp-fb:* ccm tmmbr"#;
1626 let mut reader = Cursor::new(answer);
1627 let sdp = SessionDescription::unmarshal(&mut reader).expect("Failed to parse SDP");
1628 let peer_media = select_peer_media(&sdp, "audio").expect("Failed to select_peer_media");
1629 assert_eq!(peer_media.rtp_addr, "192.168.3.181");
1630 }
1631
1632 #[tokio::test]
1633 async fn test_rtp_track_builder() {
1634 let track_id = "test_track".to_string();
1635 let config = TrackConfig::default();
1636
1637 let track = RtpTrackBuilder::new(track_id.clone(), config)
1638 .with_rtp_start_port(20000)
1639 .with_rtp_end_port(20100)
1640 .with_session_name("test_session".to_string())
1641 .build()
1642 .await
1643 .expect("Failed to build track");
1644
1645 assert_eq!(track.track_id, track_id);
1646 assert_ne!(track.ssrc, 0); assert_eq!(track.ssrc_cname, "test_session");
1649 let inner = track.inner.lock().unwrap();
1650 assert!(inner.rtcp_mux);
1651 }
1652
1653 #[tokio::test]
1654 async fn test_local_description_generation() {
1655 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1656 .build()
1657 .await
1658 .expect("Failed to build track");
1659
1660 let local_desc = track
1661 .local_description()
1662 .expect("Failed to generate local description");
1663
1664 assert!(local_desc.contains("m=audio"));
1666 assert!(local_desc.contains("RTP/AVP"));
1667 assert!(local_desc.contains("a=rtcp-mux")); assert!(local_desc.contains("a=sendrecv"));
1669 assert!(local_desc.contains(&format!("a=ssrc:{}", track.ssrc)));
1670 }
1671
1672 #[tokio::test]
1673 async fn test_double_set_remote_description() {
1674 let sdp = r#"v=0
1675o=- 123 124 IN IP4 192.168.1.1
1676s=-
1677c=IN IP4 192.168.1.1
1678t=0 0
1679m=audio 5004 RTP/AVP 0
1680a=rtpmap:0 PCMU/8000"#;
1681
1682 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1683 .build()
1684 .await
1685 .expect("Failed to build track");
1686
1687 assert!(track.set_remote_description(sdp).is_ok());
1689 assert!(track.remote_description().is_some());
1690
1691 assert!(track.set_remote_description(sdp).is_ok());
1693 }
1694
1695 #[tokio::test]
1696 async fn test_invalid_sdp() {
1697 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1698 .build()
1699 .await
1700 .expect("Failed to build track");
1701
1702 let invalid_sdp = r#"v=0
1704o=- 123 124 IN IP4 192.168.1.1
1705s=-
1706c=IN IP4 192.168.1.1
1707t=0 0"#;
1708
1709 assert!(track.set_remote_description(invalid_sdp).is_err());
1710 }
1711
1712 #[tokio::test]
1713 async fn test_dtmf_digit_mapping() {
1714 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1715 .build()
1716 .await
1717 .expect("Failed to build track");
1718
1719 let valid_digits = [
1721 "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "*", "#", "A", "B", "C", "D",
1722 ];
1723
1724 for digit in &valid_digits {
1725 let result = track.send_dtmf(digit, Some(100)).await;
1728 assert!(result.is_err());
1729 let error_msg = result.unwrap_err().to_string();
1730 assert!(error_msg.contains("Remote address not set"));
1731 }
1732
1733 let result = track.send_dtmf("X", Some(100)).await;
1735 assert!(result.is_err());
1736 let error_msg = result.unwrap_err().to_string();
1737 assert!(error_msg.contains("Invalid DTMF digit"));
1738 }
1739
1740 #[test]
1741 fn test_rtcp_packet_type_detection() {
1742 assert!(200 >= 200 && 200 <= 207); assert!(201 >= 200 && 201 <= 207); assert!(202 >= 200 && 202 <= 207); assert!(203 >= 200 && 203 <= 207); assert!(204 >= 200 && 204 <= 207); let rtp_byte = 0b10001001; let version = (rtp_byte >> 6) & 0x03;
1752 let pt = rtp_byte & 0x7F;
1753
1754 assert_eq!(version, 2);
1755 assert_eq!(pt, 9);
1756 }
1757
1758 #[test]
1759 fn test_stun_magic_cookie_detection() {
1760 let stun_magic_cookie = STUN_MAGIC_COOKIE;
1761 let bytes = stun_magic_cookie.to_be_bytes();
1762 let reconstructed = u32::from_be_bytes(bytes);
1763
1764 assert_eq!(reconstructed, stun_magic_cookie);
1765 }
1766
1767 #[tokio::test]
1768 async fn test_track_ssrc_and_id() {
1769 let track_id = "unique_track_123".to_string();
1770 let custom_ssrc = 0x12345678;
1771
1772 let track = RtpTrackBuilder::new(track_id.clone(), TrackConfig::default())
1773 .with_ssrc(custom_ssrc)
1774 .build()
1775 .await
1776 .expect("Failed to build track");
1777
1778 let builder =
1780 RtpTrackBuilder::new(track_id.clone(), TrackConfig::default()).with_ssrc(custom_ssrc);
1781 assert_eq!(builder.ssrc, custom_ssrc);
1782 assert_eq!(track.id(), &track_id);
1783 }
1784
1785 #[test]
1786 fn test_codec_type_payload_mapping() {
1787 assert_eq!(CodecType::PCMU.payload_type(), 0);
1789 assert_eq!(CodecType::G722.payload_type(), 9);
1790 assert_eq!(CodecType::PCMA.payload_type(), 8);
1791 assert_eq!(CodecType::TelephoneEvent.payload_type(), 101);
1792 }
1793
1794 #[tokio::test]
1795 async fn test_stats_initialization() {
1796 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1797 .build()
1798 .await
1799 .expect("Failed to build track");
1800 let inner = track.inner.lock().unwrap();
1801 assert_eq!(inner.stats.packet_count.load(Ordering::Relaxed), 0);
1803 assert_eq!(inner.stats.octet_count.load(Ordering::Relaxed), 0);
1804 assert_eq!(inner.stats.received_packets.load(Ordering::Relaxed), 0);
1805 assert_eq!(inner.stats.lost_packets.load(Ordering::Relaxed), 0);
1806 assert_eq!(inner.stats.highest_seq_num.load(Ordering::Relaxed), 0);
1807 assert_eq!(inner.stats.jitter.load(Ordering::Relaxed), 0);
1808 assert_eq!(inner.stats.last_sr_timestamp.load(Ordering::Relaxed), 0);
1809 assert_eq!(inner.stats.last_sr_ntp.load(Ordering::Relaxed), 0);
1810 assert_eq!(inner.stats.base_seq.load(Ordering::Relaxed), 0);
1811 assert_eq!(inner.stats.last_receive_seq.load(Ordering::Relaxed), 0);
1812 assert_eq!(inner.stats.last_resync_ts.load(Ordering::Relaxed), 0);
1813 }
1814
1815 #[test]
1816 fn test_sequence_number_gap_calculation() {
1817 let stats = RtpTrackStats::new();
1818
1819 stats.update_receive_stats(1000, 160); stats.update_receive_stats(1002, 160); stats.update_receive_stats(1003, 160); stats.update_receive_stats(1005, 160); assert_eq!(stats.received_packets.load(Ordering::Relaxed), 4);
1826 assert_eq!(stats.highest_seq_num.load(Ordering::Relaxed), 1005);
1827 assert!(stats.lost_packets.load(Ordering::Relaxed) > 0);
1829 }
1830
1831 #[test]
1832 fn test_jitter_calculation() {
1833 let stats = RtpTrackStats::new();
1834
1835 stats.update_receive_stats(1000, 160);
1837 let _initial_jitter = stats.jitter.load(Ordering::Relaxed);
1838
1839 stats.update_receive_stats(1001, 160);
1840 let updated_jitter = stats.jitter.load(Ordering::Relaxed);
1841
1842 assert!(updated_jitter < 1000); }
1846
1847 #[test]
1848 fn test_builder_with_custom_ssrc() {
1849 let custom_ssrc = 0x12345678u32;
1850 let builder =
1851 RtpTrackBuilder::new("test".to_string(), TrackConfig::default()).with_ssrc(custom_ssrc);
1852
1853 assert_eq!(builder.ssrc, custom_ssrc);
1855 assert_eq!(builder.ssrc_cname, format!("rustpbx-{}", custom_ssrc));
1856 }
1857
1858 #[test]
1859 fn test_builder_configuration() {
1860 let builder = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1861 .with_rtp_start_port(10000)
1862 .with_rtp_end_port(20000)
1863 .with_rtp_alloc_count(100)
1864 .with_rtcp_mux(false)
1865 .with_session_name("custom_session".to_string());
1866
1867 assert_eq!(builder.rtp_start_port, 10000);
1868 assert_eq!(builder.rtp_end_port, 20000);
1869 assert_eq!(builder.rtp_alloc_count, 100);
1870 assert!(!builder.rtcp_mux);
1871 assert_eq!(builder.ssrc_cname, "custom_session");
1872 }
1873
1874 #[tokio::test]
1875 async fn test_ice_connectivity_check_enabled_by_default() {
1876 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1877 .build()
1878 .await
1879 .expect("Failed to build track");
1880
1881 assert!(track.ice_connectivity_check); }
1883
1884 #[tokio::test]
1885 async fn test_ice_connectivity_check_can_be_disabled() {
1886 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1887 .with_ice_connectivity_check(false)
1888 .build()
1889 .await
1890 .expect("Failed to build track");
1891
1892 assert!(!track.ice_connectivity_check);
1893 }
1894
1895 #[tokio::test]
1896 async fn test_maybe_update_remote_addr_private_peer() {
1897 let track = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1898 .build()
1899 .await
1900 .expect("Failed to build track");
1901 let inner = track.inner.clone();
1902
1903 let private_addr = SipAddr {
1904 addr: HostWithPort {
1905 host: "192.168.0.10".parse().expect("host"),
1906 port: Some(4000.into()),
1907 },
1908 r#type: Some(rsip::transport::Transport::Udp),
1909 };
1910
1911 let public_addr = SipAddr {
1912 addr: HostWithPort {
1913 host: "203.0.113.5".parse().expect("host"),
1914 port: Some(5004.into()),
1915 },
1916 r#type: Some(rsip::transport::Transport::Udp),
1917 };
1918
1919 {
1920 let mut guard = inner.lock().expect("lock");
1921 guard.remote_addr = Some(private_addr.clone());
1922 guard.remote_rtcp_addr = Some(private_addr.clone());
1923 guard.rtcp_mux = true;
1924 }
1925
1926 let updated = RtpTrack::maybe_update_remote_addr(
1927 &inner,
1928 &public_addr,
1929 false,
1930 &track.track_id,
1931 "test",
1932 );
1933
1934 assert!(updated);
1935 let guard = inner.lock().expect("lock");
1936 assert_eq!(
1937 guard
1938 .remote_addr
1939 .as_ref()
1940 .expect("remote")
1941 .addr
1942 .host
1943 .to_string(),
1944 "203.0.113.5"
1945 );
1946 assert_eq!(
1947 guard
1948 .remote_rtcp_addr
1949 .as_ref()
1950 .expect("rtcp")
1951 .addr
1952 .host
1953 .to_string(),
1954 "203.0.113.5"
1955 );
1956 }
1957
1958 #[test]
1959 fn test_stun_packet_structure() {
1960 assert_eq!(STUN_BINDING_REQUEST, 0x0001);
1962 assert_eq!(STUN_MAGIC_COOKIE, 0x2112A442);
1963 assert_eq!(STUN_TRANSACTION_ID_SIZE, 12);
1964
1965 let mut packet = vec![0u8; 20];
1967 packet[0..2].copy_from_slice(&STUN_BINDING_REQUEST.to_be_bytes());
1968 packet[4..8].copy_from_slice(&STUN_MAGIC_COOKIE.to_be_bytes());
1969
1970 let msg_type = u16::from_be_bytes([packet[0], packet[1]]);
1972 assert_eq!(msg_type, STUN_BINDING_REQUEST);
1973
1974 let magic = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]);
1976 assert_eq!(magic, STUN_MAGIC_COOKIE);
1977 }
1978
1979 #[tokio::test]
1980 async fn test_ice_connectivity_check_builder_method() {
1981 let builder_enabled = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1982 .with_ice_connectivity_check(true);
1983 assert!(builder_enabled.ice_connectivity_check);
1984
1985 let builder_disabled = RtpTrackBuilder::new("test".to_string(), TrackConfig::default())
1986 .with_ice_connectivity_check(false);
1987 assert!(!builder_disabled.ice_connectivity_check);
1988 }
1989
1990 #[test]
1991 fn test_ice_connectivity_terminology() {
1992 assert_eq!(STUN_BINDING_REQUEST, 0x0001); assert_eq!(STUN_MAGIC_COOKIE, 0x2112A442); }
2005}