1use super::track_codec::TrackCodec;
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::{
6 processor::ProcessorChain,
7 track::{Track, TrackConfig, TrackId, TrackPacketSender},
8 },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16 AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17 PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18 config::MediaCapabilities,
19 media::{
20 MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21 track::SampleStreamTrack,
22 },
23};
24use std::{
25 sync::Arc,
26 time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
/// Transport and codec settings consumed by [`RtcTrack::create`] when it
/// builds the underlying peer connection.
#[derive(Clone)]
pub struct RtcTrackConfig {
    /// Transport mode copied into `RtcConfiguration::transport_mode`.
    pub mode: TransportMode,
    /// ICE servers; copied into the peer-connection configuration only when set.
    pub ice_servers: Option<Vec<IceServer>>,
    /// External IP forwarded to the peer-connection configuration when set.
    pub external_ip: Option<String>,
    /// `(start, end)` pair forwarded as `rtp_start_port` / `rtp_end_port` when set.
    pub rtp_port_range: Option<(u16, u16)>,
    /// Bind IP forwarded to the peer-connection configuration when set.
    pub bind_ip: Option<String>,
    /// Codec used for the local audio track; `create` falls back to G722 when unset.
    pub preferred_codec: Option<CodecType>,
    /// Codecs to advertise. When empty, the default media capabilities are kept.
    /// For SDP answers this list is also the preference order used to pick the
    /// primary payload type.
    pub codecs: Vec<CodecType>,
    /// Overrides the payload type that would otherwise come from the codec.
    pub payload_type: Option<u8>,
    /// Explicit latching switch; when unset, latching is enabled only in RTP mode.
    pub enable_latching: Option<bool>,
}
44
/// Defaults: WebRTC transport, no codec list, and every optional override unset.
impl Default for RtcTrackConfig {
    fn default() -> Self {
        Self {
            mode: TransportMode::WebRtc,
            ice_servers: None,
            external_ip: None,
            rtp_port_range: None,
            bind_ip: None,
            preferred_codec: None,
            // An empty list means "keep the library's default capabilities".
            codecs: Vec::new(),
            payload_type: None,
            enable_latching: None,
        }
    }
}
60
/// Audio track backed by a `rustrtc` [`PeerConnection`].
///
/// Incoming samples are forwarded through `packet_sender`; outgoing frames are
/// pushed into `local_source` by `send_packet`.
pub struct RtcTrack {
    /// Identifier used in logs and as the stream id of the local track.
    track_id: TrackId,
    /// Generic track settings (sample rate, channel count, ...).
    track_config: TrackConfig,
    /// Transport / codec settings used when the peer connection is created.
    rtc_config: RtcTrackConfig,
    /// Processors applied to every received frame (a clone is handed to the workers).
    processor_chain: ProcessorChain,
    /// Destination for received frames; `None` until `start` installs a sender.
    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
    /// Cancelling this token stops the background event loop.
    cancel_token: CancellationToken,
    /// Source feeding the local (outgoing) track; set by `create`.
    local_source: Option<Arc<SampleStreamSource>>,
    /// Encoder for outgoing PCM; also holds the payload-type -> codec map learnt from SDP.
    encoder: TrackCodec,
    /// SSRC reported by `ssrc()`; when non-zero it seeds `RtcConfiguration::ssrc_start`.
    ssrc: u32,
    /// Primary audio payload type (set by `create`, refined by SDP parsing).
    payload_type: Option<u8>,
    /// Underlying peer connection; created lazily by `create`.
    pub peer_connection: Option<Arc<PeerConnection>>,
    /// RTP timestamp assigned to the next outgoing frame.
    next_rtp_timestamp: u32,
    /// RTP sequence number assigned to the next locally encoded frame.
    next_rtp_sequence_number: u16,
    /// Time the previous outgoing frame was sent; used to detect send gaps.
    last_packet_time: Option<Instant>,
    /// Last remote SDP that was applied; used to skip redundant updates.
    last_remote_sdp: Option<String>,
    /// Set after a send gap so the next outgoing frame carries the RTP marker bit.
    need_marker: bool,
}
79
80impl RtcTrack {
81 pub fn new(
82 cancel_token: CancellationToken,
83 id: TrackId,
84 track_config: TrackConfig,
85 rtc_config: RtcTrackConfig,
86 ) -> Self {
87 let processor_chain = ProcessorChain::new(track_config.samplerate);
88 Self {
89 track_id: id,
90 track_config,
91 rtc_config,
92 processor_chain,
93 packet_sender: Arc::new(Mutex::new(None)),
94 cancel_token,
95 local_source: None,
96 encoder: TrackCodec::new(),
97 ssrc: 0,
98 payload_type: None,
99 peer_connection: None,
100 next_rtp_timestamp: 0,
101 next_rtp_sequence_number: 0,
102 last_packet_time: None,
103 last_remote_sdp: None,
104 need_marker: false,
105 }
106 }
107
    /// Builder-style setter for the SSRC.
    ///
    /// A non-zero value is later copied into `RtcConfiguration::ssrc_start`
    /// by `create`; zero leaves the library default in place.
    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
        self.ssrc = ssrc;
        self
    }
112
    /// Creates a local audio sample track and returns its `(source, track)` pair.
    ///
    /// Both parameters are currently unused (hence the leading underscores);
    /// they are kept so existing callers keep compiling.
    pub fn create_audio_track(
        _codec: CodecType,
        _stream_id: Option<String>,
    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
        // NOTE(review): `100` is presumably the sample queue capacity, and the
        // discarded third element some feedback handle -- confirm against rustrtc.
        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
        (Arc::new(source), track)
    }
120
121 pub async fn local_description(&self) -> Result<String> {
122 let pc = self
123 .peer_connection
124 .as_ref()
125 .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
126 let offer = pc.create_offer().await?;
127 pc.set_local_description(offer.clone())?;
128 Ok(offer.to_sdp_string())
129 }
130
131 pub async fn create(&mut self) -> Result<()> {
132 if self.peer_connection.is_some() {
133 return Ok(());
134 }
135
136 let mut config = RtcConfiguration::default();
137 if self.ssrc != 0 {
138 config.ssrc_start = self.ssrc;
139 }
140 config.transport_mode = self.rtc_config.mode.clone();
141
142 if let Some(ice_servers) = &self.rtc_config.ice_servers {
143 config.ice_servers = ice_servers.clone();
144 }
145
146 if let Some(external_ip) = &self.rtc_config.external_ip {
147 config.external_ip = Some(external_ip.clone());
148 }
149 if let Some(bind_ip) = &self.rtc_config.bind_ip {
150 config.bind_ip = Some(bind_ip.clone());
151 }
152 if let Some((rtp_start_port, rtp_end_port)) = self.rtc_config.rtp_port_range {
153 config.rtp_start_port = Some(rtp_start_port);
154 config.rtp_end_port = Some(rtp_end_port);
155 }
156
157 config.enable_latching = self
158 .rtc_config
159 .enable_latching
160 .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
161
162 if !self.rtc_config.codecs.is_empty() {
163 let mut caps = MediaCapabilities::default();
164 caps.audio.clear();
165
166 for codec in &self.rtc_config.codecs {
167 let cap = match codec {
168 CodecType::PCMU => AudioCapability::pcmu(),
169 CodecType::PCMA => AudioCapability::pcma(),
170 CodecType::G722 => AudioCapability::g722(),
171 CodecType::G729 => AudioCapability::g729(),
172 CodecType::TelephoneEvent => AudioCapability::telephone_event(),
173 #[cfg(feature = "opus")]
174 CodecType::Opus => AudioCapability::opus(),
175 };
176 caps.audio.push(cap);
177 }
178 config.media_capabilities = Some(caps);
179 }
180
181 let peer_connection = Arc::new(PeerConnection::new(config));
182 self.peer_connection = Some(peer_connection.clone());
183
184 let default_codec = CodecType::G722;
185 let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
186
187 let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
188 self.local_source = Some(source);
189
190 let payload_type = self
191 .rtc_config
192 .payload_type
193 .unwrap_or_else(|| codec.payload_type());
194
195 self.payload_type = Some(payload_type);
196
197 let params = RtpCodecParameters {
198 clock_rate: codec.clock_rate(),
199 channels: codec.channels() as u8,
200 payload_type,
201 ..Default::default()
202 };
203
204 peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
205
206 self.spawn_handlers(
208 peer_connection.clone(),
209 self.track_id.clone(),
210 self.processor_chain.clone(),
211 payload_type,
212 );
213
214 Ok(())
215 }
216
217 fn spawn_handlers(
218 &self,
219 pc: Arc<PeerConnection>,
220 track_id: TrackId,
221 processor_chain: ProcessorChain,
222 default_payload_type: u8,
223 ) {
224 let cancel_token = self.cancel_token.clone();
225 let packet_sender = self.packet_sender.clone();
226 let pc_event = pc.clone();
227 let pc_stats = pc.clone();
228 let pc_state = pc.clone();
229 let track_id_log = track_id.clone();
230 let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
231
232 crate::spawn(async move {
233 info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
234
235 let mut events = futures::stream::unfold(pc_event, |pc| async move {
236 pc.recv().await.map(|ev| (ev, pc))
237 })
238 .boxed();
239
240 let mut state_rx = if is_webrtc {
241 Some(pc_state.subscribe_peer_state())
242 } else {
243 None
244 };
245
246 let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
247 let mut event_count = 0;
248 let mut workers = FuturesUnordered::new();
249
250 loop {
251 tokio::select! {
252 _ = cancel_token.cancelled() => {
253 debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
254 break;
255 }
256
257 Some(event) = events.next() => {
258 event_count += 1;
259 let event_type = match &event {
260 PeerConnectionEvent::Track(_) => "Track",
261 PeerConnectionEvent::DataChannel(_) => "DataChannel",
262 };
263 debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
264
265 if let PeerConnectionEvent::Track(transceiver) = event {
266 if let Some(receiver) = transceiver.receiver() {
267 let track = receiver.track();
268 info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
269
270 let (f1, f2) = Self::create_track_workers(
271 track,
272 packet_sender.clone(),
273 track_id_log.clone(),
274 processor_chain.clone(),
275 default_payload_type,
276 );
277 workers.push(f1);
278 workers.push(f2);
279 }
280 }
281 }
282
283 _ = workers.next(), if !workers.is_empty() => {}
284
285 _ = stats_interval.tick() => {
286 match pc_stats.get_stats().await {
287 Ok(stats) => {
288 info!(track_id=%track_id_log, %stats, "RTCP Stats");
289 }
290 Err(e) => {
291 debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
292 }
293 }
294 }
295
296 res = async {
298 if let Some(rx) = state_rx.as_mut() {
299 rx.changed().await
300 } else {
301 std::future::pending().await
302 }
303 } => {
304 if res.is_ok() {
305 if let Some(rx) = state_rx.as_ref() {
306 let s = *rx.borrow();
307 debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
308 match s {
309 PeerConnectionState::Disconnected
310 | PeerConnectionState::Closed
311 | PeerConnectionState::Failed => {
312 info!(
313 track_id = %track_id_log,
314 "peer connection is {:?}, try to close", s
315 );
316 cancel_token.cancel();
317 pc_state.close();
318 break;
319 }
320 _ => {}
321 }
322 }
323 }
324 }
325 }
326 }
327 debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
328 });
329 }
330
331 fn create_track_workers(
332 track: Arc<SampleStreamTrack>,
333 packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
334 track_id: TrackId,
335 processor_chain: ProcessorChain,
336 default_payload_type: u8,
337 ) -> (
338 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
339 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
340 ) {
341 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
342
343 let track_id_proc = track_id.clone();
345 let packet_sender_proc = packet_sender_arc.clone();
346 let processor_chain_proc = processor_chain.clone();
347 let proc_fut = Self::run_processing_worker(
348 rx,
349 track_id_proc,
350 packet_sender_proc,
351 processor_chain_proc,
352 default_payload_type,
353 );
354
355 let track_id_recv = track_id.clone();
357 let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
358
359 (proc_fut.boxed(), recv_fut.boxed())
360 }
361
362 async fn run_processing_worker(
363 mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
364 track_id: TrackId,
365 packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
366 mut processor_chain: ProcessorChain,
367 default_payload_type: u8,
368 ) {
369 info!(track_id=%track_id, "RtcTrack processing worker started");
370 while let Some(frame) = rx.recv().await {
371 let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
372 frame,
373 &track_id,
374 &packet_sender,
375 &mut processor_chain,
376 default_payload_type,
377 ))
378 .catch_unwind()
379 .await;
380
381 if let Err(cause) = res {
382 let msg = if let Some(s) = cause.downcast_ref::<&str>() {
383 *s
384 } else if let Some(s) = cause.downcast_ref::<String>() {
385 &s[..]
386 } else {
387 "Unknown panic"
388 };
389 tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
390 break;
391 }
392 }
393 info!(track_id=%track_id, "RtcTrack processing worker stopped");
394 }
395
396 async fn run_receiving_worker(
397 track: Arc<SampleStreamTrack>,
398 tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
399 track_id: TrackId,
400 ) {
401 let mut samples =
402 futures::stream::unfold(
403 track,
404 |t| async move { t.recv().await.ok().map(|s| (s, t)) },
405 )
406 .boxed();
407
408 while let Some(sample) = samples.next().await {
409 if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
410 if let Err(_) = tx.send(frame) {
411 break;
412 }
413 } else {
414 debug!(track_id=%track_id, "Received non-audio sample");
415 }
416 }
417 info!(track_id=%track_id, "RtcTrack receiving worker stopped");
418 }
419
420 async fn process_audio_frame(
421 frame: rustrtc::media::frame::AudioFrame,
422 track_id: &TrackId,
423 packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
424 processor_chain: &mut ProcessorChain,
425 default_payload_type: u8,
426 ) {
427 let packet_sender = packet_sender.lock().await;
428 if let Some(sender) = packet_sender.as_ref() {
429 let payload_type = frame.payload_type.unwrap_or(default_payload_type);
430 let src_codec = match CodecType::try_from(payload_type) {
431 Ok(c) => c,
432 Err(_) => {
433 debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
434 return;
435 }
436 };
437
438 let mut af = AudioFrame {
439 track_id: track_id.clone(),
440 samples: crate::media::Samples::RTP {
441 payload_type,
442 payload: frame.data.to_vec(),
443 sequence_number: frame.sequence_number.unwrap_or(0),
444 },
445 timestamp: crate::media::get_timestamp(),
446 sample_rate: src_codec.samplerate(),
447 channels: src_codec.channels(),
448 ..Default::default()
449 };
450 if let Err(e) = processor_chain.process_frame(&mut af) {
451 debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
452 }
453
454 sender.send(af).ok();
455 }
456 }
457
458 pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
459 use crate::media::negotiate::parse_rtpmap;
460 let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
461
462 if let Some(media) = sdp
463 .media_sections
464 .iter()
465 .find(|m| m.kind == MediaKind::Audio)
466 {
467 for attr in &media.attributes {
468 if attr.key == "rtpmap" {
469 if let Some(value) = &attr.value {
470 if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
471 self.encoder.set_payload_type(pt, codec.clone());
472 self.processor_chain.codec.set_payload_type(pt, codec);
473 }
474 }
475 }
476 }
477
478 let mut negotiated = None;
480
481 if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
484 for preferred_codec in &self.rtc_config.codecs {
485 if *preferred_codec == CodecType::TelephoneEvent {
486 continue;
487 }
488 for fmt in &media.formats {
489 if let Ok(pt) = fmt.parse::<u8>() {
490 let codec = self
491 .encoder
492 .payload_type_map
493 .get(&pt)
494 .cloned()
495 .or_else(|| CodecType::try_from(pt).ok());
496 if let Some(c) = codec {
497 if c == *preferred_codec {
498 negotiated = Some((pt, c));
499 break;
500 }
501 }
502 }
503 }
504 if negotiated.is_some() {
505 break;
506 }
507 }
508 }
509
510 if negotiated.is_none() {
512 for fmt in &media.formats {
513 if let Ok(pt) = fmt.parse::<u8>() {
514 let codec = self
515 .encoder
516 .payload_type_map
517 .get(&pt)
518 .cloned()
519 .or_else(|| CodecType::try_from(pt).ok());
520
521 if let Some(codec) = codec {
522 if codec != CodecType::TelephoneEvent {
523 negotiated = Some((pt, codec));
524 break;
525 }
526 }
527 }
528 }
529 }
530
531 if let Some((pt, codec)) = negotiated {
532 info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
533 self.payload_type = Some(pt);
534 }
535 }
536 Ok(())
537 }
538
539 fn normalize_sdp(sdp: &str) -> String {
540 sdp.lines()
541 .map(|line| {
542 if line.starts_with("o=") {
543 let parts: Vec<&str> = line.split_whitespace().collect();
544 if parts.len() >= 3 {
545 return format!("o= {} {}", parts[1], parts[2]);
546 }
547 }
548 line.to_string()
549 })
550 .filter(|line| {
551 !line.starts_with("t=") && !line.starts_with("a=ssrc:") && !line.starts_with("a=msid:") && !line.trim().is_empty()
555 })
556 .collect::<Vec<_>>()
557 .join("\n")
558 }
559
560 async fn update_remote_description_internal(
561 &mut self,
562 answer: &String,
563 force_update: bool,
564 ) -> Result<()> {
565 info!(
566 track_id=%self.track_id,
567 "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
568 force_update,
569 self.last_remote_sdp.is_some(),
570 self.rtc_config.mode
571 );
572
573 if let Some(pc) = &self.peer_connection {
574 if !force_update {
575 if let Some(ref last_sdp) = self.last_remote_sdp {
576 if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
577 debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
578 return Ok(());
579 }
580 }
581 } else {
582 debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
583 }
584
585 let _is_first_remote_sdp = self.last_remote_sdp.is_none();
586
587 let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
588 match pc.set_remote_description(sdp_obj.clone()).await {
589 Ok(_) => {
590 debug!(track_id=%self.track_id, "set_remote_description succeeded");
591 self.last_remote_sdp = Some(answer.clone());
592 }
593 Err(e) => {
594 if self.rtc_config.mode == TransportMode::Rtp {
595 info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
596
597 if let Some(current_local) = pc.local_description() {
598 let sdp = current_local.to_sdp_string();
599 for line in sdp.lines() {
600 if line.starts_with("a=ssrc:") {
601 info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
602 }
603 }
604 }
605
606 let offer = pc.create_offer().await?;
607
608 let sdp = offer.to_sdp_string();
609 for line in sdp.lines() {
610 if line.starts_with("a=ssrc:") {
611 info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
612 }
613 }
614
615 pc.set_local_description(offer)?;
616 pc.set_remote_description(sdp_obj).await?;
617 self.last_remote_sdp = Some(answer.clone());
618 info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
619 } else {
620 return Err(e.into());
621 }
622 }
623 }
624
625 self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
629 }
630 Ok(())
631 }
632}
633
634#[async_trait]
635impl Track for RtcTrack {
    /// Returns the SSRC this track was configured with (0 when none was set).
    fn ssrc(&self) -> u32 {
        self.ssrc
    }
    /// Returns the identifier of this track.
    fn id(&self) -> &TrackId {
        &self.track_id
    }
    /// Returns the generic track configuration given at construction time.
    fn config(&self) -> &TrackConfig {
        &self.track_config
    }
    /// Gives mutable access to the track's processor chain.
    fn processor_chain(&mut self) -> &mut ProcessorChain {
        &mut self.processor_chain
    }
648
649 async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
650 info!(track_id=%self.track_id, "rtc handshake start");
651 self.create().await?;
652
653 let pc = self.peer_connection.clone().ok_or_else(|| {
654 anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
655 })?;
656
657 debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
658 for (i, t) in pc.get_transceivers().iter().enumerate() {
659 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
660 i, t.kind(), t.mid(), t.direction());
661 }
662
663 let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
664 pc.set_remote_description(sdp.clone()).await?;
665
666 debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
667 for (i, t) in pc.get_transceivers().iter().enumerate() {
668 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
669 i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
670 }
671
672 info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
675
676 self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
677
678 let mut answer = pc.create_answer().await?;
679 crate::media::negotiate::intersect_answer(&sdp, &mut answer);
680
681 pc.set_local_description(answer.clone())?;
682
683 if self.rtc_config.mode != TransportMode::Rtp {
684 pc.wait_for_gathering_complete().await;
685 }
686
687 let final_answer = pc
688 .local_description()
689 .ok_or(anyhow::anyhow!("No local description"))?;
690
691 Ok(final_answer.to_sdp_string())
692 }
693
    /// Applies a new remote answer, skipping the update when its normalised
    /// SDP matches the one applied last.
    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, false).await
    }
697
    /// Applies a new remote answer unconditionally, bypassing the
    /// "SDP unchanged" comparison.
    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, true).await
    }
701
702 async fn start(
703 &mut self,
704 event_sender: EventSender,
705 packet_sender: TrackPacketSender,
706 ) -> Result<()> {
707 *self.packet_sender.lock().await = Some(packet_sender.clone());
708 let token_clone = self.cancel_token.clone();
709 let event_sender_clone = event_sender.clone();
710 let track_id = self.track_id.clone();
711 let ssrc = self.ssrc;
712
713 if self.rtc_config.mode != TransportMode::Rtp {
714 let start_time = crate::media::get_timestamp();
715 crate::spawn(async move {
716 token_clone.cancelled().await;
717 let _ = event_sender_clone.send(SessionEvent::TrackEnd {
718 track_id,
719 timestamp: crate::media::get_timestamp(),
720 duration: crate::media::get_timestamp() - start_time,
721 ssrc,
722 play_id: None,
723 });
724 });
725 }
726
727 Ok(())
728 }
729
    /// Cancels the track's token (which ends the background loop) and closes
    /// the peer connection if one exists. Always returns `Ok`.
    async fn stop(&self) -> Result<()> {
        self.cancel_token.cancel();
        if let Some(pc) = &self.peer_connection {
            pc.close();
        }
        Ok(())
    }
737
738 async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
739 let packet = packet.clone();
740
741 if let Some(source) = &self.local_source {
742 match &packet.samples {
743 crate::media::Samples::PCM { samples } => {
744 let payload_type = self.get_payload_type();
745 let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
746 let target_codec = CodecType::try_from(payload_type)?;
747 if !encoded.is_empty() {
748 let clock_rate = target_codec.clock_rate();
749
750 let now = Instant::now();
751 if let Some(last_time) = self.last_packet_time {
752 let elapsed = now.duration_since(last_time);
753 if elapsed.as_millis() > 50 {
754 let gap_increment =
755 (elapsed.as_millis() as u32 * clock_rate) / 1000;
756 self.next_rtp_timestamp += gap_increment;
757 self.need_marker = true;
758 }
759 }
760
761 self.last_packet_time = Some(now);
762
763 let timestamp_increment = (samples.len() as u64 * clock_rate as u64
764 / packet.sample_rate as u64
765 / self.track_config.channels as u64)
766 as u32;
767 let rtp_timestamp = self.next_rtp_timestamp;
768 self.next_rtp_timestamp += timestamp_increment;
769 let sequence_number = self.next_rtp_sequence_number;
770 self.next_rtp_sequence_number += 1;
771
772 let mut marker = false;
773 if self.need_marker {
774 marker = true;
775 self.need_marker = false;
776 }
777
778 let frame = RtcAudioFrame {
779 data: Bytes::from(encoded),
780 clock_rate,
781 payload_type: Some(payload_type),
782 sequence_number: Some(sequence_number),
783 rtp_timestamp,
784 marker,
785 ..Default::default()
786 };
787 source.send_audio(frame).await.ok();
788 }
789 }
790 crate::media::Samples::RTP {
791 payload,
792 payload_type,
793 sequence_number,
794 } => {
795 let clock_rate = match *payload_type {
796 0 | 8 | 9 | 18 => 8000,
797 111 => 48000,
798 _ => packet.sample_rate,
799 };
800
801 let now = Instant::now();
802 if let Some(last_time) = self.last_packet_time {
803 let elapsed = now.duration_since(last_time);
804 if elapsed.as_millis() > 50 {
805 let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
806 self.next_rtp_timestamp += gap_increment;
807 self.need_marker = true;
808 }
809 }
810 self.last_packet_time = Some(now);
811
812 let increment = match *payload_type {
813 0 | 8 | 18 => payload.len() as u32,
814 9 => payload.len() as u32,
815 111 => (clock_rate / 50) as u32,
816 _ => (clock_rate / 50) as u32,
817 };
818
819 let rtp_timestamp = self.next_rtp_timestamp;
820 self.next_rtp_timestamp += increment;
821 let sequence_number = *sequence_number;
822
823 let mut marker = false;
824 if self.need_marker {
825 marker = true;
826 self.need_marker = false;
827 }
828
829 let frame = RtcAudioFrame {
830 data: Bytes::from(payload.clone()),
831 clock_rate,
832 payload_type: Some(*payload_type),
833 sequence_number: Some(sequence_number),
834 rtp_timestamp,
835 marker,
836 ..Default::default()
837 };
838 source.send_audio(frame).await.ok();
839 }
840 _ => {}
841 }
842 }
843 Ok(())
844 }
845}
846
847impl RtcTrack {
848 fn get_payload_type(&self) -> u8 {
849 if let Some(pt) = self.payload_type {
850 return pt;
851 }
852
853 self.rtc_config.payload_type.unwrap_or_else(|| {
854 match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
855 CodecType::PCMU => 0,
856 CodecType::PCMA => 8,
857 CodecType::Opus => 111,
858 CodecType::G722 => 9,
859 _ => 111,
860 }
861 })
862 }
863}
864
865#[cfg(test)]
866mod tests {
867 use super::*;
868 use crate::media::track::TrackConfig;
869
870 #[test]
871 fn test_parse_sdp_payload_types() {
872 let track_id = "test-track".to_string();
873 let cancel_token = CancellationToken::new();
874 let mut track = RtcTrack::new(
875 cancel_token,
876 track_id,
877 TrackConfig::default(),
878 RtcTrackConfig::default(),
879 );
880
881 let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
883 track
884 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
885 .expect("parse offer");
886 assert_eq!(track.get_payload_type(), 8);
887
888 let mut rtc_config = RtcTrackConfig::default();
890 rtc_config.preferred_codec = Some(CodecType::PCMU);
891 let mut track2 = RtcTrack::new(
892 CancellationToken::new(),
893 "test-track-2".to_string(),
894 TrackConfig::default(),
895 rtc_config,
896 );
897
898 let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
899 track2
900 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
901 .expect("parse offer");
902 assert_eq!(track2.get_payload_type(), 0);
903
904 let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
906 track
907 .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
908 .expect("parse offer");
909 assert_eq!(track.get_payload_type(), 111);
910 }
911
912 #[tokio::test]
913 async fn test_rtp_mode_handshake_spawns_handler() {
914 use rustrtc::TransportMode;
915
916 let track_id = "test-track-sip".to_string();
917 let cancel = CancellationToken::new();
918 let track_config = TrackConfig::default();
919 let mut rtc_config = RtcTrackConfig::default();
920 rtc_config.mode = TransportMode::Rtp;
921
922 let mut track = RtcTrack::new(cancel, track_id, track_config, rtc_config);
923
924 let offer = "v=0\r\n\
926o=- 123456 123456 IN IP4 172.0.0.1\r\n\
927s=-\r\n\
928c=IN IP4 172.0.0.1\r\n\
929t=0 0\r\n\
930m=audio 10000 RTP/AVP 0 101\r\n\
931a=rtpmap:0 PCMU/8000\r\n\
932a=rtpmap:101 telephone-event/8000\r\n\
933a=sendrecv\r\n";
934
935 let res = track.handshake(offer.to_string(), None).await;
937 assert!(res.is_ok());
938
939 if let Some(pc) = &track.peer_connection {
941 let transceivers = pc.get_transceivers();
942 assert_eq!(transceivers.len(), 1);
945 assert!(transceivers[0].receiver().is_some());
946 } else {
947 panic!("PeerConnection not initialized");
948 }
949 }
950}