1use super::track_codec::TrackCodec;
2use crate::{
3 event::{EventSender, SessionEvent},
4 media::AudioFrame,
5 media::{
6 processor::ProcessorChain,
7 track::{Track, TrackConfig, TrackId, TrackPacketSender},
8 },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16 AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17 PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18 config::MediaCapabilities,
19 media::{
20 MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21 track::SampleStreamTrack,
22 },
23};
24use std::{
25 sync::Arc,
26 time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
/// Transport-level settings an `RtcTrack` applies when it builds its `PeerConnection`.
///
/// Every `Option` field is "leave the library default / derive it" when `None`.
#[derive(Clone)]
pub struct RtcTrackConfig {
    /// Transport mode copied into the peer connection configuration.
    pub mode: TransportMode,
    /// ICE servers to use; the configuration default is kept when `None`.
    pub ice_servers: Option<Vec<IceServer>>,
    /// External IP forwarded to the peer connection configuration when set.
    pub external_ip: Option<String>,
    /// Inclusive `(start, end)` port pair forwarded as the RTP start/end ports.
    pub rtp_port_range: Option<(u16, u16)>,
    /// Local bind IP forwarded to the peer connection configuration when set.
    pub bind_ip: Option<String>,
    /// Codec used for the local audio track; `G722` is used when `None`.
    pub preferred_codec: Option<CodecType>,
    /// Audio capabilities to advertise, in order. When empty the library's
    /// default capabilities are left untouched. Also used as the preference
    /// order when picking the primary codec from an SDP answer.
    pub codecs: Vec<CodecType>,
    /// Overrides the payload type that would otherwise come from the codec.
    pub payload_type: Option<u8>,
    /// Latching switch; when `None` it is enabled only for `TransportMode::Rtp`.
    pub enable_latching: Option<bool>,
    /// ICE-lite switch; treated as `false` when `None`.
    pub enable_ice_lite: Option<bool>,
}
45
46impl Default for RtcTrackConfig {
47 fn default() -> Self {
48 Self {
49 mode: TransportMode::WebRtc, ice_servers: None,
51 external_ip: None,
52 rtp_port_range: None,
53 bind_ip: None,
54 preferred_codec: None,
55 codecs: Vec::new(),
56 payload_type: None,
57 enable_latching: None,
58 enable_ice_lite: None,
59 }
60 }
61}
62
/// An audio track backed by a `rustrtc` `PeerConnection`.
///
/// Inbound media is received by background workers and forwarded through
/// `packet_sender`; outbound media is pushed into `local_source` by
/// `send_packet`.
pub struct RtcTrack {
    /// Identifier of this track; also used as the stream id of the local track.
    track_id: TrackId,
    /// Generic track settings (sample rate, channels) supplied by the caller.
    track_config: TrackConfig,
    /// Transport settings applied when the peer connection is created.
    rtc_config: RtcTrackConfig,
    /// Processing chain; a clone of it is handed to the inbound workers.
    processor_chain: ProcessorChain,
    /// Destination for inbound frames. `None` until `start` is called; frames
    /// that arrive before then are not forwarded.
    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
    /// Cancelled by `stop` and by the event loop when the connection reports
    /// a disconnected, closed or failed state.
    cancel_token: CancellationToken,
    /// Source feeding the local (outbound) audio track; set by `create`.
    local_source: Option<Arc<SampleStreamSource>>,
    /// Encoder for outbound PCM; its payload-type map is filled from `a=rtpmap`.
    encoder: TrackCodec,
    /// When non-zero, used as the starting SSRC of the peer connection.
    ssrc: u32,
    /// Primary audio payload type: set by `create`, replaced after SDP negotiation.
    payload_type: Option<u8>,
    /// The underlying peer connection, created lazily by `create`.
    pub peer_connection: Option<Arc<PeerConnection>>,
    /// RTP timestamp to stamp on the next outbound frame.
    next_rtp_timestamp: u32,
    /// Sequence number for the next outbound frame encoded from PCM
    /// (pre-encoded RTP frames carry their own sequence number).
    next_rtp_sequence_number: u16,
    /// When the previous outbound frame was sent; used to detect send gaps.
    last_packet_time: Option<Instant>,
    /// Last remote SDP that was applied successfully; used to skip no-op updates.
    last_remote_sdp: Option<String>,
    /// Set after a send gap so the next outbound frame carries the marker bit.
    need_marker: bool,
}
81
82impl RtcTrack {
83 pub fn new(
84 cancel_token: CancellationToken,
85 id: TrackId,
86 track_config: TrackConfig,
87 rtc_config: RtcTrackConfig,
88 ) -> Self {
89 let processor_chain = ProcessorChain::new(track_config.samplerate);
90 Self {
91 track_id: id,
92 track_config,
93 rtc_config,
94 processor_chain,
95 packet_sender: Arc::new(Mutex::new(None)),
96 cancel_token,
97 local_source: None,
98 encoder: TrackCodec::new(),
99 ssrc: 0,
100 payload_type: None,
101 peer_connection: None,
102 next_rtp_timestamp: 0,
103 next_rtp_sequence_number: 0,
104 last_packet_time: None,
105 last_remote_sdp: None,
106 need_marker: false,
107 }
108 }
109
110 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
111 self.ssrc = ssrc;
112 self
113 }
114
115 pub fn create_audio_track(
116 _codec: CodecType,
117 _stream_id: Option<String>,
118 ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
119 let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
120 (Arc::new(source), track)
121 }
122
123 pub async fn local_description(&self) -> Result<String> {
124 let pc = self
125 .peer_connection
126 .as_ref()
127 .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
128 let offer = pc.create_offer().await?;
129 pc.set_local_description(offer.clone())?;
130 Ok(offer.to_sdp_string())
131 }
132
133 pub async fn create(&mut self) -> Result<()> {
134 if self.peer_connection.is_some() {
135 return Ok(());
136 }
137
138 let mut config = RtcConfiguration::default();
139 if self.ssrc != 0 {
140 config.ssrc_start = self.ssrc;
141 }
142 config.transport_mode = self.rtc_config.mode.clone();
143
144 if let Some(ice_servers) = &self.rtc_config.ice_servers {
145 config.ice_servers = ice_servers.clone();
146 }
147
148 if let Some(external_ip) = &self.rtc_config.external_ip {
149 config.external_ip = Some(external_ip.clone());
150 }
151 if let Some(bind_ip) = &self.rtc_config.bind_ip {
152 config.bind_ip = Some(bind_ip.clone());
153 }
154 if let Some((rtp_start_port, rtp_end_port)) = self.rtc_config.rtp_port_range {
155 config.rtp_start_port = Some(rtp_start_port);
156 config.rtp_end_port = Some(rtp_end_port);
157 }
158 config.enable_ice_lite = self.rtc_config.enable_ice_lite.unwrap_or(false);
159 config.enable_latching = self
160 .rtc_config
161 .enable_latching
162 .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
163
164 if !self.rtc_config.codecs.is_empty() {
165 let mut caps = MediaCapabilities::default();
166 caps.audio.clear();
167
168 for codec in &self.rtc_config.codecs {
169 let cap = match codec {
170 CodecType::PCMU => AudioCapability::pcmu(),
171 CodecType::PCMA => AudioCapability::pcma(),
172 CodecType::G722 => AudioCapability::g722(),
173 CodecType::G729 => AudioCapability::g729(),
174 CodecType::TelephoneEvent => AudioCapability::telephone_event(),
175 #[cfg(feature = "opus")]
176 CodecType::Opus => AudioCapability::opus(),
177 };
178 caps.audio.push(cap);
179 }
180 config.media_capabilities = Some(caps);
181 }
182
183 let peer_connection = Arc::new(PeerConnection::new(config));
184 self.peer_connection = Some(peer_connection.clone());
185
186 let default_codec = CodecType::G722;
187 let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
188
189 let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
190 self.local_source = Some(source);
191
192 let payload_type = self
193 .rtc_config
194 .payload_type
195 .unwrap_or_else(|| codec.payload_type());
196
197 self.payload_type = Some(payload_type);
198
199 let params = RtpCodecParameters {
200 clock_rate: codec.clock_rate(),
201 channels: codec.channels() as u8,
202 payload_type,
203 ..Default::default()
204 };
205
206 peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
207
208 self.spawn_handlers(
210 peer_connection.clone(),
211 self.track_id.clone(),
212 self.processor_chain.clone(),
213 payload_type,
214 );
215
216 Ok(())
217 }
218
219 fn spawn_handlers(
220 &self,
221 pc: Arc<PeerConnection>,
222 track_id: TrackId,
223 processor_chain: ProcessorChain,
224 default_payload_type: u8,
225 ) {
226 let cancel_token = self.cancel_token.clone();
227 let packet_sender = self.packet_sender.clone();
228 let pc_event = pc.clone();
229 let pc_stats = pc.clone();
230 let pc_state = pc.clone();
231 let track_id_log = track_id.clone();
232 let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
233
234 crate::spawn(async move {
235 info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
236
237 let mut events = futures::stream::unfold(pc_event, |pc| async move {
238 pc.recv().await.map(|ev| (ev, pc))
239 })
240 .boxed();
241
242 let mut state_rx = if is_webrtc {
243 Some(pc_state.subscribe_peer_state())
244 } else {
245 None
246 };
247
248 let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
249 let mut event_count = 0;
250 let mut workers = FuturesUnordered::new();
251
252 loop {
253 tokio::select! {
254 _ = cancel_token.cancelled() => {
255 debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
256 break;
257 }
258
259 Some(event) = events.next() => {
260 event_count += 1;
261 let event_type = match &event {
262 PeerConnectionEvent::Track(_) => "Track",
263 PeerConnectionEvent::DataChannel(_) => "DataChannel",
264 };
265 debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
266
267 if let PeerConnectionEvent::Track(transceiver) = event {
268 if let Some(receiver) = transceiver.receiver() {
269 let track = receiver.track();
270 info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
271
272 let (f1, f2) = Self::create_track_workers(
273 track,
274 packet_sender.clone(),
275 track_id_log.clone(),
276 processor_chain.clone(),
277 default_payload_type,
278 );
279 workers.push(f1);
280 workers.push(f2);
281 }
282 }
283 }
284
285 _ = workers.next(), if !workers.is_empty() => {}
286
287 _ = stats_interval.tick() => {
288 match pc_stats.get_stats().await {
289 Ok(stats) => {
290 info!(track_id=%track_id_log, %stats, "RTCP Stats");
291 }
292 Err(e) => {
293 debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
294 }
295 }
296 }
297
298 res = async {
300 if let Some(rx) = state_rx.as_mut() {
301 rx.changed().await
302 } else {
303 std::future::pending().await
304 }
305 } => {
306 if res.is_ok() {
307 if let Some(rx) = state_rx.as_ref() {
308 let s = *rx.borrow();
309 debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
310 match s {
311 PeerConnectionState::Disconnected
312 | PeerConnectionState::Closed
313 | PeerConnectionState::Failed => {
314 info!(
315 track_id = %track_id_log,
316 "peer connection is {:?}, try to close", s
317 );
318 cancel_token.cancel();
319 pc_state.close();
320 break;
321 }
322 _ => {}
323 }
324 }
325 }
326 }
327 }
328 }
329 debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
330 });
331 }
332
333 fn create_track_workers(
334 track: Arc<SampleStreamTrack>,
335 packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
336 track_id: TrackId,
337 processor_chain: ProcessorChain,
338 default_payload_type: u8,
339 ) -> (
340 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
341 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
342 ) {
343 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
344
345 let track_id_proc = track_id.clone();
347 let packet_sender_proc = packet_sender_arc.clone();
348 let processor_chain_proc = processor_chain.clone();
349 let proc_fut = Self::run_processing_worker(
350 rx,
351 track_id_proc,
352 packet_sender_proc,
353 processor_chain_proc,
354 default_payload_type,
355 );
356
357 let track_id_recv = track_id.clone();
359 let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
360
361 (proc_fut.boxed(), recv_fut.boxed())
362 }
363
364 async fn run_processing_worker(
365 mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
366 track_id: TrackId,
367 packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
368 mut processor_chain: ProcessorChain,
369 default_payload_type: u8,
370 ) {
371 info!(track_id=%track_id, "RtcTrack processing worker started");
372 while let Some(frame) = rx.recv().await {
373 let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
374 frame,
375 &track_id,
376 &packet_sender,
377 &mut processor_chain,
378 default_payload_type,
379 ))
380 .catch_unwind()
381 .await;
382
383 if let Err(cause) = res {
384 let msg = if let Some(s) = cause.downcast_ref::<&str>() {
385 *s
386 } else if let Some(s) = cause.downcast_ref::<String>() {
387 &s[..]
388 } else {
389 "Unknown panic"
390 };
391 tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
392 break;
393 }
394 }
395 info!(track_id=%track_id, "RtcTrack processing worker stopped");
396 }
397
398 async fn run_receiving_worker(
399 track: Arc<SampleStreamTrack>,
400 tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
401 track_id: TrackId,
402 ) {
403 let mut samples =
404 futures::stream::unfold(
405 track,
406 |t| async move { t.recv().await.ok().map(|s| (s, t)) },
407 )
408 .boxed();
409
410 while let Some(sample) = samples.next().await {
411 if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
412 if let Err(_) = tx.send(frame) {
413 break;
414 }
415 } else {
416 debug!(track_id=%track_id, "Received non-audio sample");
417 }
418 }
419 info!(track_id=%track_id, "RtcTrack receiving worker stopped");
420 }
421
422 async fn process_audio_frame(
423 frame: rustrtc::media::frame::AudioFrame,
424 track_id: &TrackId,
425 packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
426 processor_chain: &mut ProcessorChain,
427 default_payload_type: u8,
428 ) {
429 let packet_sender = packet_sender.lock().await;
430 if let Some(sender) = packet_sender.as_ref() {
431 let payload_type = frame.payload_type.unwrap_or(default_payload_type);
432 let src_codec = match CodecType::try_from(payload_type) {
433 Ok(c) => c,
434 Err(_) => {
435 debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
436 return;
437 }
438 };
439
440 let mut af = AudioFrame {
441 track_id: track_id.clone(),
442 samples: crate::media::Samples::RTP {
443 payload_type,
444 payload: frame.data.to_vec(),
445 sequence_number: frame.sequence_number.unwrap_or(0),
446 },
447 timestamp: crate::media::get_timestamp(),
448 sample_rate: src_codec.samplerate(),
449 channels: src_codec.channels(),
450 ..Default::default()
451 };
452 if let Err(e) = processor_chain.process_frame(&mut af) {
453 debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
454 }
455
456 sender.send(af).ok();
457 }
458 }
459
460 pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
461 use crate::media::negotiate::parse_rtpmap;
462 let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
463
464 if let Some(media) = sdp
465 .media_sections
466 .iter()
467 .find(|m| m.kind == MediaKind::Audio)
468 {
469 for attr in &media.attributes {
470 if attr.key == "rtpmap" {
471 if let Some(value) = &attr.value {
472 if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
473 self.encoder.set_payload_type(pt, codec.clone());
474 self.processor_chain.codec.set_payload_type(pt, codec);
475 }
476 }
477 }
478 }
479
480 let mut negotiated = None;
482
483 if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
486 for preferred_codec in &self.rtc_config.codecs {
487 if *preferred_codec == CodecType::TelephoneEvent {
488 continue;
489 }
490 for fmt in &media.formats {
491 if let Ok(pt) = fmt.parse::<u8>() {
492 let codec = self
493 .encoder
494 .payload_type_map
495 .get(&pt)
496 .cloned()
497 .or_else(|| CodecType::try_from(pt).ok());
498 if let Some(c) = codec {
499 if c == *preferred_codec {
500 negotiated = Some((pt, c));
501 break;
502 }
503 }
504 }
505 }
506 if negotiated.is_some() {
507 break;
508 }
509 }
510 }
511
512 if negotiated.is_none() {
514 for fmt in &media.formats {
515 if let Ok(pt) = fmt.parse::<u8>() {
516 let codec = self
517 .encoder
518 .payload_type_map
519 .get(&pt)
520 .cloned()
521 .or_else(|| CodecType::try_from(pt).ok());
522
523 if let Some(codec) = codec {
524 if codec != CodecType::TelephoneEvent {
525 negotiated = Some((pt, codec));
526 break;
527 }
528 }
529 }
530 }
531 }
532
533 if let Some((pt, codec)) = negotiated {
534 info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
535 self.payload_type = Some(pt);
536 }
537 }
538 Ok(())
539 }
540
541 fn normalize_sdp(sdp: &str) -> String {
542 sdp.lines()
543 .map(|line| {
544 if line.starts_with("o=") {
545 let parts: Vec<&str> = line.split_whitespace().collect();
546 if parts.len() >= 3 {
547 return format!("o= {} {}", parts[1], parts[2]);
548 }
549 }
550 line.to_string()
551 })
552 .filter(|line| {
553 !line.starts_with("t=") && !line.starts_with("a=ssrc:") && !line.starts_with("a=msid:") && !line.trim().is_empty()
557 })
558 .collect::<Vec<_>>()
559 .join("\n")
560 }
561
562 async fn update_remote_description_internal(
563 &mut self,
564 answer: &String,
565 force_update: bool,
566 ) -> Result<()> {
567 info!(
568 track_id=%self.track_id,
569 "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
570 force_update,
571 self.last_remote_sdp.is_some(),
572 self.rtc_config.mode
573 );
574
575 if let Some(pc) = &self.peer_connection {
576 if !force_update {
577 if let Some(ref last_sdp) = self.last_remote_sdp {
578 if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
579 debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
580 return Ok(());
581 }
582 }
583 } else {
584 debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
585 }
586
587 let _is_first_remote_sdp = self.last_remote_sdp.is_none();
588
589 let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
590 match pc.set_remote_description(sdp_obj.clone()).await {
591 Ok(_) => {
592 debug!(track_id=%self.track_id, "set_remote_description succeeded");
593 self.last_remote_sdp = Some(answer.clone());
594 }
595 Err(e) => {
596 if self.rtc_config.mode == TransportMode::Rtp {
597 info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
598
599 if let Some(current_local) = pc.local_description() {
600 let sdp = current_local.to_sdp_string();
601 for line in sdp.lines() {
602 if line.starts_with("a=ssrc:") {
603 info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
604 }
605 }
606 }
607
608 let offer = pc.create_offer().await?;
609
610 let sdp = offer.to_sdp_string();
611 for line in sdp.lines() {
612 if line.starts_with("a=ssrc:") {
613 info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
614 }
615 }
616
617 pc.set_local_description(offer)?;
618 pc.set_remote_description(sdp_obj).await?;
619 self.last_remote_sdp = Some(answer.clone());
620 info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
621 } else {
622 return Err(e.into());
623 }
624 }
625 }
626
627 self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
631 }
632 Ok(())
633 }
634}
635
636#[async_trait]
637impl Track for RtcTrack {
    /// Returns the SSRC configured for this track (0 when none was set).
    fn ssrc(&self) -> u32 {
        self.ssrc
    }
    /// Returns this track's identifier.
    fn id(&self) -> &TrackId {
        &self.track_id
    }
    /// Returns the generic track configuration this track was created with.
    fn config(&self) -> &TrackConfig {
        &self.track_config
    }
    /// Gives mutable access to the track's own processor chain.
    fn processor_chain(&mut self) -> &mut ProcessorChain {
        &mut self.processor_chain
    }
650
651 async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
652 info!(track_id=%self.track_id, "rtc handshake start");
653 self.create().await?;
654
655 let pc = self.peer_connection.clone().ok_or_else(|| {
656 anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
657 })?;
658
659 debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
660 for (i, t) in pc.get_transceivers().iter().enumerate() {
661 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
662 i, t.kind(), t.mid(), t.direction());
663 }
664
665 let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
666 pc.set_remote_description(sdp.clone()).await?;
667
668 debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
669 for (i, t) in pc.get_transceivers().iter().enumerate() {
670 debug!(track_id=%self.track_id, " Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
671 i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
672 }
673
674 info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
677
678 self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
679
680 let mut answer = pc.create_answer().await?;
681 crate::media::negotiate::intersect_answer(&sdp, &mut answer);
682
683 pc.set_local_description(answer.clone())?;
684
685 if self.rtc_config.mode != TransportMode::Rtp {
686 pc.wait_for_gathering_complete().await;
687 }
688
689 let final_answer = pc
690 .local_description()
691 .ok_or(anyhow::anyhow!("No local description"))?;
692
693 Ok(final_answer.to_sdp_string())
694 }
695
    /// Applies a remote SDP answer, skipping the update when it matches the
    /// last applied one after normalization.
    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, false).await
    }
699
    /// Applies a remote SDP answer unconditionally, without comparing it to
    /// the last applied one.
    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, true).await
    }
703
704 async fn start(
705 &mut self,
706 event_sender: EventSender,
707 packet_sender: TrackPacketSender,
708 ) -> Result<()> {
709 *self.packet_sender.lock().await = Some(packet_sender.clone());
710 let token_clone = self.cancel_token.clone();
711 let event_sender_clone = event_sender.clone();
712 let track_id = self.track_id.clone();
713 let ssrc = self.ssrc;
714
715 if self.rtc_config.mode != TransportMode::Rtp {
716 let start_time = crate::media::get_timestamp();
717 crate::spawn(async move {
718 token_clone.cancelled().await;
719 let _ = event_sender_clone.send(SessionEvent::TrackEnd {
720 track_id,
721 timestamp: crate::media::get_timestamp(),
722 duration: crate::media::get_timestamp() - start_time,
723 ssrc,
724 play_id: None,
725 });
726 });
727 }
728
729 Ok(())
730 }
731
732 async fn stop(&self) -> Result<()> {
733 self.cancel_token.cancel();
734 if let Some(pc) = &self.peer_connection {
735 pc.close();
736 }
737 Ok(())
738 }
739
740 async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
741 let packet = packet.clone();
742
743 if let Some(source) = &self.local_source {
744 match &packet.samples {
745 crate::media::Samples::PCM { samples } => {
746 let payload_type = self.get_payload_type();
747 let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
748 let target_codec = CodecType::try_from(payload_type)?;
749 if !encoded.is_empty() {
750 let clock_rate = target_codec.clock_rate();
751
752 let now = Instant::now();
753 if let Some(last_time) = self.last_packet_time {
754 let elapsed = now.duration_since(last_time);
755 if elapsed.as_millis() > 50 {
756 let gap_increment =
757 (elapsed.as_millis() as u32 * clock_rate) / 1000;
758 self.next_rtp_timestamp += gap_increment;
759 self.need_marker = true;
760 }
761 }
762
763 self.last_packet_time = Some(now);
764
765 let timestamp_increment = (samples.len() as u64 * clock_rate as u64
766 / packet.sample_rate as u64
767 / self.track_config.channels as u64)
768 as u32;
769 let rtp_timestamp = self.next_rtp_timestamp;
770 self.next_rtp_timestamp += timestamp_increment;
771 let sequence_number = self.next_rtp_sequence_number;
772 self.next_rtp_sequence_number += 1;
773
774 let mut marker = false;
775 if self.need_marker {
776 marker = true;
777 self.need_marker = false;
778 }
779
780 let frame = RtcAudioFrame {
781 data: Bytes::from(encoded),
782 clock_rate,
783 payload_type: Some(payload_type),
784 sequence_number: Some(sequence_number),
785 rtp_timestamp,
786 marker,
787 ..Default::default()
788 };
789 source.try_send_audio(frame).ok();
790 }
791 }
792 crate::media::Samples::RTP {
793 payload,
794 payload_type,
795 sequence_number,
796 } => {
797 let target_codec = CodecType::try_from(*payload_type)?;
798 let clock_rate = target_codec.clock_rate();
799
800 let now = Instant::now();
801 if let Some(last_time) = self.last_packet_time {
802 let elapsed = now.duration_since(last_time);
803 if elapsed.as_millis() > 50 {
804 let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
805 self.next_rtp_timestamp += gap_increment;
806 self.need_marker = true;
807 }
808 }
809 self.last_packet_time = Some(now);
810
811 let increment = match *payload_type {
812 0 | 8 | 18 => payload.len() as u32,
813 9 => payload.len() as u32,
814 111 => (clock_rate / 50) as u32,
815 _ => (clock_rate / 50) as u32,
816 };
817
818 let rtp_timestamp = self.next_rtp_timestamp;
819 self.next_rtp_timestamp += increment;
820 let sequence_number = *sequence_number;
821
822 let mut marker = false;
823 if self.need_marker {
824 marker = true;
825 self.need_marker = false;
826 }
827
828 let frame = RtcAudioFrame {
829 data: Bytes::from(payload.clone()),
830 clock_rate,
831 payload_type: Some(*payload_type),
832 sequence_number: Some(sequence_number),
833 rtp_timestamp,
834 marker,
835 ..Default::default()
836 };
837 source.try_send_audio(frame).ok();
838 }
839 _ => {}
840 }
841 }
842 Ok(())
843 }
844}
845
846impl RtcTrack {
847 fn get_payload_type(&self) -> u8 {
848 if let Some(pt) = self.payload_type {
849 return pt;
850 }
851
852 self.rtc_config.payload_type.unwrap_or_else(|| {
853 match self.rtc_config.preferred_codec.unwrap_or(CodecType::G722) {
854 CodecType::PCMU => 0,
855 CodecType::PCMA => 8,
856 #[cfg(feature = "opus")]
857 CodecType::Opus => 111,
858 CodecType::G722 => 9,
859 CodecType::G729 => 18,
860 _ => 111,
861 }
862 })
863 }
864}
865
#[cfg(test)]
mod tests {
    use super::*;
    use crate::media::track::TrackConfig;

    /// The primary payload type follows the first non-telephone-event format
    /// of an offer, and is replaced when a later offer is parsed.
    #[test]
    fn test_parse_sdp_payload_types() {
        let mut track = RtcTrack::new(
            CancellationToken::new(),
            "test-track".to_string(),
            TrackConfig::default(),
            RtcTrackConfig::default(),
        );

        // PCMA is listed first, so it becomes the primary payload type.
        let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
        track
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
            .expect("parse offer");
        assert_eq!(track.get_payload_type(), 8);

        // telephone-event is listed first here and must be skipped.
        let pcmu_config = RtcTrackConfig {
            preferred_codec: Some(CodecType::PCMU),
            ..RtcTrackConfig::default()
        };
        let mut track2 = RtcTrack::new(
            CancellationToken::new(),
            "test-track-2".to_string(),
            TrackConfig::default(),
            pcmu_config,
        );

        let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
        track2
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
            .expect("parse offer");
        assert_eq!(track2.get_payload_type(), 0);

        // Re-parsing on the first track replaces its earlier payload type.
        let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
        track
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
            .expect("parse offer");
        assert_eq!(track.get_payload_type(), 111);
    }

    /// A handshake in RTP mode succeeds and leaves exactly one transceiver
    /// that has a receiver.
    #[tokio::test]
    async fn test_rtp_mode_handshake_spawns_handler() {
        use rustrtc::TransportMode;

        let rtc_config = RtcTrackConfig {
            mode: TransportMode::Rtp,
            ..RtcTrackConfig::default()
        };
        let mut track = RtcTrack::new(
            CancellationToken::new(),
            "test-track-sip".to_string(),
            TrackConfig::default(),
            rtc_config,
        );

        let offer = concat!(
            "v=0\r\n",
            "o=- 123456 123456 IN IP4 172.0.0.1\r\n",
            "s=-\r\n",
            "c=IN IP4 172.0.0.1\r\n",
            "t=0 0\r\n",
            "m=audio 10000 RTP/AVP 0 101\r\n",
            "a=rtpmap:0 PCMU/8000\r\n",
            "a=rtpmap:101 telephone-event/8000\r\n",
            "a=sendrecv\r\n",
        );

        let res = track.handshake(offer.to_string(), None).await;
        assert!(res.is_ok());

        let Some(pc) = &track.peer_connection else {
            panic!("PeerConnection not initialized");
        };
        let transceivers = pc.get_transceivers();
        assert_eq!(transceivers.len(), 1);
        assert!(transceivers[0].receiver().is_some());
    }
}