1use super::track_codec::TrackCodec;
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::{
6 processor::ProcessorChain,
7 track::{Track, TrackConfig, TrackId, TrackPacketSender},
8 },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16 AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17 PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18 config::MediaCapabilities,
19 media::{
20 MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21 track::SampleStreamTrack,
22 },
23};
24use std::{
25 sync::Arc,
26 time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
/// Transport and codec settings consumed by `RtcTrack` when it builds its peer connection.
#[derive(Clone)]
pub struct RtcTrackConfig {
    /// Transport mode copied into the peer-connection configuration.
    pub mode: TransportMode,
    /// ICE servers for the peer connection; `None` leaves the configuration default untouched.
    pub ice_servers: Option<Vec<IceServer>>,
    /// External IP forwarded to the peer-connection configuration when set.
    pub external_ip: Option<String>,
    /// `(start, end)` ports forwarded as the RTP port range when set.
    pub rtp_port_range: Option<(u16, u16)>,
    /// Bind IP forwarded to the peer-connection configuration when set.
    pub bind_ip: Option<String>,
    /// Codec used for the local audio track; `RtcTrack::create` falls back to G722 when `None`.
    pub preferred_codec: Option<CodecType>,
    /// Audio capabilities to advertise, in preference order; an empty list keeps the
    /// library's default capabilities.
    pub codecs: Vec<CodecType>,
    /// Payload type override for the local track; defaults to the codec's own payload type.
    pub payload_type: Option<u8>,
    /// Latching toggle; when `None`, `RtcTrack::create` enables it only for `TransportMode::Rtp`.
    pub enable_latching: Option<bool>,
}
44
45impl Default for RtcTrackConfig {
46 fn default() -> Self {
47 Self {
48 mode: TransportMode::WebRtc, ice_servers: None,
50 external_ip: None,
51 rtp_port_range: None,
52 bind_ip: None,
53 preferred_codec: None,
54 codecs: Vec::new(),
55 payload_type: None,
56 enable_latching: None,
57 }
58 }
59}
60
/// Audio track backed by a `rustrtc` peer connection.
///
/// Inbound samples are forwarded to the packet sender installed by `start`; outbound
/// frames passed to `send_packet` are written to the local sample source.
pub struct RtcTrack {
    /// Identifier returned by `Track::id` and attached to every log line.
    track_id: TrackId,
    /// Generic track settings; `samplerate` seeds the processor chain and `channels`
    /// is used when computing outbound RTP timestamps.
    track_config: TrackConfig,
    /// Transport/codec settings applied when the peer connection is created.
    rtc_config: RtcTrackConfig,
    /// Processor chain run on inbound frames (a clone is handed to the worker tasks).
    processor_chain: ProcessorChain,
    /// Destination for inbound frames; `None` until `start` installs a sender.
    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
    /// Cancels the background event loop; triggered by `stop` or a terminal peer state.
    cancel_token: CancellationToken,
    /// Outbound sample source, populated by `create`.
    local_source: Option<Arc<SampleStreamSource>>,
    /// Encoder for outbound PCM; also records payload-type mappings parsed from SDP.
    encoder: TrackCodec,
    /// SSRC reported by `Track::ssrc`; when non-zero it is used as `ssrc_start`.
    ssrc: u32,
    /// Payload type used for outbound PCM (configured in `create`, updated by SDP parsing).
    payload_type: Option<u8>,
    /// Underlying peer connection, populated by `create`.
    pub peer_connection: Option<Arc<PeerConnection>>,
    /// RTP timestamp to stamp on the next outbound frame.
    next_rtp_timestamp: u32,
    /// RTP sequence number for the next outbound encoded PCM frame.
    next_rtp_sequence_number: u16,
    /// When the previous outbound frame was sent; used to detect gaps.
    last_packet_time: Option<Instant>,
    /// Last remote SDP that was applied, kept to skip redundant updates.
    last_remote_sdp: Option<String>,
    /// Set after a send gap so the next outbound frame carries the marker flag.
    need_marker: bool,
}
79
80impl RtcTrack {
81 pub fn new(
82 cancel_token: CancellationToken,
83 id: TrackId,
84 track_config: TrackConfig,
85 rtc_config: RtcTrackConfig,
86 ) -> Self {
87 let processor_chain = ProcessorChain::new(track_config.samplerate);
88 Self {
89 track_id: id,
90 track_config,
91 rtc_config,
92 processor_chain,
93 packet_sender: Arc::new(Mutex::new(None)),
94 cancel_token,
95 local_source: None,
96 encoder: TrackCodec::new(),
97 ssrc: 0,
98 payload_type: None,
99 peer_connection: None,
100 next_rtp_timestamp: 0,
101 next_rtp_sequence_number: 0,
102 last_packet_time: None,
103 last_remote_sdp: None,
104 need_marker: false,
105 }
106 }
107
108 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
109 self.ssrc = ssrc;
110 self
111 }
112
113 pub fn create_audio_track(
114 _codec: CodecType,
115 _stream_id: Option<String>,
116 ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
117 let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
118 (Arc::new(source), track)
119 }
120
121 pub async fn local_description(&self) -> Result<String> {
122 let pc = self
123 .peer_connection
124 .as_ref()
125 .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
126 let offer = pc.create_offer().await?;
127 pc.set_local_description(offer.clone())?;
128 Ok(offer.to_sdp_string())
129 }
130
131 pub async fn create(&mut self) -> Result<()> {
132 if self.peer_connection.is_some() {
133 return Ok(());
134 }
135
136 let mut config = RtcConfiguration::default();
137 if self.ssrc != 0 {
138 config.ssrc_start = self.ssrc;
139 }
140 config.transport_mode = self.rtc_config.mode.clone();
141
142 if let Some(ice_servers) = &self.rtc_config.ice_servers {
143 config.ice_servers = ice_servers.clone();
144 }
145
146 if let Some(external_ip) = &self.rtc_config.external_ip {
147 config.external_ip = Some(external_ip.clone());
148 }
149 if let Some(bind_ip) = &self.rtc_config.bind_ip {
150 config.bind_ip = Some(bind_ip.clone());
151 }
152 if let Some((rtp_start_port, rtp_end_port)) = self.rtc_config.rtp_port_range {
153 config.rtp_start_port = Some(rtp_start_port);
154 config.rtp_end_port = Some(rtp_end_port);
155 }
156
157 config.enable_latching = self
158 .rtc_config
159 .enable_latching
160 .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
161
162 if !self.rtc_config.codecs.is_empty() {
163 let mut caps = MediaCapabilities::default();
164 caps.audio.clear();
165
166 for codec in &self.rtc_config.codecs {
167 let cap = match codec {
168 CodecType::PCMU => AudioCapability::pcmu(),
169 CodecType::PCMA => AudioCapability::pcma(),
170 CodecType::G722 => AudioCapability::g722(),
171 CodecType::G729 => AudioCapability::g729(),
172 CodecType::TelephoneEvent => AudioCapability::telephone_event(),
173 #[cfg(feature = "opus")]
174 CodecType::Opus => AudioCapability::opus(),
175 };
176 caps.audio.push(cap);
177 }
178 config.media_capabilities = Some(caps);
179 }
180
181 let peer_connection = Arc::new(PeerConnection::new(config));
182 self.peer_connection = Some(peer_connection.clone());
183
184 let default_codec = CodecType::G722;
185 let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
186
187 let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
188 self.local_source = Some(source);
189
190 let payload_type = self
191 .rtc_config
192 .payload_type
193 .unwrap_or_else(|| codec.payload_type());
194
195 self.payload_type = Some(payload_type);
196
197 let params = RtpCodecParameters {
198 clock_rate: codec.clock_rate(),
199 channels: codec.channels() as u8,
200 payload_type,
201 ..Default::default()
202 };
203
204 peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
205
206 self.spawn_handlers(
208 peer_connection.clone(),
209 self.track_id.clone(),
210 self.processor_chain.clone(),
211 payload_type,
212 );
213
214 Ok(())
215 }
216
217 fn spawn_handlers(
218 &self,
219 pc: Arc<PeerConnection>,
220 track_id: TrackId,
221 processor_chain: ProcessorChain,
222 default_payload_type: u8,
223 ) {
224 let cancel_token = self.cancel_token.clone();
225 let packet_sender = self.packet_sender.clone();
226 let pc_event = pc.clone();
227 let pc_stats = pc.clone();
228 let pc_state = pc.clone();
229 let track_id_log = track_id.clone();
230 let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
231
232 crate::spawn(async move {
233 info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
234
235 let mut events = futures::stream::unfold(pc_event, |pc| async move {
236 pc.recv().await.map(|ev| (ev, pc))
237 })
238 .boxed();
239
240 let mut state_rx = if is_webrtc {
241 Some(pc_state.subscribe_peer_state())
242 } else {
243 None
244 };
245
246 let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
247 let mut event_count = 0;
248 let mut workers = FuturesUnordered::new();
249
250 loop {
251 tokio::select! {
252 _ = cancel_token.cancelled() => {
253 debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
254 break;
255 }
256
257 Some(event) = events.next() => {
258 event_count += 1;
259 let event_type = match &event {
260 PeerConnectionEvent::Track(_) => "Track",
261 PeerConnectionEvent::DataChannel(_) => "DataChannel",
262 };
263 debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
264
265 if let PeerConnectionEvent::Track(transceiver) = event {
266 if let Some(receiver) = transceiver.receiver() {
267 let track = receiver.track();
268 info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
269
270 let (f1, f2) = Self::create_track_workers(
271 track,
272 packet_sender.clone(),
273 track_id_log.clone(),
274 processor_chain.clone(),
275 default_payload_type,
276 );
277 workers.push(f1);
278 workers.push(f2);
279 }
280 }
281 }
282
283 _ = workers.next(), if !workers.is_empty() => {}
284
285 _ = stats_interval.tick() => {
286 match pc_stats.get_stats().await {
287 Ok(stats) => {
288 info!(track_id=%track_id_log, %stats, "RTCP Stats");
289 }
290 Err(e) => {
291 debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
292 }
293 }
294 }
295
296 res = async {
298 if let Some(rx) = state_rx.as_mut() {
299 rx.changed().await
300 } else {
301 std::future::pending().await
302 }
303 } => {
304 if res.is_ok() {
305 if let Some(rx) = state_rx.as_ref() {
306 let s = *rx.borrow();
307 debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
308 match s {
309 PeerConnectionState::Disconnected
310 | PeerConnectionState::Closed
311 | PeerConnectionState::Failed => {
312 info!(
313 track_id = %track_id_log,
314 "peer connection is {:?}, try to close", s
315 );
316 cancel_token.cancel();
317 pc_state.close();
318 break;
319 }
320 _ => {}
321 }
322 }
323 }
324 }
325 }
326 }
327 debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
328 });
329 }
330
331 fn create_track_workers(
332 track: Arc<SampleStreamTrack>,
333 packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
334 track_id: TrackId,
335 processor_chain: ProcessorChain,
336 default_payload_type: u8,
337 ) -> (
338 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
339 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
340 ) {
341 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
342
343 let track_id_proc = track_id.clone();
345 let packet_sender_proc = packet_sender_arc.clone();
346 let processor_chain_proc = processor_chain.clone();
347 let proc_fut = Self::run_processing_worker(
348 rx,
349 track_id_proc,
350 packet_sender_proc,
351 processor_chain_proc,
352 default_payload_type,
353 );
354
355 let track_id_recv = track_id.clone();
357 let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
358
359 (proc_fut.boxed(), recv_fut.boxed())
360 }
361
362 async fn run_processing_worker(
363 mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
364 track_id: TrackId,
365 packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
366 mut processor_chain: ProcessorChain,
367 default_payload_type: u8,
368 ) {
369 info!(track_id=%track_id, "RtcTrack processing worker started");
370 while let Some(frame) = rx.recv().await {
371 let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
372 frame,
373 &track_id,
374 &packet_sender,
375 &mut processor_chain,
376 default_payload_type,
377 ))
378 .catch_unwind()
379 .await;
380
381 if let Err(cause) = res {
382 let msg = if let Some(s) = cause.downcast_ref::<&str>() {
383 *s
384 } else if let Some(s) = cause.downcast_ref::<String>() {
385 &s[..]
386 } else {
387 "Unknown panic"
388 };
389 tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
390 break;
391 }
392 }
393 info!(track_id=%track_id, "RtcTrack processing worker stopped");
394 }
395
396 async fn run_receiving_worker(
397 track: Arc<SampleStreamTrack>,
398 tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
399 track_id: TrackId,
400 ) {
401 let mut samples =
402 futures::stream::unfold(
403 track,
404 |t| async move { t.recv().await.ok().map(|s| (s, t)) },
405 )
406 .boxed();
407
408 while let Some(sample) = samples.next().await {
409 if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
410 if let Err(_) = tx.send(frame) {
411 break;
412 }
413 } else {
414 debug!(track_id=%track_id, "Received non-audio sample");
415 }
416 }
417 info!(track_id=%track_id, "RtcTrack receiving worker stopped");
418 }
419
420 async fn process_audio_frame(
421 frame: rustrtc::media::frame::AudioFrame,
422 track_id: &TrackId,
423 packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
424 processor_chain: &mut ProcessorChain,
425 default_payload_type: u8,
426 ) {
427 let packet_sender = packet_sender.lock().await;
428 if let Some(sender) = packet_sender.as_ref() {
429 let payload_type = frame.payload_type.unwrap_or(default_payload_type);
430 let src_codec = match CodecType::try_from(payload_type) {
431 Ok(c) => c,
432 Err(_) => {
433 debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
434 return;
435 }
436 };
437
438 let mut af = AudioFrame {
439 track_id: track_id.clone(),
440 samples: crate::media::Samples::RTP {
441 payload_type,
442 payload: frame.data.to_vec(),
443 sequence_number: frame.sequence_number.unwrap_or(0),
444 },
445 timestamp: crate::media::get_timestamp(),
446 sample_rate: src_codec.samplerate(),
447 channels: src_codec.channels(),
448 };
449 if let Err(e) = processor_chain.process_frame(&mut af) {
450 debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
451 }
452
453 sender.send(af).ok();
454 }
455 }
456
457 pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
458 use crate::media::negotiate::parse_rtpmap;
459 let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
460
461 if let Some(media) = sdp
462 .media_sections
463 .iter()
464 .find(|m| m.kind == MediaKind::Audio)
465 {
466 for attr in &media.attributes {
467 if attr.key == "rtpmap" {
468 if let Some(value) = &attr.value {
469 if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
470 self.encoder.set_payload_type(pt, codec.clone());
471 self.processor_chain.codec.set_payload_type(pt, codec);
472 }
473 }
474 }
475 }
476
477 let mut negotiated = None;
479
480 if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
483 for preferred_codec in &self.rtc_config.codecs {
484 if *preferred_codec == CodecType::TelephoneEvent {
485 continue;
486 }
487 for fmt in &media.formats {
488 if let Ok(pt) = fmt.parse::<u8>() {
489 let codec = self
490 .encoder
491 .payload_type_map
492 .get(&pt)
493 .cloned()
494 .or_else(|| CodecType::try_from(pt).ok());
495 if let Some(c) = codec {
496 if c == *preferred_codec {
497 negotiated = Some((pt, c));
498 break;
499 }
500 }
501 }
502 }
503 if negotiated.is_some() {
504 break;
505 }
506 }
507 }
508
509 if negotiated.is_none() {
511 for fmt in &media.formats {
512 if let Ok(pt) = fmt.parse::<u8>() {
513 let codec = self
514 .encoder
515 .payload_type_map
516 .get(&pt)
517 .cloned()
518 .or_else(|| CodecType::try_from(pt).ok());
519
520 if let Some(codec) = codec {
521 if codec != CodecType::TelephoneEvent {
522 negotiated = Some((pt, codec));
523 break;
524 }
525 }
526 }
527 }
528 }
529
530 if let Some((pt, codec)) = negotiated {
531 info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
532 self.payload_type = Some(pt);
533 }
534 }
535 Ok(())
536 }
537
538 fn normalize_sdp(sdp: &str) -> String {
539 sdp.lines()
540 .map(|line| {
541 if line.starts_with("o=") {
542 let parts: Vec<&str> = line.split_whitespace().collect();
543 if parts.len() >= 3 {
544 return format!("o= {} {}", parts[1], parts[2]);
545 }
546 }
547 line.to_string()
548 })
549 .filter(|line| {
550 !line.starts_with("t=") && !line.starts_with("a=ssrc:") && !line.starts_with("a=msid:") && !line.trim().is_empty()
554 })
555 .collect::<Vec<_>>()
556 .join("\n")
557 }
558
559 async fn update_remote_description_internal(
560 &mut self,
561 answer: &String,
562 force_update: bool,
563 ) -> Result<()> {
564 info!(
565 track_id=%self.track_id,
566 "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
567 force_update,
568 self.last_remote_sdp.is_some(),
569 self.rtc_config.mode
570 );
571
572 if let Some(pc) = &self.peer_connection {
573 if !force_update {
574 if let Some(ref last_sdp) = self.last_remote_sdp {
575 if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
576 debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
577 return Ok(());
578 }
579 }
580 } else {
581 debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
582 }
583
584 let _is_first_remote_sdp = self.last_remote_sdp.is_none();
585
586 let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
587 match pc.set_remote_description(sdp_obj.clone()).await {
588 Ok(_) => {
589 debug!(track_id=%self.track_id, "set_remote_description succeeded");
590 self.last_remote_sdp = Some(answer.clone());
591 }
592 Err(e) => {
593 if self.rtc_config.mode == TransportMode::Rtp {
594 info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
595
596 if let Some(current_local) = pc.local_description() {
597 let sdp = current_local.to_sdp_string();
598 for line in sdp.lines() {
599 if line.starts_with("a=ssrc:") {
600 info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
601 }
602 }
603 }
604
605 let offer = pc.create_offer().await?;
606
607 let sdp = offer.to_sdp_string();
608 for line in sdp.lines() {
609 if line.starts_with("a=ssrc:") {
610 info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
611 }
612 }
613
614 pc.set_local_description(offer)?;
615 pc.set_remote_description(sdp_obj).await?;
616 self.last_remote_sdp = Some(answer.clone());
617 info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
618 } else {
619 return Err(e.into());
620 }
621 }
622 }
623
624 self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
628 }
629 Ok(())
630 }
631}
632
633#[async_trait]
634impl Track for RtcTrack {
    /// Returns the SSRC set through `with_ssrc` (0 when none was set).
    fn ssrc(&self) -> u32 {
        self.ssrc
    }
    /// Returns this track's identifier.
    fn id(&self) -> &TrackId {
        &self.track_id
    }
    /// Returns the generic track configuration this track was created with.
    fn config(&self) -> &TrackConfig {
        &self.track_config
    }
    /// Returns mutable access to the track's processor chain.
    fn processor_chain(&mut self) -> &mut ProcessorChain {
        &mut self.processor_chain
    }
647
648 async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
649 info!(track_id=%self.track_id, "rtc handshake start");
650 self.create().await?;
651
652 let pc = self.peer_connection.clone().ok_or_else(|| {
653 anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
654 })?;
655
656 debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
657 for (i, t) in pc.get_transceivers().iter().enumerate() {
658 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
659 i, t.kind(), t.mid(), t.direction());
660 }
661
662 let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
663 pc.set_remote_description(sdp.clone()).await?;
664
665 debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
666 for (i, t) in pc.get_transceivers().iter().enumerate() {
667 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
668 i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
669 }
670
671 info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
674
675 self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
676
677 let mut answer = pc.create_answer().await?;
678 crate::media::negotiate::intersect_answer(&sdp, &mut answer);
679
680 pc.set_local_description(answer.clone())?;
681
682 if self.rtc_config.mode != TransportMode::Rtp {
683 pc.wait_for_gathering_complete().await;
684 }
685
686 let final_answer = pc
687 .local_description()
688 .ok_or(anyhow::anyhow!("No local description"))?;
689
690 Ok(final_answer.to_sdp_string())
691 }
692
    /// Applies a remote answer, skipping the update when it matches the last applied SDP
    /// after normalization (delegates with `force_update = false`).
    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, false).await
    }
696
    /// Applies a remote answer unconditionally, bypassing the unchanged-SDP check
    /// (delegates with `force_update = true`).
    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, true).await
    }
700
701 async fn start(
702 &mut self,
703 event_sender: EventSender,
704 packet_sender: TrackPacketSender,
705 ) -> Result<()> {
706 *self.packet_sender.lock().await = Some(packet_sender.clone());
707 let token_clone = self.cancel_token.clone();
708 let event_sender_clone = event_sender.clone();
709 let track_id = self.track_id.clone();
710 let ssrc = self.ssrc;
711
712 if self.rtc_config.mode != TransportMode::Rtp {
713 let start_time = crate::media::get_timestamp();
714 crate::spawn(async move {
715 token_clone.cancelled().await;
716 let _ = event_sender_clone.send(SessionEvent::TrackEnd {
717 track_id,
718 timestamp: crate::media::get_timestamp(),
719 duration: crate::media::get_timestamp() - start_time,
720 ssrc,
721 play_id: None,
722 });
723 });
724 }
725
726 Ok(())
727 }
728
729 async fn stop(&self) -> Result<()> {
730 self.cancel_token.cancel();
731 if let Some(pc) = &self.peer_connection {
732 pc.close();
733 }
734 Ok(())
735 }
736
737 async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
738 let packet = packet.clone();
739
740 if let Some(source) = &self.local_source {
741 match &packet.samples {
742 crate::media::Samples::PCM { samples } => {
743 let payload_type = self.get_payload_type();
744 let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
745 let target_codec = CodecType::try_from(payload_type)?;
746 if !encoded.is_empty() {
747 let clock_rate = target_codec.clock_rate();
748
749 let now = Instant::now();
750 if let Some(last_time) = self.last_packet_time {
751 let elapsed = now.duration_since(last_time);
752 if elapsed.as_millis() > 50 {
753 let gap_increment =
754 (elapsed.as_millis() as u32 * clock_rate) / 1000;
755 self.next_rtp_timestamp += gap_increment;
756 self.need_marker = true;
757 }
758 }
759
760 self.last_packet_time = Some(now);
761
762 let timestamp_increment = (samples.len() as u64 * clock_rate as u64
763 / packet.sample_rate as u64
764 / self.track_config.channels as u64)
765 as u32;
766 let rtp_timestamp = self.next_rtp_timestamp;
767 self.next_rtp_timestamp += timestamp_increment;
768 let sequence_number = self.next_rtp_sequence_number;
769 self.next_rtp_sequence_number += 1;
770
771 let mut marker = false;
772 if self.need_marker {
773 marker = true;
774 self.need_marker = false;
775 }
776
777 let frame = RtcAudioFrame {
778 data: Bytes::from(encoded),
779 clock_rate,
780 payload_type: Some(payload_type),
781 sequence_number: Some(sequence_number),
782 rtp_timestamp,
783 marker,
784 ..Default::default()
785 };
786 source.send_audio(frame).await.ok();
787 }
788 }
789 crate::media::Samples::RTP {
790 payload,
791 payload_type,
792 sequence_number,
793 } => {
794 let clock_rate = match *payload_type {
795 0 | 8 | 9 | 18 => 8000,
796 111 => 48000,
797 _ => packet.sample_rate,
798 };
799
800 let now = Instant::now();
801 if let Some(last_time) = self.last_packet_time {
802 let elapsed = now.duration_since(last_time);
803 if elapsed.as_millis() > 50 {
804 let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
805 self.next_rtp_timestamp += gap_increment;
806 self.need_marker = true;
807 }
808 }
809 self.last_packet_time = Some(now);
810
811 let increment = match *payload_type {
812 0 | 8 | 18 => payload.len() as u32,
813 9 => payload.len() as u32,
814 111 => (clock_rate / 50) as u32,
815 _ => (clock_rate / 50) as u32,
816 };
817
818 let rtp_timestamp = self.next_rtp_timestamp;
819 self.next_rtp_timestamp += increment;
820 let sequence_number = *sequence_number;
821
822 let mut marker = false;
823 if self.need_marker {
824 marker = true;
825 self.need_marker = false;
826 }
827
828 let frame = RtcAudioFrame {
829 data: Bytes::from(payload.clone()),
830 clock_rate,
831 payload_type: Some(*payload_type),
832 sequence_number: Some(sequence_number),
833 rtp_timestamp,
834 marker,
835 ..Default::default()
836 };
837 source.send_audio(frame).await.ok();
838 }
839 _ => {}
840 }
841 }
842 Ok(())
843 }
844}
845
846impl RtcTrack {
847 fn get_payload_type(&self) -> u8 {
848 if let Some(pt) = self.payload_type {
849 return pt;
850 }
851
852 self.rtc_config.payload_type.unwrap_or_else(|| {
853 match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
854 CodecType::PCMU => 0,
855 CodecType::PCMA => 8,
856 CodecType::Opus => 111,
857 CodecType::G722 => 9,
858 _ => 111,
859 }
860 })
861 }
862}
863
#[cfg(test)]
mod tests {
    use super::*;
    use crate::media::track::TrackConfig;

    /// Offers are parsed without any configured codec list, so the primary payload type
    /// is the first media format that is not telephone-event.
    #[test]
    fn test_parse_sdp_payload_types() {
        let track_id = "test-track".to_string();
        let cancel_token = CancellationToken::new();
        let mut track = RtcTrack::new(
            cancel_token,
            track_id,
            TrackConfig::default(),
            RtcTrackConfig::default(),
        );

        // PCMA (8) is listed first and wins.
        let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
        track
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
            .expect("parse offer");
        assert_eq!(track.get_payload_type(), 8);

        let mut rtc_config = RtcTrackConfig::default();
        rtc_config.preferred_codec = Some(CodecType::PCMU);
        let mut track2 = RtcTrack::new(
            CancellationToken::new(),
            "test-track-2".to_string(),
            TrackConfig::default(),
            rtc_config,
        );

        // telephone-event (101) is listed first but skipped; PCMU (0) is next.
        let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
        track2
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
            .expect("parse offer");
        assert_eq!(track2.get_payload_type(), 0);

        // Re-parsing on the first track replaces its earlier selection with Opus (111).
        let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
        track
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
            .expect("parse offer");
        assert_eq!(track.get_payload_type(), 111);
    }

    /// In plain RTP mode a handshake must succeed and leave exactly one transceiver
    /// that has a receiver.
    #[tokio::test]
    async fn test_rtp_mode_handshake_spawns_handler() {
        use rustrtc::TransportMode;

        let track_id = "test-track-sip".to_string();
        let cancel = CancellationToken::new();
        let track_config = TrackConfig::default();
        let mut rtc_config = RtcTrackConfig::default();
        rtc_config.mode = TransportMode::Rtp;

        let mut track = RtcTrack::new(cancel, track_id, track_config, rtc_config);

        // Minimal SIP-style offer: PCMU plus telephone-event.
        let offer = "v=0\r\n\
o=- 123456 123456 IN IP4 172.0.0.1\r\n\
s=-\r\n\
c=IN IP4 172.0.0.1\r\n\
t=0 0\r\n\
m=audio 10000 RTP/AVP 0 101\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=sendrecv\r\n";

        let res = track.handshake(offer.to_string(), None).await;
        assert!(res.is_ok());

        if let Some(pc) = &track.peer_connection {
            let transceivers = pc.get_transceivers();
            assert_eq!(transceivers.len(), 1);
            assert!(transceivers[0].receiver().is_some());
        } else {
            panic!("PeerConnection not initialized");
        }
    }
}