1use super::track_codec::TrackCodec;
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::{
6 processor::ProcessorChain,
7 track::{Track, TrackConfig, TrackId, TrackPacketSender},
8 },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::StreamExt;
15use rustrtc::{
16 AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17 PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18 config::MediaCapabilities,
19 media::{
20 MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21 track::SampleStreamTrack,
22 },
23};
24use std::{
25 sync::Arc,
26 time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34 pub mode: TransportMode,
35 pub ice_servers: Option<Vec<IceServer>>,
36 pub external_ip: Option<String>,
37 pub rtp_port_range: Option<(u16, u16)>,
38 pub preferred_codec: Option<CodecType>,
39 pub codecs: Vec<CodecType>,
40 pub payload_type: Option<u8>,
41}
42
43impl Default for RtcTrackConfig {
44 fn default() -> Self {
45 Self {
46 mode: TransportMode::WebRtc, ice_servers: None,
48 external_ip: None,
49 rtp_port_range: None,
50 preferred_codec: None,
51 codecs: Vec::new(),
52 payload_type: None,
53 }
54 }
55}
56
57pub struct RtcTrack {
58 track_id: TrackId,
59 track_config: TrackConfig,
60 rtc_config: RtcTrackConfig,
61 processor_chain: ProcessorChain,
62 packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
63 cancel_token: CancellationToken,
64 local_source: Option<Arc<SampleStreamSource>>,
65 encoder: TrackCodec,
66 ssrc: u32,
67 payload_type: Option<u8>,
68 pub peer_connection: Option<Arc<PeerConnection>>,
69 next_rtp_timestamp: u32,
70 next_rtp_sequence_number: u16,
71 last_packet_time: Option<Instant>,
72 last_remote_sdp: Option<String>,
73 need_marker: bool,
74}
75
76impl RtcTrack {
77 pub fn new(
78 cancel_token: CancellationToken,
79 id: TrackId,
80 track_config: TrackConfig,
81 rtc_config: RtcTrackConfig,
82 ) -> Self {
83 let processor_chain = ProcessorChain::new(track_config.samplerate);
84 Self {
85 track_id: id,
86 track_config,
87 rtc_config,
88 processor_chain,
89 packet_sender: Arc::new(Mutex::new(None)),
90 cancel_token,
91 local_source: None,
92 encoder: TrackCodec::new(),
93 ssrc: 0,
94 payload_type: None,
95 peer_connection: None,
96 next_rtp_timestamp: 0,
97 next_rtp_sequence_number: 0,
98 last_packet_time: None,
99 last_remote_sdp: None,
100 need_marker: false,
101 }
102 }
103
104 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
105 self.ssrc = ssrc;
106 self
107 }
108
109 pub fn create_audio_track(
110 _codec: CodecType,
111 _stream_id: Option<String>,
112 ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
113 let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
114 (Arc::new(source), track)
115 }
116
117 pub async fn local_description(&self) -> Result<String> {
118 let pc = self
119 .peer_connection
120 .as_ref()
121 .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
122 let offer = pc.create_offer().await?;
123 pc.set_local_description(offer.clone())?;
124 Ok(offer.to_sdp_string())
125 }
126
127 pub async fn create(&mut self) -> Result<()> {
128 if self.peer_connection.is_some() {
129 return Ok(());
130 }
131
132 let mut config = RtcConfiguration::default();
133 if self.ssrc != 0 {
134 config.ssrc_start = self.ssrc;
135 }
136 config.transport_mode = self.rtc_config.mode.clone();
137
138 if let Some(ice_servers) = &self.rtc_config.ice_servers {
139 config.ice_servers = ice_servers.clone();
140 }
141
142 if let Some(external_ip) = &self.rtc_config.external_ip {
143 config.external_ip = Some(external_ip.clone());
144 }
145
146 if !self.rtc_config.codecs.is_empty() {
147 let mut caps = MediaCapabilities::default();
148 caps.audio.clear();
149
150 for codec in &self.rtc_config.codecs {
151 let cap = match codec {
152 CodecType::PCMU => AudioCapability::pcmu(),
153 CodecType::PCMA => AudioCapability::pcma(),
154 CodecType::G722 => AudioCapability::g722(),
155 CodecType::G729 => AudioCapability::g729(),
156 CodecType::TelephoneEvent => AudioCapability::telephone_event(),
157 #[cfg(feature = "opus")]
158 CodecType::Opus => AudioCapability::opus(),
159 };
160 caps.audio.push(cap);
161 }
162 config.media_capabilities = Some(caps);
163 }
164
165 let peer_connection = Arc::new(PeerConnection::new(config));
166 self.peer_connection = Some(peer_connection.clone());
167
168 let default_codec = CodecType::G722;
169 let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
170
171 let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
172 self.local_source = Some(source);
173
174 let payload_type = self
175 .rtc_config
176 .payload_type
177 .unwrap_or_else(|| codec.payload_type());
178
179 self.payload_type = Some(payload_type);
180
181 let params = RtpCodecParameters {
182 clock_rate: codec.clock_rate(),
183 channels: codec.channels() as u8,
184 payload_type,
185 ..Default::default()
186 };
187
188 peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
189
190 self.spawn_handlers(
192 peer_connection.clone(),
193 self.track_id.clone(),
194 self.processor_chain.clone(),
195 payload_type,
196 );
197
198 if self.rtc_config.mode == TransportMode::Rtp {
199 for transceiver in peer_connection.get_transceivers() {
200 if let Some(receiver) = transceiver.receiver() {
201 let track = receiver.track();
202 info!(track_id=%self.track_id, "RTP mode: starting receiver track handler");
203 Self::spawn_track_handler(
204 track,
205 self.packet_sender.clone(),
206 self.track_id.clone(),
207 self.cancel_token.clone(),
208 self.processor_chain.clone(),
209 self.get_payload_type(),
210 );
211 }
212 }
213 }
214
215 Ok(())
216 }
217
218 fn spawn_handlers(
219 &self,
220 pc: Arc<PeerConnection>,
221 track_id: TrackId,
222 processor_chain: ProcessorChain,
223 default_payload_type: u8,
224 ) {
225 let cancel_token = self.cancel_token.clone();
226 let packet_sender = self.packet_sender.clone();
227 let pc_clone = pc.clone();
228 let track_id_log = track_id.clone();
229 let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
230
231 crate::spawn(async move {
233 info!(track_id=%track_id_log, "RtcTrack event loop started");
234 let mut events = futures::stream::unfold(pc_clone.clone(), |pc| async move {
235 pc.recv().await.map(|ev| (ev, pc))
236 })
237 .take_until(cancel_token.cancelled())
238 .boxed();
239
240 let mut event_count = 0;
241 while let Some(event) = events.next().await {
242 event_count += 1;
243 let event_type = match &event {
244 PeerConnectionEvent::Track(_) => "Track",
245 PeerConnectionEvent::DataChannel(_) => "DataChannel",
246 };
247 debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
248
249 if let PeerConnectionEvent::Track(transceiver) = event {
250 if let Some(receiver) = transceiver.receiver() {
251 let track = receiver.track();
252 info!(track_id=%track_id_log, "New track received");
253
254 Self::spawn_track_handler(
255 track,
256 packet_sender.clone(),
257 track_id_log.clone(),
258 cancel_token.clone(),
259 processor_chain.clone(),
260 default_payload_type.clone(),
261 );
262 }
263 }
264 }
265 debug!(track_id=%track_id_log, "RtcTrack event loop ended, total events: {}", event_count);
266 });
267
268 if is_webrtc {
270 let pc_state = pc.clone();
271 let cancel_token_state = self.cancel_token.clone();
272 let mut state_rx = pc_state.subscribe_peer_state();
273 let track_id_state = track_id.clone();
274
275 crate::spawn(async move {
276 while state_rx.changed().await.is_ok() {
277 let s = *state_rx.borrow();
278 debug!(track_id=%track_id_state, "peer connection state changed: {:?}", s);
279 match s {
280 PeerConnectionState::Disconnected
281 | PeerConnectionState::Closed
282 | PeerConnectionState::Failed => {
283 info!(
284 track_id = %track_id_state,
285 "peer connection is {:?}, try to close", s
286 );
287 cancel_token_state.cancel();
288 pc_state.close();
289 break;
290 }
291 _ => {}
292 }
293 }
294 });
295 }
296 }
297
298 fn spawn_track_handler(
299 track: Arc<SampleStreamTrack>,
300 packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
301 track_id: TrackId,
302 cancel_token: CancellationToken,
303 processor_chain: ProcessorChain,
304 default_payload_type: u8,
305 ) {
306 let (tx, mut rx) =
307 tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
308
309 let track_id_proc = track_id.clone();
311 let packet_sender_proc = packet_sender_arc.clone();
312 let mut processor_chain_proc = processor_chain.clone();
313 let cancel_token_proc = cancel_token.clone();
314 crate::spawn(async move {
315 info!(track_id=%track_id_proc, "RtcTrack processing worker started");
316 while let Some(frame) = rx.recv().await {
317 if cancel_token_proc.is_cancelled() {
318 break;
319 }
320 Self::process_audio_frame(
321 frame,
322 &track_id_proc,
323 &packet_sender_proc,
324 &mut processor_chain_proc,
325 default_payload_type,
326 )
327 .await;
328 }
329 info!(track_id=%track_id_proc, "RtcTrack processing worker stopped");
330 });
331
332 crate::spawn(async move {
334 let mut samples =
335 futures::stream::unfold(
336 track,
337 |t| async move { t.recv().await.ok().map(|s| (s, t)) },
338 )
339 .take_until(cancel_token.cancelled())
340 .boxed();
341
342 while let Some(sample) = samples.next().await {
343 if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
344 if let Err(_) = tx.send(frame) {
345 break;
346 }
347 }
348 }
349 });
350 }
351
352 async fn process_audio_frame(
353 frame: rustrtc::media::frame::AudioFrame,
354 track_id: &TrackId,
355 packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
356 processor_chain: &mut ProcessorChain,
357 default_payload_type: u8,
358 ) {
359 let packet_sender = packet_sender.lock().await;
360 if let Some(sender) = packet_sender.as_ref() {
361 let payload_type = frame.payload_type.unwrap_or(default_payload_type);
362 let src_codec = match CodecType::try_from(payload_type) {
363 Ok(c) => c,
364 Err(_) => {
365 debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
366 return;
367 }
368 };
369
370 let mut af = AudioFrame {
371 track_id: track_id.clone(),
372 samples: crate::media::Samples::RTP {
373 payload_type,
374 payload: frame.data.to_vec(),
375 sequence_number: frame.sequence_number.unwrap_or(0),
376 },
377 timestamp: crate::media::get_timestamp(),
378 sample_rate: src_codec.samplerate(),
379 channels: src_codec.channels(),
380 };
381 if let Err(e) = processor_chain.process_frame(&mut af) {
382 debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
383 }
384
385 sender.send(af).ok();
386 }
387 }
388
389 pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
390 use crate::media::negotiate::parse_rtpmap;
391 let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
392
393 if let Some(media) = sdp
394 .media_sections
395 .iter()
396 .find(|m| m.kind == MediaKind::Audio)
397 {
398 for attr in &media.attributes {
399 if attr.key == "rtpmap" {
400 if let Some(value) = &attr.value {
401 if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
402 self.encoder.set_payload_type(pt, codec.clone());
403 self.processor_chain.codec.set_payload_type(pt, codec);
404 }
405 }
406 }
407 }
408
409 let mut negotiated = None;
411
412 if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
415 for preferred_codec in &self.rtc_config.codecs {
416 if *preferred_codec == CodecType::TelephoneEvent {
417 continue;
418 }
419 for fmt in &media.formats {
420 if let Ok(pt) = fmt.parse::<u8>() {
421 let codec = self
422 .encoder
423 .payload_type_map
424 .get(&pt)
425 .cloned()
426 .or_else(|| CodecType::try_from(pt).ok());
427 if let Some(c) = codec {
428 if c == *preferred_codec {
429 negotiated = Some((pt, c));
430 break;
431 }
432 }
433 }
434 }
435 if negotiated.is_some() {
436 break;
437 }
438 }
439 }
440
441 if negotiated.is_none() {
443 for fmt in &media.formats {
444 if let Ok(pt) = fmt.parse::<u8>() {
445 let codec = self
446 .encoder
447 .payload_type_map
448 .get(&pt)
449 .cloned()
450 .or_else(|| CodecType::try_from(pt).ok());
451
452 if let Some(codec) = codec {
453 if codec != CodecType::TelephoneEvent {
454 negotiated = Some((pt, codec));
455 break;
456 }
457 }
458 }
459 }
460 }
461
462 if let Some((pt, codec)) = negotiated {
463 info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
464 self.payload_type = Some(pt);
465 }
466 }
467 Ok(())
468 }
469
/// Reduces an SDP to the parts that are compared when deciding whether a remote description
/// actually changed.
///
/// * An `o=` line with at least three whitespace-separated fields is collapsed to
///   `o= <second field> <third field>`.
/// * `t=`, `a=ssrc:` and `a=msid:` lines and blank lines are dropped.
/// * The remaining lines are joined with `\n`, without a trailing newline.
fn normalize_sdp(sdp: &str) -> String {
    let mut kept: Vec<String> = Vec::new();

    for raw in sdp.lines() {
        let mut line = raw.to_string();
        if raw.starts_with("o=") {
            let fields: Vec<&str> = raw.split_whitespace().collect();
            if let [_, second, third, ..] = fields.as_slice() {
                line = format!("o= {} {}", second, third);
            }
        }

        let dropped = line.starts_with("t=")
            || line.starts_with("a=ssrc:")
            || line.starts_with("a=msid:")
            || line.trim().is_empty();
        if !dropped {
            kept.push(line);
        }
    }

    kept.join("\n")
}
490
491 async fn update_remote_description_internal(
492 &mut self,
493 answer: &String,
494 force_update: bool,
495 ) -> Result<()> {
496 if let Some(pc) = &self.peer_connection {
497 if !force_update {
498 if let Some(ref last_sdp) = self.last_remote_sdp {
499 if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
500 debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
501 return Ok(());
502 }
503 }
504 } else {
505 debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
506 }
507
508 let is_first_remote_sdp = self.last_remote_sdp.is_none();
509
510 let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
511 match pc.set_remote_description(sdp_obj.clone()).await {
512 Ok(_) => {
513 debug!(track_id=%self.track_id, "set_remote_description succeeded");
514 self.last_remote_sdp = Some(answer.clone());
515 }
516 Err(e) => {
517 if self.rtc_config.mode == TransportMode::Rtp {
518 info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
519
520 if let Some(current_local) = pc.local_description() {
521 let sdp = current_local.to_sdp_string();
522 for line in sdp.lines() {
523 if line.starts_with("a=ssrc:") {
524 info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
525 }
526 }
527 }
528
529 let offer = pc.create_offer().await?;
530
531 let sdp = offer.to_sdp_string();
532 for line in sdp.lines() {
533 if line.starts_with("a=ssrc:") {
534 info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
535 }
536 }
537
538 pc.set_local_description(offer)?;
539 pc.set_remote_description(sdp_obj).await?;
540 self.last_remote_sdp = Some(answer.clone());
541 info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
542 } else {
543 return Err(e.into());
544 }
545 }
546 }
547
548 if is_first_remote_sdp && self.rtc_config.mode != TransportMode::Rtp {
549 for transceiver in pc.get_transceivers() {
550 if let Some(receiver) = transceiver.receiver() {
551 let track = receiver.track();
552 info!(track_id=%self.track_id, "WebRTC mode: manually starting receiver track handler after first answer");
553 Self::spawn_track_handler(
554 track,
555 self.packet_sender.clone(),
556 self.track_id.clone(),
557 self.cancel_token.clone(),
558 self.processor_chain.clone(),
559 self.get_payload_type(),
560 );
561 }
562 }
563 }
564
565 self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
567 }
568 Ok(())
569 }
570}
571
572#[async_trait]
573impl Track for RtcTrack {
574 fn ssrc(&self) -> u32 {
575 self.ssrc
576 }
577 fn id(&self) -> &TrackId {
578 &self.track_id
579 }
580 fn config(&self) -> &TrackConfig {
581 &self.track_config
582 }
583 fn processor_chain(&mut self) -> &mut ProcessorChain {
584 &mut self.processor_chain
585 }
586
587 async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
588 info!(track_id=%self.track_id, "rtc handshake start");
589 self.create().await?;
590
591 let pc = self.peer_connection.clone().ok_or_else(|| {
592 anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
593 })?;
594
595 debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
596 for (i, t) in pc.get_transceivers().iter().enumerate() {
597 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
598 i, t.kind(), t.mid(), t.direction());
599 }
600
601 let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
602 pc.set_remote_description(sdp.clone()).await?;
603
604 debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
605 for (i, t) in pc.get_transceivers().iter().enumerate() {
606 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
607 i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
608 }
609
610 if self.rtc_config.mode != TransportMode::Rtp {
615 for transceiver in pc.get_transceivers() {
616 if let Some(receiver) = transceiver.receiver() {
617 let track = receiver.track();
618 info!(track_id=%self.track_id, "WebRTC handshake: manually starting receiver track handler for browser audio");
619 Self::spawn_track_handler(
620 track,
621 self.packet_sender.clone(),
622 self.track_id.clone(),
623 self.cancel_token.clone(),
624 self.processor_chain.clone(),
625 self.get_payload_type(),
626 );
627 }
628 }
629 }
630
631 self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
632
633 let mut answer = pc.create_answer().await?;
634 crate::media::negotiate::intersect_answer(&sdp, &mut answer);
635
636 pc.set_local_description(answer.clone())?;
637
638 if self.rtc_config.mode != TransportMode::Rtp {
639 pc.wait_for_gathering_complete().await;
640 }
641
642 let final_answer = pc
643 .local_description()
644 .ok_or(anyhow::anyhow!("No local description"))?;
645
646 Ok(final_answer.to_sdp_string())
647 }
648
649 async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
650 self.update_remote_description_internal(answer, false).await
651 }
652
653 async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
654 self.update_remote_description_internal(answer, true).await
655 }
656
657 async fn start(
658 &mut self,
659 event_sender: EventSender,
660 packet_sender: TrackPacketSender,
661 ) -> Result<()> {
662 *self.packet_sender.lock().await = Some(packet_sender.clone());
663 let token_clone = self.cancel_token.clone();
664 let event_sender_clone = event_sender.clone();
665 let track_id = self.track_id.clone();
666 let ssrc = self.ssrc;
667
668 if self.rtc_config.mode != TransportMode::Rtp {
669 let start_time = crate::media::get_timestamp();
670 crate::spawn(async move {
671 token_clone.cancelled().await;
672 let _ = event_sender_clone.send(SessionEvent::TrackEnd {
673 track_id,
674 timestamp: crate::media::get_timestamp(),
675 duration: crate::media::get_timestamp() - start_time,
676 ssrc,
677 play_id: None,
678 });
679 });
680 }
681
682 Ok(())
683 }
684
685 async fn stop(&self) -> Result<()> {
686 self.cancel_token.cancel();
687 if let Some(pc) = &self.peer_connection {
688 pc.close();
689 }
690 Ok(())
691 }
692
693 async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
694 let packet = packet.clone();
695
696 if let Some(source) = &self.local_source {
697 match &packet.samples {
698 crate::media::Samples::PCM { samples } => {
699 let payload_type = self.get_payload_type();
700 let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
701 let target_codec = CodecType::try_from(payload_type)?;
702 if !encoded.is_empty() {
703 let clock_rate = target_codec.clock_rate();
704
705 let now = Instant::now();
706 if let Some(last_time) = self.last_packet_time {
707 let elapsed = now.duration_since(last_time);
708 if elapsed.as_millis() > 50 {
709 let gap_increment =
710 (elapsed.as_millis() as u32 * clock_rate) / 1000;
711 self.next_rtp_timestamp += gap_increment;
712 self.need_marker = true;
713 }
714 }
715
716 self.last_packet_time = Some(now);
717
718 let timestamp_increment = (samples.len() as u64 * clock_rate as u64
719 / packet.sample_rate as u64
720 / self.track_config.channels as u64)
721 as u32;
722 let rtp_timestamp = self.next_rtp_timestamp;
723 self.next_rtp_timestamp += timestamp_increment;
724 let sequence_number = self.next_rtp_sequence_number;
725 self.next_rtp_sequence_number += 1;
726
727 let mut marker = false;
728 if self.need_marker {
729 marker = true;
730 self.need_marker = false;
731 }
732
733 let frame = RtcAudioFrame {
734 data: Bytes::from(encoded),
735 clock_rate,
736 payload_type: Some(payload_type),
737 sequence_number: Some(sequence_number),
738 rtp_timestamp,
739 marker,
740 ..Default::default()
741 };
742 source.send_audio(frame).await.ok();
743 }
744 }
745 crate::media::Samples::RTP {
746 payload,
747 payload_type,
748 sequence_number,
749 } => {
750 let clock_rate = match *payload_type {
751 0 | 8 | 9 | 18 => 8000,
752 111 => 48000,
753 _ => packet.sample_rate,
754 };
755
756 let now = Instant::now();
757 if let Some(last_time) = self.last_packet_time {
758 let elapsed = now.duration_since(last_time);
759 if elapsed.as_millis() > 50 {
760 let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
761 self.next_rtp_timestamp += gap_increment;
762 self.need_marker = true;
763 }
764 }
765 self.last_packet_time = Some(now);
766
767 let increment = match *payload_type {
768 0 | 8 | 18 => payload.len() as u32,
769 9 => payload.len() as u32,
770 111 => (clock_rate / 50) as u32,
771 _ => (clock_rate / 50) as u32,
772 };
773
774 let rtp_timestamp = self.next_rtp_timestamp;
775 self.next_rtp_timestamp += increment;
776 let sequence_number = *sequence_number;
777
778 let mut marker = false;
779 if self.need_marker {
780 marker = true;
781 self.need_marker = false;
782 }
783
784 let frame = RtcAudioFrame {
785 data: Bytes::from(payload.clone()),
786 clock_rate,
787 payload_type: Some(*payload_type),
788 sequence_number: Some(sequence_number),
789 rtp_timestamp,
790 marker,
791 ..Default::default()
792 };
793 source.send_audio(frame).await.ok();
794 }
795 _ => {}
796 }
797 }
798 Ok(())
799 }
800}
801
802impl RtcTrack {
803 fn get_payload_type(&self) -> u8 {
804 if let Some(pt) = self.payload_type {
805 return pt;
806 }
807
808 self.rtc_config.payload_type.unwrap_or_else(|| {
809 match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
810 CodecType::PCMU => 0,
811 CodecType::PCMA => 8,
812 CodecType::Opus => 111,
813 CodecType::G722 => 9,
814 _ => 111,
815 }
816 })
817 }
818}
819
#[cfg(test)]
mod tests {
    use super::*;
    use crate::media::track::TrackConfig;

    /// Builds a track with a fresh cancellation token and the default track configuration.
    fn track_with(id: &str, rtc_config: RtcTrackConfig) -> RtcTrack {
        RtcTrack::new(
            CancellationToken::new(),
            id.to_string(),
            TrackConfig::default(),
            rtc_config,
        )
    }

    /// For offers, the first non-telephone-event format becomes the primary payload type.
    #[test]
    fn test_parse_sdp_payload_types() {
        let mut track = track_with("test-track", RtcTrackConfig::default());

        // PCMA is listed first.
        let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
        track
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
            .expect("parse offer");
        assert_eq!(track.get_payload_type(), 8);

        // A leading telephone-event entry is skipped.
        let rtc_config = RtcTrackConfig {
            preferred_codec: Some(CodecType::PCMU),
            ..RtcTrackConfig::default()
        };
        let mut track2 = track_with("test-track-2", rtc_config);

        let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
        track2
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
            .expect("parse offer");
        assert_eq!(track2.get_payload_type(), 0);

        // Re-parsing on the first track replaces the previously negotiated payload type.
        let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
        track
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
            .expect("parse offer");
        assert_eq!(track.get_payload_type(), 111);
    }
}