1use super::track_codec::TrackCodec;
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::{
6 processor::ProcessorChain,
7 track::{Track, TrackConfig, TrackId, TrackPacketSender},
8 },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::StreamExt;
15use rustrtc::{
16 AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17 PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18 config::MediaCapabilities,
19 media::{
20 MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21 track::SampleStreamTrack,
22 },
23};
24use std::{
25 sync::Arc,
26 time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34 pub mode: TransportMode,
35 pub ice_servers: Option<Vec<IceServer>>,
36 pub external_ip: Option<String>,
37 pub rtp_port_range: Option<(u16, u16)>,
38 pub preferred_codec: Option<CodecType>,
39 pub codecs: Vec<CodecType>,
40 pub payload_type: Option<u8>,
41 pub enable_latching: Option<bool>,
42}
43
44impl Default for RtcTrackConfig {
45 fn default() -> Self {
46 Self {
47 mode: TransportMode::WebRtc, ice_servers: None,
49 external_ip: None,
50 rtp_port_range: None,
51 preferred_codec: None,
52 codecs: Vec::new(),
53 payload_type: None,
54 enable_latching: None,
55 }
56 }
57}
58
59pub struct RtcTrack {
60 track_id: TrackId,
61 track_config: TrackConfig,
62 rtc_config: RtcTrackConfig,
63 processor_chain: ProcessorChain,
64 packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
65 cancel_token: CancellationToken,
66 local_source: Option<Arc<SampleStreamSource>>,
67 encoder: TrackCodec,
68 ssrc: u32,
69 payload_type: Option<u8>,
70 pub peer_connection: Option<Arc<PeerConnection>>,
71 next_rtp_timestamp: u32,
72 next_rtp_sequence_number: u16,
73 last_packet_time: Option<Instant>,
74 last_remote_sdp: Option<String>,
75 need_marker: bool,
76}
77
78impl RtcTrack {
79 pub fn new(
80 cancel_token: CancellationToken,
81 id: TrackId,
82 track_config: TrackConfig,
83 rtc_config: RtcTrackConfig,
84 ) -> Self {
85 let processor_chain = ProcessorChain::new(track_config.samplerate);
86 Self {
87 track_id: id,
88 track_config,
89 rtc_config,
90 processor_chain,
91 packet_sender: Arc::new(Mutex::new(None)),
92 cancel_token,
93 local_source: None,
94 encoder: TrackCodec::new(),
95 ssrc: 0,
96 payload_type: None,
97 peer_connection: None,
98 next_rtp_timestamp: 0,
99 next_rtp_sequence_number: 0,
100 last_packet_time: None,
101 last_remote_sdp: None,
102 need_marker: false,
103 }
104 }
105
106 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
107 self.ssrc = ssrc;
108 self
109 }
110
111 pub fn create_audio_track(
112 _codec: CodecType,
113 _stream_id: Option<String>,
114 ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
115 let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
116 (Arc::new(source), track)
117 }
118
119 pub async fn local_description(&self) -> Result<String> {
120 let pc = self
121 .peer_connection
122 .as_ref()
123 .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
124 let offer = pc.create_offer().await?;
125 pc.set_local_description(offer.clone())?;
126 Ok(offer.to_sdp_string())
127 }
128
129 pub async fn create(&mut self) -> Result<()> {
130 if self.peer_connection.is_some() {
131 return Ok(());
132 }
133
134 let mut config = RtcConfiguration::default();
135 if self.ssrc != 0 {
136 config.ssrc_start = self.ssrc;
137 }
138 config.transport_mode = self.rtc_config.mode.clone();
139 config.enable_latching = self
140 .rtc_config
141 .enable_latching
142 .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
143
144 if let Some(ice_servers) = &self.rtc_config.ice_servers {
145 config.ice_servers = ice_servers.clone();
146 }
147
148 if let Some(external_ip) = &self.rtc_config.external_ip {
149 config.external_ip = Some(external_ip.clone());
150 }
151
152 if !self.rtc_config.codecs.is_empty() {
153 let mut caps = MediaCapabilities::default();
154 caps.audio.clear();
155
156 for codec in &self.rtc_config.codecs {
157 let cap = match codec {
158 CodecType::PCMU => AudioCapability::pcmu(),
159 CodecType::PCMA => AudioCapability::pcma(),
160 CodecType::G722 => AudioCapability::g722(),
161 CodecType::G729 => AudioCapability::g729(),
162 CodecType::TelephoneEvent => AudioCapability::telephone_event(),
163 #[cfg(feature = "opus")]
164 CodecType::Opus => AudioCapability::opus(),
165 };
166 caps.audio.push(cap);
167 }
168 config.media_capabilities = Some(caps);
169 }
170
171 let peer_connection = Arc::new(PeerConnection::new(config));
172 self.peer_connection = Some(peer_connection.clone());
173
174 let default_codec = CodecType::G722;
175 let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
176
177 let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
178 self.local_source = Some(source);
179
180 let payload_type = self
181 .rtc_config
182 .payload_type
183 .unwrap_or_else(|| codec.payload_type());
184
185 self.payload_type = Some(payload_type);
186
187 let params = RtpCodecParameters {
188 clock_rate: codec.clock_rate(),
189 channels: codec.channels() as u8,
190 payload_type,
191 ..Default::default()
192 };
193
194 peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
195
196 self.spawn_handlers(
198 peer_connection.clone(),
199 self.track_id.clone(),
200 self.processor_chain.clone(),
201 payload_type,
202 );
203
204 if self.rtc_config.mode == TransportMode::Rtp {
205 for transceiver in peer_connection.get_transceivers() {
206 if let Some(receiver) = transceiver.receiver() {
207 let track = receiver.track();
208 info!(track_id=%self.track_id, "RTP mode: starting receiver track handler");
209 Self::spawn_track_handler(
210 track,
211 self.packet_sender.clone(),
212 self.track_id.clone(),
213 self.cancel_token.clone(),
214 self.processor_chain.clone(),
215 self.get_payload_type(),
216 );
217 }
218 }
219 }
220
221 Ok(())
222 }
223
224 fn spawn_handlers(
225 &self,
226 pc: Arc<PeerConnection>,
227 track_id: TrackId,
228 processor_chain: ProcessorChain,
229 default_payload_type: u8,
230 ) {
231 let cancel_token = self.cancel_token.clone();
232 let packet_sender = self.packet_sender.clone();
233 let pc_clone = pc.clone();
234 let track_id_log = track_id.clone();
235 let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
236
237 crate::spawn(async move {
239 info!(track_id=%track_id_log, "RtcTrack event loop started");
240 let mut events = futures::stream::unfold(pc_clone.clone(), |pc| async move {
241 pc.recv().await.map(|ev| (ev, pc))
242 })
243 .take_until(cancel_token.cancelled())
244 .boxed();
245
246 let mut event_count = 0;
247 while let Some(event) = events.next().await {
248 event_count += 1;
249 let event_type = match &event {
250 PeerConnectionEvent::Track(_) => "Track",
251 PeerConnectionEvent::DataChannel(_) => "DataChannel",
252 };
253 debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
254
255 if let PeerConnectionEvent::Track(transceiver) = event {
256 if let Some(receiver) = transceiver.receiver() {
257 let track = receiver.track();
258 info!(track_id=%track_id_log, "New track received");
259
260 Self::spawn_track_handler(
261 track,
262 packet_sender.clone(),
263 track_id_log.clone(),
264 cancel_token.clone(),
265 processor_chain.clone(),
266 default_payload_type.clone(),
267 );
268 }
269 }
270 }
271 debug!(track_id=%track_id_log, "RtcTrack event loop ended, total events: {}", event_count);
272 });
273
274 if is_webrtc {
276 let pc_state = pc.clone();
277 let cancel_token_state = self.cancel_token.clone();
278 let mut state_rx = pc_state.subscribe_peer_state();
279 let track_id_state = track_id.clone();
280
281 crate::spawn(async move {
282 while state_rx.changed().await.is_ok() {
283 let s = *state_rx.borrow();
284 debug!(track_id=%track_id_state, "peer connection state changed: {:?}", s);
285 match s {
286 PeerConnectionState::Disconnected
287 | PeerConnectionState::Closed
288 | PeerConnectionState::Failed => {
289 info!(
290 track_id = %track_id_state,
291 "peer connection is {:?}, try to close", s
292 );
293 cancel_token_state.cancel();
294 pc_state.close();
295 break;
296 }
297 _ => {}
298 }
299 }
300 });
301 }
302 }
303
304 fn spawn_track_handler(
305 track: Arc<SampleStreamTrack>,
306 packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
307 track_id: TrackId,
308 cancel_token: CancellationToken,
309 processor_chain: ProcessorChain,
310 default_payload_type: u8,
311 ) {
312 let (tx, mut rx) =
313 tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
314
315 let track_id_proc = track_id.clone();
317 let packet_sender_proc = packet_sender_arc.clone();
318 let mut processor_chain_proc = processor_chain.clone();
319 let cancel_token_proc = cancel_token.clone();
320 crate::spawn(async move {
321 info!(track_id=%track_id_proc, "RtcTrack processing worker started");
322 while let Some(frame) = rx.recv().await {
323 if cancel_token_proc.is_cancelled() {
324 break;
325 }
326 Self::process_audio_frame(
327 frame,
328 &track_id_proc,
329 &packet_sender_proc,
330 &mut processor_chain_proc,
331 default_payload_type,
332 )
333 .await;
334 }
335 info!(track_id=%track_id_proc, "RtcTrack processing worker stopped");
336 });
337
338 crate::spawn(async move {
340 let mut samples =
341 futures::stream::unfold(
342 track,
343 |t| async move { t.recv().await.ok().map(|s| (s, t)) },
344 )
345 .take_until(cancel_token.cancelled())
346 .boxed();
347
348 while let Some(sample) = samples.next().await {
349 if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
350 if let Err(_) = tx.send(frame) {
351 break;
352 }
353 }
354 }
355 });
356 }
357
358 async fn process_audio_frame(
359 frame: rustrtc::media::frame::AudioFrame,
360 track_id: &TrackId,
361 packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
362 processor_chain: &mut ProcessorChain,
363 default_payload_type: u8,
364 ) {
365 let packet_sender = packet_sender.lock().await;
366 if let Some(sender) = packet_sender.as_ref() {
367 let payload_type = frame.payload_type.unwrap_or(default_payload_type);
368 let src_codec = match CodecType::try_from(payload_type) {
369 Ok(c) => c,
370 Err(_) => {
371 debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
372 return;
373 }
374 };
375
376 let mut af = AudioFrame {
377 track_id: track_id.clone(),
378 samples: crate::media::Samples::RTP {
379 payload_type,
380 payload: frame.data.to_vec(),
381 sequence_number: frame.sequence_number.unwrap_or(0),
382 },
383 timestamp: crate::media::get_timestamp(),
384 sample_rate: src_codec.samplerate(),
385 channels: src_codec.channels(),
386 };
387 if let Err(e) = processor_chain.process_frame(&mut af) {
388 debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
389 }
390
391 sender.send(af).ok();
392 }
393 }
394
395 pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
396 use crate::media::negotiate::parse_rtpmap;
397 let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
398
399 if let Some(media) = sdp
400 .media_sections
401 .iter()
402 .find(|m| m.kind == MediaKind::Audio)
403 {
404 for attr in &media.attributes {
405 if attr.key == "rtpmap" {
406 if let Some(value) = &attr.value {
407 if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
408 self.encoder.set_payload_type(pt, codec.clone());
409 self.processor_chain.codec.set_payload_type(pt, codec);
410 }
411 }
412 }
413 }
414
415 let mut negotiated = None;
417
418 if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
421 for preferred_codec in &self.rtc_config.codecs {
422 if *preferred_codec == CodecType::TelephoneEvent {
423 continue;
424 }
425 for fmt in &media.formats {
426 if let Ok(pt) = fmt.parse::<u8>() {
427 let codec = self
428 .encoder
429 .payload_type_map
430 .get(&pt)
431 .cloned()
432 .or_else(|| CodecType::try_from(pt).ok());
433 if let Some(c) = codec {
434 if c == *preferred_codec {
435 negotiated = Some((pt, c));
436 break;
437 }
438 }
439 }
440 }
441 if negotiated.is_some() {
442 break;
443 }
444 }
445 }
446
447 if negotiated.is_none() {
449 for fmt in &media.formats {
450 if let Ok(pt) = fmt.parse::<u8>() {
451 let codec = self
452 .encoder
453 .payload_type_map
454 .get(&pt)
455 .cloned()
456 .or_else(|| CodecType::try_from(pt).ok());
457
458 if let Some(codec) = codec {
459 if codec != CodecType::TelephoneEvent {
460 negotiated = Some((pt, codec));
461 break;
462 }
463 }
464 }
465 }
466 }
467
468 if let Some((pt, codec)) = negotiated {
469 info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
470 self.payload_type = Some(pt);
471 }
472 }
473 Ok(())
474 }
475
476 fn normalize_sdp(sdp: &str) -> String {
477 sdp.lines()
478 .map(|line| {
479 if line.starts_with("o=") {
480 let parts: Vec<&str> = line.split_whitespace().collect();
481 if parts.len() >= 3 {
482 return format!("o= {} {}", parts[1], parts[2]);
483 }
484 }
485 line.to_string()
486 })
487 .filter(|line| {
488 !line.starts_with("t=") && !line.starts_with("a=ssrc:") && !line.starts_with("a=msid:") && !line.trim().is_empty()
492 })
493 .collect::<Vec<_>>()
494 .join("\n")
495 }
496
497 async fn update_remote_description_internal(
498 &mut self,
499 answer: &String,
500 force_update: bool,
501 ) -> Result<()> {
502 if let Some(pc) = &self.peer_connection {
503 if !force_update {
504 if let Some(ref last_sdp) = self.last_remote_sdp {
505 if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
506 debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
507 return Ok(());
508 }
509 }
510 } else {
511 debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
512 }
513
514 let is_first_remote_sdp = self.last_remote_sdp.is_none();
515
516 let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
517 match pc.set_remote_description(sdp_obj.clone()).await {
518 Ok(_) => {
519 debug!(track_id=%self.track_id, "set_remote_description succeeded");
520 self.last_remote_sdp = Some(answer.clone());
521 }
522 Err(e) => {
523 if self.rtc_config.mode == TransportMode::Rtp {
524 info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
525
526 if let Some(current_local) = pc.local_description() {
527 let sdp = current_local.to_sdp_string();
528 for line in sdp.lines() {
529 if line.starts_with("a=ssrc:") {
530 info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
531 }
532 }
533 }
534
535 let offer = pc.create_offer().await?;
536
537 let sdp = offer.to_sdp_string();
538 for line in sdp.lines() {
539 if line.starts_with("a=ssrc:") {
540 info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
541 }
542 }
543
544 pc.set_local_description(offer)?;
545 pc.set_remote_description(sdp_obj).await?;
546 self.last_remote_sdp = Some(answer.clone());
547 info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
548 } else {
549 return Err(e.into());
550 }
551 }
552 }
553
554 if is_first_remote_sdp && self.rtc_config.mode != TransportMode::Rtp {
555 for transceiver in pc.get_transceivers() {
556 if let Some(receiver) = transceiver.receiver() {
557 let track = receiver.track();
558 info!(track_id=%self.track_id, "WebRTC mode: manually starting receiver track handler after first answer");
559 Self::spawn_track_handler(
560 track,
561 self.packet_sender.clone(),
562 self.track_id.clone(),
563 self.cancel_token.clone(),
564 self.processor_chain.clone(),
565 self.get_payload_type(),
566 );
567 }
568 }
569 }
570
571 self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
573 }
574 Ok(())
575 }
576}
577
578#[async_trait]
579impl Track for RtcTrack {
580 fn ssrc(&self) -> u32 {
581 self.ssrc
582 }
583 fn id(&self) -> &TrackId {
584 &self.track_id
585 }
586 fn config(&self) -> &TrackConfig {
587 &self.track_config
588 }
589 fn processor_chain(&mut self) -> &mut ProcessorChain {
590 &mut self.processor_chain
591 }
592
593 async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
594 info!(track_id=%self.track_id, "rtc handshake start");
595 self.create().await?;
596
597 let pc = self.peer_connection.clone().ok_or_else(|| {
598 anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
599 })?;
600
601 debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
602 for (i, t) in pc.get_transceivers().iter().enumerate() {
603 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
604 i, t.kind(), t.mid(), t.direction());
605 }
606
607 let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
608 pc.set_remote_description(sdp.clone()).await?;
609
610 debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
611 for (i, t) in pc.get_transceivers().iter().enumerate() {
612 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
613 i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
614 }
615
616 if self.rtc_config.mode != TransportMode::Rtp {
621 for transceiver in pc.get_transceivers() {
622 if let Some(receiver) = transceiver.receiver() {
623 let track = receiver.track();
624 info!(track_id=%self.track_id, "WebRTC handshake: manually starting receiver track handler for browser audio");
625 Self::spawn_track_handler(
626 track,
627 self.packet_sender.clone(),
628 self.track_id.clone(),
629 self.cancel_token.clone(),
630 self.processor_chain.clone(),
631 self.get_payload_type(),
632 );
633 }
634 }
635 }
636
637 self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
638
639 let mut answer = pc.create_answer().await?;
640 crate::media::negotiate::intersect_answer(&sdp, &mut answer);
641
642 pc.set_local_description(answer.clone())?;
643
644 if self.rtc_config.mode != TransportMode::Rtp {
645 pc.wait_for_gathering_complete().await;
646 }
647
648 let final_answer = pc
649 .local_description()
650 .ok_or(anyhow::anyhow!("No local description"))?;
651
652 Ok(final_answer.to_sdp_string())
653 }
654
655 async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
656 self.update_remote_description_internal(answer, false).await
657 }
658
659 async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
660 self.update_remote_description_internal(answer, true).await
661 }
662
663 async fn start(
664 &mut self,
665 event_sender: EventSender,
666 packet_sender: TrackPacketSender,
667 ) -> Result<()> {
668 *self.packet_sender.lock().await = Some(packet_sender.clone());
669 let token_clone = self.cancel_token.clone();
670 let event_sender_clone = event_sender.clone();
671 let track_id = self.track_id.clone();
672 let ssrc = self.ssrc;
673
674 if self.rtc_config.mode != TransportMode::Rtp {
675 let start_time = crate::media::get_timestamp();
676 crate::spawn(async move {
677 token_clone.cancelled().await;
678 let _ = event_sender_clone.send(SessionEvent::TrackEnd {
679 track_id,
680 timestamp: crate::media::get_timestamp(),
681 duration: crate::media::get_timestamp() - start_time,
682 ssrc,
683 play_id: None,
684 });
685 });
686 }
687
688 Ok(())
689 }
690
691 async fn stop(&self) -> Result<()> {
692 self.cancel_token.cancel();
693 if let Some(pc) = &self.peer_connection {
694 pc.close();
695 }
696 Ok(())
697 }
698
699 async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
700 let packet = packet.clone();
701
702 if let Some(source) = &self.local_source {
703 match &packet.samples {
704 crate::media::Samples::PCM { samples } => {
705 let payload_type = self.get_payload_type();
706 let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
707 let target_codec = CodecType::try_from(payload_type)?;
708 if !encoded.is_empty() {
709 let clock_rate = target_codec.clock_rate();
710
711 let now = Instant::now();
712 if let Some(last_time) = self.last_packet_time {
713 let elapsed = now.duration_since(last_time);
714 if elapsed.as_millis() > 50 {
715 let gap_increment =
716 (elapsed.as_millis() as u32 * clock_rate) / 1000;
717 self.next_rtp_timestamp += gap_increment;
718 self.need_marker = true;
719 }
720 }
721
722 self.last_packet_time = Some(now);
723
724 let timestamp_increment = (samples.len() as u64 * clock_rate as u64
725 / packet.sample_rate as u64
726 / self.track_config.channels as u64)
727 as u32;
728 let rtp_timestamp = self.next_rtp_timestamp;
729 self.next_rtp_timestamp += timestamp_increment;
730 let sequence_number = self.next_rtp_sequence_number;
731 self.next_rtp_sequence_number += 1;
732
733 let mut marker = false;
734 if self.need_marker {
735 marker = true;
736 self.need_marker = false;
737 }
738
739 let frame = RtcAudioFrame {
740 data: Bytes::from(encoded),
741 clock_rate,
742 payload_type: Some(payload_type),
743 sequence_number: Some(sequence_number),
744 rtp_timestamp,
745 marker,
746 ..Default::default()
747 };
748 source.send_audio(frame).await.ok();
749 }
750 }
751 crate::media::Samples::RTP {
752 payload,
753 payload_type,
754 sequence_number,
755 } => {
756 let clock_rate = match *payload_type {
757 0 | 8 | 9 | 18 => 8000,
758 111 => 48000,
759 _ => packet.sample_rate,
760 };
761
762 let now = Instant::now();
763 if let Some(last_time) = self.last_packet_time {
764 let elapsed = now.duration_since(last_time);
765 if elapsed.as_millis() > 50 {
766 let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
767 self.next_rtp_timestamp += gap_increment;
768 self.need_marker = true;
769 }
770 }
771 self.last_packet_time = Some(now);
772
773 let increment = match *payload_type {
774 0 | 8 | 18 => payload.len() as u32,
775 9 => payload.len() as u32,
776 111 => (clock_rate / 50) as u32,
777 _ => (clock_rate / 50) as u32,
778 };
779
780 let rtp_timestamp = self.next_rtp_timestamp;
781 self.next_rtp_timestamp += increment;
782 let sequence_number = *sequence_number;
783
784 let mut marker = false;
785 if self.need_marker {
786 marker = true;
787 self.need_marker = false;
788 }
789
790 let frame = RtcAudioFrame {
791 data: Bytes::from(payload.clone()),
792 clock_rate,
793 payload_type: Some(*payload_type),
794 sequence_number: Some(sequence_number),
795 rtp_timestamp,
796 marker,
797 ..Default::default()
798 };
799 source.send_audio(frame).await.ok();
800 }
801 _ => {}
802 }
803 }
804 Ok(())
805 }
806}
807
808impl RtcTrack {
809 fn get_payload_type(&self) -> u8 {
810 if let Some(pt) = self.payload_type {
811 return pt;
812 }
813
814 self.rtc_config.payload_type.unwrap_or_else(|| {
815 match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
816 CodecType::PCMU => 0,
817 CodecType::PCMA => 8,
818 CodecType::Opus => 111,
819 CodecType::G722 => 9,
820 _ => 111,
821 }
822 })
823 }
824}
825
826#[cfg(test)]
827mod tests {
828 use super::*;
829 use crate::media::track::TrackConfig;
830
831 #[test]
832 fn test_parse_sdp_payload_types() {
833 let track_id = "test-track".to_string();
834 let cancel_token = CancellationToken::new();
835 let mut track = RtcTrack::new(
836 cancel_token,
837 track_id,
838 TrackConfig::default(),
839 RtcTrackConfig::default(),
840 );
841
842 let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
844 track
845 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
846 .expect("parse offer");
847 assert_eq!(track.get_payload_type(), 8);
848
849 let mut rtc_config = RtcTrackConfig::default();
851 rtc_config.preferred_codec = Some(CodecType::PCMU);
852 let mut track2 = RtcTrack::new(
853 CancellationToken::new(),
854 "test-track-2".to_string(),
855 TrackConfig::default(),
856 rtc_config,
857 );
858
859 let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
860 track2
861 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
862 .expect("parse offer");
863 assert_eq!(track2.get_payload_type(), 0);
864
865 let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
867 track
868 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
869 .expect("parse offer");
870 assert_eq!(track.get_payload_type(), 111);
871 }
872}