1use super::track_codec::TrackCodec;
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::{
6 processor::ProcessorChain,
7 track::{Track, TrackConfig, TrackId, TrackPacketSender},
8 },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16 AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17 PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18 config::MediaCapabilities,
19 media::{
20 MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21 track::SampleStreamTrack,
22 },
23};
24use std::{
25 sync::Arc,
26 time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34 pub mode: TransportMode,
35 pub ice_servers: Option<Vec<IceServer>>,
36 pub external_ip: Option<String>,
37 pub rtp_port_range: Option<(u16, u16)>,
38 pub preferred_codec: Option<CodecType>,
39 pub codecs: Vec<CodecType>,
40 pub payload_type: Option<u8>,
41 pub enable_latching: Option<bool>,
42}
43
44impl Default for RtcTrackConfig {
45 fn default() -> Self {
46 Self {
47 mode: TransportMode::WebRtc, ice_servers: None,
49 external_ip: None,
50 rtp_port_range: None,
51 preferred_codec: None,
52 codecs: Vec::new(),
53 payload_type: None,
54 enable_latching: None,
55 }
56 }
57}
58
59pub struct RtcTrack {
60 track_id: TrackId,
61 track_config: TrackConfig,
62 rtc_config: RtcTrackConfig,
63 processor_chain: ProcessorChain,
64 packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
65 cancel_token: CancellationToken,
66 local_source: Option<Arc<SampleStreamSource>>,
67 encoder: TrackCodec,
68 ssrc: u32,
69 payload_type: Option<u8>,
70 pub peer_connection: Option<Arc<PeerConnection>>,
71 next_rtp_timestamp: u32,
72 next_rtp_sequence_number: u16,
73 last_packet_time: Option<Instant>,
74 last_remote_sdp: Option<String>,
75 need_marker: bool,
76}
77
78impl RtcTrack {
79 pub fn new(
80 cancel_token: CancellationToken,
81 id: TrackId,
82 track_config: TrackConfig,
83 rtc_config: RtcTrackConfig,
84 ) -> Self {
85 let processor_chain = ProcessorChain::new(track_config.samplerate);
86 Self {
87 track_id: id,
88 track_config,
89 rtc_config,
90 processor_chain,
91 packet_sender: Arc::new(Mutex::new(None)),
92 cancel_token,
93 local_source: None,
94 encoder: TrackCodec::new(),
95 ssrc: 0,
96 payload_type: None,
97 peer_connection: None,
98 next_rtp_timestamp: 0,
99 next_rtp_sequence_number: 0,
100 last_packet_time: None,
101 last_remote_sdp: None,
102 need_marker: false,
103 }
104 }
105
106 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
107 self.ssrc = ssrc;
108 self
109 }
110
111 pub fn create_audio_track(
112 _codec: CodecType,
113 _stream_id: Option<String>,
114 ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
115 let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
116 (Arc::new(source), track)
117 }
118
119 pub async fn local_description(&self) -> Result<String> {
120 let pc = self
121 .peer_connection
122 .as_ref()
123 .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
124 let offer = pc.create_offer().await?;
125 pc.set_local_description(offer.clone())?;
126 Ok(offer.to_sdp_string())
127 }
128
129 pub async fn create(&mut self) -> Result<()> {
130 if self.peer_connection.is_some() {
131 return Ok(());
132 }
133
134 let mut config = RtcConfiguration::default();
135 if self.ssrc != 0 {
136 config.ssrc_start = self.ssrc;
137 }
138 config.transport_mode = self.rtc_config.mode.clone();
139
140 if let Some(ice_servers) = &self.rtc_config.ice_servers {
141 config.ice_servers = ice_servers.clone();
142 }
143
144 if let Some(external_ip) = &self.rtc_config.external_ip {
145 config.external_ip = Some(external_ip.clone());
146 }
147
148 config.enable_latching = self
149 .rtc_config
150 .enable_latching
151 .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
152
153 if !self.rtc_config.codecs.is_empty() {
154 let mut caps = MediaCapabilities::default();
155 caps.audio.clear();
156
157 for codec in &self.rtc_config.codecs {
158 let cap = match codec {
159 CodecType::PCMU => AudioCapability::pcmu(),
160 CodecType::PCMA => AudioCapability::pcma(),
161 CodecType::G722 => AudioCapability::g722(),
162 CodecType::G729 => AudioCapability::g729(),
163 CodecType::TelephoneEvent => AudioCapability::telephone_event(),
164 #[cfg(feature = "opus")]
165 CodecType::Opus => AudioCapability::opus(),
166 };
167 caps.audio.push(cap);
168 }
169 config.media_capabilities = Some(caps);
170 }
171
172 let peer_connection = Arc::new(PeerConnection::new(config));
173 self.peer_connection = Some(peer_connection.clone());
174
175 let default_codec = CodecType::G722;
176 let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
177
178 let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
179 self.local_source = Some(source);
180
181 let payload_type = self
182 .rtc_config
183 .payload_type
184 .unwrap_or_else(|| codec.payload_type());
185
186 self.payload_type = Some(payload_type);
187
188 let params = RtpCodecParameters {
189 clock_rate: codec.clock_rate(),
190 channels: codec.channels() as u8,
191 payload_type,
192 ..Default::default()
193 };
194
195 peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
196
197 self.spawn_handlers(
199 peer_connection.clone(),
200 self.track_id.clone(),
201 self.processor_chain.clone(),
202 payload_type,
203 );
204
205 Ok(())
206 }
207
208 fn spawn_handlers(
209 &self,
210 pc: Arc<PeerConnection>,
211 track_id: TrackId,
212 processor_chain: ProcessorChain,
213 default_payload_type: u8,
214 ) {
215 let cancel_token = self.cancel_token.clone();
216 let packet_sender = self.packet_sender.clone();
217 let pc_event = pc.clone();
218 let pc_stats = pc.clone();
219 let pc_state = pc.clone();
220 let track_id_log = track_id.clone();
221 let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
222
223 crate::spawn(async move {
224 info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
225
226 let mut events = futures::stream::unfold(pc_event, |pc| async move {
227 pc.recv().await.map(|ev| (ev, pc))
228 })
229 .boxed();
230
231 let mut state_rx = if is_webrtc {
232 Some(pc_state.subscribe_peer_state())
233 } else {
234 None
235 };
236
237 let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
238 let mut event_count = 0;
239 let mut workers = FuturesUnordered::new();
240
241 loop {
242 tokio::select! {
243 _ = cancel_token.cancelled() => {
244 debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
245 break;
246 }
247
248 Some(event) = events.next() => {
249 event_count += 1;
250 let event_type = match &event {
251 PeerConnectionEvent::Track(_) => "Track",
252 PeerConnectionEvent::DataChannel(_) => "DataChannel",
253 };
254 debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
255
256 if let PeerConnectionEvent::Track(transceiver) = event {
257 if let Some(receiver) = transceiver.receiver() {
258 let track = receiver.track();
259 info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
260
261 let (f1, f2) = Self::create_track_workers(
262 track,
263 packet_sender.clone(),
264 track_id_log.clone(),
265 processor_chain.clone(),
266 default_payload_type,
267 );
268 workers.push(f1);
269 workers.push(f2);
270 }
271 }
272 }
273
274 _ = workers.next(), if !workers.is_empty() => {}
275
276 _ = stats_interval.tick() => {
277 match pc_stats.get_stats().await {
278 Ok(stats) => {
279 info!(track_id=%track_id_log, %stats, "RTCP Stats");
280 }
281 Err(e) => {
282 debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
283 }
284 }
285 }
286
287 res = async {
289 if let Some(rx) = state_rx.as_mut() {
290 rx.changed().await
291 } else {
292 std::future::pending().await
293 }
294 } => {
295 if res.is_ok() {
296 if let Some(rx) = state_rx.as_ref() {
297 let s = *rx.borrow();
298 debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
299 match s {
300 PeerConnectionState::Disconnected
301 | PeerConnectionState::Closed
302 | PeerConnectionState::Failed => {
303 info!(
304 track_id = %track_id_log,
305 "peer connection is {:?}, try to close", s
306 );
307 cancel_token.cancel();
308 pc_state.close();
309 break;
310 }
311 _ => {}
312 }
313 }
314 }
315 }
316 }
317 }
318 debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
319 });
320 }
321
322 fn create_track_workers(
323 track: Arc<SampleStreamTrack>,
324 packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
325 track_id: TrackId,
326 processor_chain: ProcessorChain,
327 default_payload_type: u8,
328 ) -> (
329 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
330 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
331 ) {
332 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
333
334 let track_id_proc = track_id.clone();
336 let packet_sender_proc = packet_sender_arc.clone();
337 let processor_chain_proc = processor_chain.clone();
338 let proc_fut = Self::run_processing_worker(
339 rx,
340 track_id_proc,
341 packet_sender_proc,
342 processor_chain_proc,
343 default_payload_type,
344 );
345
346 let track_id_recv = track_id.clone();
348 let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
349
350 (proc_fut.boxed(), recv_fut.boxed())
351 }
352
353 async fn run_processing_worker(
354 mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
355 track_id: TrackId,
356 packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
357 mut processor_chain: ProcessorChain,
358 default_payload_type: u8,
359 ) {
360 info!(track_id=%track_id, "RtcTrack processing worker started");
361 while let Some(frame) = rx.recv().await {
362 let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
363 frame,
364 &track_id,
365 &packet_sender,
366 &mut processor_chain,
367 default_payload_type,
368 ))
369 .catch_unwind()
370 .await;
371
372 if let Err(cause) = res {
373 let msg = if let Some(s) = cause.downcast_ref::<&str>() {
374 *s
375 } else if let Some(s) = cause.downcast_ref::<String>() {
376 &s[..]
377 } else {
378 "Unknown panic"
379 };
380 tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
381 break;
382 }
383 }
384 info!(track_id=%track_id, "RtcTrack processing worker stopped");
385 }
386
387 async fn run_receiving_worker(
388 track: Arc<SampleStreamTrack>,
389 tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
390 track_id: TrackId,
391 ) {
392 let mut samples =
393 futures::stream::unfold(
394 track,
395 |t| async move { t.recv().await.ok().map(|s| (s, t)) },
396 )
397 .boxed();
398
399 while let Some(sample) = samples.next().await {
400 if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
401 if let Err(_) = tx.send(frame) {
402 break;
403 }
404 } else {
405 debug!(track_id=%track_id, "Received non-audio sample");
406 }
407 }
408 info!(track_id=%track_id, "RtcTrack receiving worker stopped");
409 }
410
411 async fn process_audio_frame(
412 frame: rustrtc::media::frame::AudioFrame,
413 track_id: &TrackId,
414 packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
415 processor_chain: &mut ProcessorChain,
416 default_payload_type: u8,
417 ) {
418 let packet_sender = packet_sender.lock().await;
419 if let Some(sender) = packet_sender.as_ref() {
420 let payload_type = frame.payload_type.unwrap_or(default_payload_type);
421 let src_codec = match CodecType::try_from(payload_type) {
422 Ok(c) => c,
423 Err(_) => {
424 debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
425 return;
426 }
427 };
428
429 let mut af = AudioFrame {
430 track_id: track_id.clone(),
431 samples: crate::media::Samples::RTP {
432 payload_type,
433 payload: frame.data.to_vec(),
434 sequence_number: frame.sequence_number.unwrap_or(0),
435 },
436 timestamp: crate::media::get_timestamp(),
437 sample_rate: src_codec.samplerate(),
438 channels: src_codec.channels(),
439 };
440 if let Err(e) = processor_chain.process_frame(&mut af) {
441 debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
442 }
443
444 sender.send(af).ok();
445 }
446 }
447
448 pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
449 use crate::media::negotiate::parse_rtpmap;
450 let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
451
452 if let Some(media) = sdp
453 .media_sections
454 .iter()
455 .find(|m| m.kind == MediaKind::Audio)
456 {
457 for attr in &media.attributes {
458 if attr.key == "rtpmap" {
459 if let Some(value) = &attr.value {
460 if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
461 self.encoder.set_payload_type(pt, codec.clone());
462 self.processor_chain.codec.set_payload_type(pt, codec);
463 }
464 }
465 }
466 }
467
468 let mut negotiated = None;
470
471 if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
474 for preferred_codec in &self.rtc_config.codecs {
475 if *preferred_codec == CodecType::TelephoneEvent {
476 continue;
477 }
478 for fmt in &media.formats {
479 if let Ok(pt) = fmt.parse::<u8>() {
480 let codec = self
481 .encoder
482 .payload_type_map
483 .get(&pt)
484 .cloned()
485 .or_else(|| CodecType::try_from(pt).ok());
486 if let Some(c) = codec {
487 if c == *preferred_codec {
488 negotiated = Some((pt, c));
489 break;
490 }
491 }
492 }
493 }
494 if negotiated.is_some() {
495 break;
496 }
497 }
498 }
499
500 if negotiated.is_none() {
502 for fmt in &media.formats {
503 if let Ok(pt) = fmt.parse::<u8>() {
504 let codec = self
505 .encoder
506 .payload_type_map
507 .get(&pt)
508 .cloned()
509 .or_else(|| CodecType::try_from(pt).ok());
510
511 if let Some(codec) = codec {
512 if codec != CodecType::TelephoneEvent {
513 negotiated = Some((pt, codec));
514 break;
515 }
516 }
517 }
518 }
519 }
520
521 if let Some((pt, codec)) = negotiated {
522 info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
523 self.payload_type = Some(pt);
524 }
525 }
526 Ok(())
527 }
528
529 fn normalize_sdp(sdp: &str) -> String {
530 sdp.lines()
531 .map(|line| {
532 if line.starts_with("o=") {
533 let parts: Vec<&str> = line.split_whitespace().collect();
534 if parts.len() >= 3 {
535 return format!("o= {} {}", parts[1], parts[2]);
536 }
537 }
538 line.to_string()
539 })
540 .filter(|line| {
541 !line.starts_with("t=") && !line.starts_with("a=ssrc:") && !line.starts_with("a=msid:") && !line.trim().is_empty()
545 })
546 .collect::<Vec<_>>()
547 .join("\n")
548 }
549
550 async fn update_remote_description_internal(
551 &mut self,
552 answer: &String,
553 force_update: bool,
554 ) -> Result<()> {
555 info!(
556 track_id=%self.track_id,
557 "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
558 force_update,
559 self.last_remote_sdp.is_some(),
560 self.rtc_config.mode
561 );
562
563 if let Some(pc) = &self.peer_connection {
564 if !force_update {
565 if let Some(ref last_sdp) = self.last_remote_sdp {
566 if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
567 debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
568 return Ok(());
569 }
570 }
571 } else {
572 debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
573 }
574
575 let _is_first_remote_sdp = self.last_remote_sdp.is_none();
576
577 let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
578 match pc.set_remote_description(sdp_obj.clone()).await {
579 Ok(_) => {
580 debug!(track_id=%self.track_id, "set_remote_description succeeded");
581 self.last_remote_sdp = Some(answer.clone());
582 }
583 Err(e) => {
584 if self.rtc_config.mode == TransportMode::Rtp {
585 info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
586
587 if let Some(current_local) = pc.local_description() {
588 let sdp = current_local.to_sdp_string();
589 for line in sdp.lines() {
590 if line.starts_with("a=ssrc:") {
591 info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
592 }
593 }
594 }
595
596 let offer = pc.create_offer().await?;
597
598 let sdp = offer.to_sdp_string();
599 for line in sdp.lines() {
600 if line.starts_with("a=ssrc:") {
601 info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
602 }
603 }
604
605 pc.set_local_description(offer)?;
606 pc.set_remote_description(sdp_obj).await?;
607 self.last_remote_sdp = Some(answer.clone());
608 info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
609 } else {
610 return Err(e.into());
611 }
612 }
613 }
614
615 self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
619 }
620 Ok(())
621 }
622}
623
624#[async_trait]
625impl Track for RtcTrack {
626 fn ssrc(&self) -> u32 {
627 self.ssrc
628 }
629 fn id(&self) -> &TrackId {
630 &self.track_id
631 }
632 fn config(&self) -> &TrackConfig {
633 &self.track_config
634 }
635 fn processor_chain(&mut self) -> &mut ProcessorChain {
636 &mut self.processor_chain
637 }
638
639 async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
640 info!(track_id=%self.track_id, "rtc handshake start");
641 self.create().await?;
642
643 let pc = self.peer_connection.clone().ok_or_else(|| {
644 anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
645 })?;
646
647 debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
648 for (i, t) in pc.get_transceivers().iter().enumerate() {
649 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
650 i, t.kind(), t.mid(), t.direction());
651 }
652
653 let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
654 pc.set_remote_description(sdp.clone()).await?;
655
656 debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
657 for (i, t) in pc.get_transceivers().iter().enumerate() {
658 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
659 i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
660 }
661
662 info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
665
666 self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
667
668 let mut answer = pc.create_answer().await?;
669 crate::media::negotiate::intersect_answer(&sdp, &mut answer);
670
671 pc.set_local_description(answer.clone())?;
672
673 if self.rtc_config.mode != TransportMode::Rtp {
674 pc.wait_for_gathering_complete().await;
675 }
676
677 let final_answer = pc
678 .local_description()
679 .ok_or(anyhow::anyhow!("No local description"))?;
680
681 Ok(final_answer.to_sdp_string())
682 }
683
684 async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
685 self.update_remote_description_internal(answer, false).await
686 }
687
688 async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
689 self.update_remote_description_internal(answer, true).await
690 }
691
692 async fn start(
693 &mut self,
694 event_sender: EventSender,
695 packet_sender: TrackPacketSender,
696 ) -> Result<()> {
697 *self.packet_sender.lock().await = Some(packet_sender.clone());
698 let token_clone = self.cancel_token.clone();
699 let event_sender_clone = event_sender.clone();
700 let track_id = self.track_id.clone();
701 let ssrc = self.ssrc;
702
703 if self.rtc_config.mode != TransportMode::Rtp {
704 let start_time = crate::media::get_timestamp();
705 crate::spawn(async move {
706 token_clone.cancelled().await;
707 let _ = event_sender_clone.send(SessionEvent::TrackEnd {
708 track_id,
709 timestamp: crate::media::get_timestamp(),
710 duration: crate::media::get_timestamp() - start_time,
711 ssrc,
712 play_id: None,
713 });
714 });
715 }
716
717 Ok(())
718 }
719
720 async fn stop(&self) -> Result<()> {
721 self.cancel_token.cancel();
722 if let Some(pc) = &self.peer_connection {
723 pc.close();
724 }
725 Ok(())
726 }
727
728 async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
729 let packet = packet.clone();
730
731 if let Some(source) = &self.local_source {
732 match &packet.samples {
733 crate::media::Samples::PCM { samples } => {
734 let payload_type = self.get_payload_type();
735 let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
736 let target_codec = CodecType::try_from(payload_type)?;
737 if !encoded.is_empty() {
738 let clock_rate = target_codec.clock_rate();
739
740 let now = Instant::now();
741 if let Some(last_time) = self.last_packet_time {
742 let elapsed = now.duration_since(last_time);
743 if elapsed.as_millis() > 50 {
744 let gap_increment =
745 (elapsed.as_millis() as u32 * clock_rate) / 1000;
746 self.next_rtp_timestamp += gap_increment;
747 self.need_marker = true;
748 }
749 }
750
751 self.last_packet_time = Some(now);
752
753 let timestamp_increment = (samples.len() as u64 * clock_rate as u64
754 / packet.sample_rate as u64
755 / self.track_config.channels as u64)
756 as u32;
757 let rtp_timestamp = self.next_rtp_timestamp;
758 self.next_rtp_timestamp += timestamp_increment;
759 let sequence_number = self.next_rtp_sequence_number;
760 self.next_rtp_sequence_number += 1;
761
762 let mut marker = false;
763 if self.need_marker {
764 marker = true;
765 self.need_marker = false;
766 }
767
768 let frame = RtcAudioFrame {
769 data: Bytes::from(encoded),
770 clock_rate,
771 payload_type: Some(payload_type),
772 sequence_number: Some(sequence_number),
773 rtp_timestamp,
774 marker,
775 ..Default::default()
776 };
777 source.send_audio(frame).await.ok();
778 }
779 }
780 crate::media::Samples::RTP {
781 payload,
782 payload_type,
783 sequence_number,
784 } => {
785 let clock_rate = match *payload_type {
786 0 | 8 | 9 | 18 => 8000,
787 111 => 48000,
788 _ => packet.sample_rate,
789 };
790
791 let now = Instant::now();
792 if let Some(last_time) = self.last_packet_time {
793 let elapsed = now.duration_since(last_time);
794 if elapsed.as_millis() > 50 {
795 let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
796 self.next_rtp_timestamp += gap_increment;
797 self.need_marker = true;
798 }
799 }
800 self.last_packet_time = Some(now);
801
802 let increment = match *payload_type {
803 0 | 8 | 18 => payload.len() as u32,
804 9 => payload.len() as u32,
805 111 => (clock_rate / 50) as u32,
806 _ => (clock_rate / 50) as u32,
807 };
808
809 let rtp_timestamp = self.next_rtp_timestamp;
810 self.next_rtp_timestamp += increment;
811 let sequence_number = *sequence_number;
812
813 let mut marker = false;
814 if self.need_marker {
815 marker = true;
816 self.need_marker = false;
817 }
818
819 let frame = RtcAudioFrame {
820 data: Bytes::from(payload.clone()),
821 clock_rate,
822 payload_type: Some(*payload_type),
823 sequence_number: Some(sequence_number),
824 rtp_timestamp,
825 marker,
826 ..Default::default()
827 };
828 source.send_audio(frame).await.ok();
829 }
830 _ => {}
831 }
832 }
833 Ok(())
834 }
835}
836
837impl RtcTrack {
838 fn get_payload_type(&self) -> u8 {
839 if let Some(pt) = self.payload_type {
840 return pt;
841 }
842
843 self.rtc_config.payload_type.unwrap_or_else(|| {
844 match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
845 CodecType::PCMU => 0,
846 CodecType::PCMA => 8,
847 CodecType::Opus => 111,
848 CodecType::G722 => 9,
849 _ => 111,
850 }
851 })
852 }
853}
854
855#[cfg(test)]
856mod tests {
857 use super::*;
858 use crate::media::track::TrackConfig;
859
860 #[test]
861 fn test_parse_sdp_payload_types() {
862 let track_id = "test-track".to_string();
863 let cancel_token = CancellationToken::new();
864 let mut track = RtcTrack::new(
865 cancel_token,
866 track_id,
867 TrackConfig::default(),
868 RtcTrackConfig::default(),
869 );
870
871 let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
873 track
874 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
875 .expect("parse offer");
876 assert_eq!(track.get_payload_type(), 8);
877
878 let mut rtc_config = RtcTrackConfig::default();
880 rtc_config.preferred_codec = Some(CodecType::PCMU);
881 let mut track2 = RtcTrack::new(
882 CancellationToken::new(),
883 "test-track-2".to_string(),
884 TrackConfig::default(),
885 rtc_config,
886 );
887
888 let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
889 track2
890 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
891 .expect("parse offer");
892 assert_eq!(track2.get_payload_type(), 0);
893
894 let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
896 track
897 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
898 .expect("parse offer");
899 assert_eq!(track.get_payload_type(), 111);
900 }
901
902 #[tokio::test]
903 async fn test_rtp_mode_handshake_spawns_handler() {
904 use rustrtc::TransportMode;
905
906 let track_id = "test-track-sip".to_string();
907 let cancel = CancellationToken::new();
908 let track_config = TrackConfig::default();
909 let mut rtc_config = RtcTrackConfig::default();
910 rtc_config.mode = TransportMode::Rtp;
911
912 let mut track = RtcTrack::new(cancel, track_id, track_config, rtc_config);
913
914 let offer = "v=0\r\n\
916o=- 123456 123456 IN IP4 172.0.0.1\r\n\
917s=-\r\n\
918c=IN IP4 172.0.0.1\r\n\
919t=0 0\r\n\
920m=audio 10000 RTP/AVP 0 101\r\n\
921a=rtpmap:0 PCMU/8000\r\n\
922a=rtpmap:101 telephone-event/8000\r\n\
923a=sendrecv\r\n";
924
925 let res = track.handshake(offer.to_string(), None).await;
927 assert!(res.is_ok());
928
929 if let Some(pc) = &track.peer_connection {
931 let transceivers = pc.get_transceivers();
932 assert_eq!(transceivers.len(), 1);
935 assert!(transceivers[0].receiver().is_some());
936 } else {
937 panic!("PeerConnection not initialized");
938 }
939 }
940}