active_call/media/track/
rtc.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::{
6        processor::ProcessorChain,
7        track::{Track, TrackConfig, TrackId, TrackPacketSender},
8    },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::StreamExt;
15use rustrtc::{
16    AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17    PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18    config::MediaCapabilities,
19    media::{
20        MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21        track::SampleStreamTrack,
22    },
23};
24use std::{
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34    pub mode: TransportMode,
35    pub ice_servers: Option<Vec<IceServer>>,
36    pub external_ip: Option<String>,
37    pub rtp_port_range: Option<(u16, u16)>,
38    pub preferred_codec: Option<CodecType>,
39    pub codecs: Vec<CodecType>,
40    pub payload_type: Option<u8>,
41}
42
43impl Default for RtcTrackConfig {
44    fn default() -> Self {
45        Self {
46            mode: TransportMode::WebRtc, // Default WebRTC behavior
47            ice_servers: None,
48            external_ip: None,
49            rtp_port_range: None,
50            preferred_codec: None,
51            codecs: Vec::new(),
52            payload_type: None,
53        }
54    }
55}
56
57pub struct RtcTrack {
58    track_id: TrackId,
59    track_config: TrackConfig,
60    rtc_config: RtcTrackConfig,
61    processor_chain: ProcessorChain,
62    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
63    cancel_token: CancellationToken,
64    local_source: Option<Arc<SampleStreamSource>>,
65    encoder: TrackCodec,
66    ssrc: u32,
67    payload_type: Option<u8>,
68    pub peer_connection: Option<Arc<PeerConnection>>,
69    next_rtp_timestamp: u32,
70    next_rtp_sequence_number: u16,
71    last_packet_time: Option<Instant>,
72    last_remote_sdp: Option<String>,
73    need_marker: bool,
74}
75
76impl RtcTrack {
77    pub fn new(
78        cancel_token: CancellationToken,
79        id: TrackId,
80        track_config: TrackConfig,
81        rtc_config: RtcTrackConfig,
82    ) -> Self {
83        let processor_chain = ProcessorChain::new(track_config.samplerate);
84        Self {
85            track_id: id,
86            track_config,
87            rtc_config,
88            processor_chain,
89            packet_sender: Arc::new(Mutex::new(None)),
90            cancel_token,
91            local_source: None,
92            encoder: TrackCodec::new(),
93            ssrc: 0,
94            payload_type: None,
95            peer_connection: None,
96            next_rtp_timestamp: 0,
97            next_rtp_sequence_number: 0,
98            last_packet_time: None,
99            last_remote_sdp: None,
100            need_marker: false,
101        }
102    }
103
104    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
105        self.ssrc = ssrc;
106        self
107    }
108
109    pub fn create_audio_track(
110        _codec: CodecType,
111        _stream_id: Option<String>,
112    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
113        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
114        (Arc::new(source), track)
115    }
116
117    pub async fn local_description(&self) -> Result<String> {
118        let pc = self
119            .peer_connection
120            .as_ref()
121            .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
122        let offer = pc.create_offer().await?;
123        pc.set_local_description(offer.clone())?;
124        Ok(offer.to_sdp_string())
125    }
126
127    pub async fn create(&mut self) -> Result<()> {
128        if self.peer_connection.is_some() {
129            return Ok(());
130        }
131
132        let mut config = RtcConfiguration::default();
133        if self.ssrc != 0 {
134            config.ssrc_start = self.ssrc;
135        }
136        config.transport_mode = self.rtc_config.mode.clone();
137
138        if let Some(ice_servers) = &self.rtc_config.ice_servers {
139            config.ice_servers = ice_servers.clone();
140        }
141
142        if let Some(external_ip) = &self.rtc_config.external_ip {
143            config.external_ip = Some(external_ip.clone());
144        }
145
146        if !self.rtc_config.codecs.is_empty() {
147            let mut caps = MediaCapabilities::default();
148            caps.audio.clear();
149
150            for codec in &self.rtc_config.codecs {
151                let cap = match codec {
152                    CodecType::PCMU => AudioCapability::pcmu(),
153                    CodecType::PCMA => AudioCapability::pcma(),
154                    CodecType::G722 => AudioCapability::g722(),
155                    CodecType::G729 => AudioCapability::g729(),
156                    CodecType::TelephoneEvent => AudioCapability::telephone_event(),
157                    #[cfg(feature = "opus")]
158                    CodecType::Opus => AudioCapability::opus(),
159                };
160                caps.audio.push(cap);
161            }
162            config.media_capabilities = Some(caps);
163        }
164
165        let peer_connection = Arc::new(PeerConnection::new(config));
166        self.peer_connection = Some(peer_connection.clone());
167
168        let default_codec = CodecType::G722;
169        let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
170
171        let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
172        self.local_source = Some(source);
173
174        let payload_type = self
175            .rtc_config
176            .payload_type
177            .unwrap_or_else(|| codec.payload_type());
178
179        self.payload_type = Some(payload_type);
180
181        let params = RtpCodecParameters {
182            clock_rate: codec.clock_rate(),
183            channels: codec.channels() as u8,
184            payload_type,
185            ..Default::default()
186        };
187
188        peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
189
190        // Spawn Handler Logic
191        self.spawn_handlers(
192            peer_connection.clone(),
193            self.track_id.clone(),
194            self.processor_chain.clone(),
195            payload_type,
196        );
197
198        if self.rtc_config.mode == TransportMode::Rtp {
199            for transceiver in peer_connection.get_transceivers() {
200                if let Some(receiver) = transceiver.receiver() {
201                    let track = receiver.track();
202                    info!(track_id=%self.track_id, "RTP mode: starting receiver track handler");
203                    Self::spawn_track_handler(
204                        track,
205                        self.packet_sender.clone(),
206                        self.track_id.clone(),
207                        self.cancel_token.clone(),
208                        self.processor_chain.clone(),
209                        self.get_payload_type(),
210                    );
211                }
212            }
213        }
214
215        Ok(())
216    }
217
218    fn spawn_handlers(
219        &self,
220        pc: Arc<PeerConnection>,
221        track_id: TrackId,
222        processor_chain: ProcessorChain,
223        default_payload_type: u8,
224    ) {
225        let cancel_token = self.cancel_token.clone();
226        let packet_sender = self.packet_sender.clone();
227        let pc_clone = pc.clone();
228        let track_id_log = track_id.clone();
229        let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
230
231        // 1. Event Loop
232        crate::spawn(async move {
233            info!(track_id=%track_id_log, "RtcTrack event loop started");
234            let mut events = futures::stream::unfold(pc_clone.clone(), |pc| async move {
235                pc.recv().await.map(|ev| (ev, pc))
236            })
237            .take_until(cancel_token.cancelled())
238            .boxed();
239
240            let mut event_count = 0;
241            while let Some(event) = events.next().await {
242                event_count += 1;
243                let event_type = match &event {
244                    PeerConnectionEvent::Track(_) => "Track",
245                    PeerConnectionEvent::DataChannel(_) => "DataChannel",
246                };
247                debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
248
249                if let PeerConnectionEvent::Track(transceiver) = event {
250                    if let Some(receiver) = transceiver.receiver() {
251                        let track = receiver.track();
252                        info!(track_id=%track_id_log, "New track received");
253
254                        Self::spawn_track_handler(
255                            track,
256                            packet_sender.clone(),
257                            track_id_log.clone(),
258                            cancel_token.clone(),
259                            processor_chain.clone(),
260                            default_payload_type.clone(),
261                        );
262                    }
263                }
264            }
265            debug!(track_id=%track_id_log, "RtcTrack event loop ended, total events: {}", event_count);
266        });
267
268        // 2. State Monitoring
269        if is_webrtc {
270            let pc_state = pc.clone();
271            let cancel_token_state = self.cancel_token.clone();
272            let mut state_rx = pc_state.subscribe_peer_state();
273            let track_id_state = track_id.clone();
274
275            crate::spawn(async move {
276                while state_rx.changed().await.is_ok() {
277                    let s = *state_rx.borrow();
278                    debug!(track_id=%track_id_state, "peer connection state changed: {:?}", s);
279                    match s {
280                        PeerConnectionState::Disconnected
281                        | PeerConnectionState::Closed
282                        | PeerConnectionState::Failed => {
283                            info!(
284                                track_id = %track_id_state,
285                                "peer connection is {:?}, try to close", s
286                            );
287                            cancel_token_state.cancel();
288                            pc_state.close();
289                            break;
290                        }
291                        _ => {}
292                    }
293                }
294            });
295        }
296    }
297
298    fn spawn_track_handler(
299        track: Arc<SampleStreamTrack>,
300        packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
301        track_id: TrackId,
302        cancel_token: CancellationToken,
303        processor_chain: ProcessorChain,
304        default_payload_type: u8,
305    ) {
306        let (tx, mut rx) =
307            tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
308
309        // Processing Worker
310        let track_id_proc = track_id.clone();
311        let packet_sender_proc = packet_sender_arc.clone();
312        let mut processor_chain_proc = processor_chain.clone();
313        let cancel_token_proc = cancel_token.clone();
314        crate::spawn(async move {
315            info!(track_id=%track_id_proc, "RtcTrack processing worker started");
316            while let Some(frame) = rx.recv().await {
317                if cancel_token_proc.is_cancelled() {
318                    break;
319                }
320                Self::process_audio_frame(
321                    frame,
322                    &track_id_proc,
323                    &packet_sender_proc,
324                    &mut processor_chain_proc,
325                    default_payload_type,
326                )
327                .await;
328            }
329            info!(track_id=%track_id_proc, "RtcTrack processing worker stopped");
330        });
331
332        // Receiving Worker
333        crate::spawn(async move {
334            let mut samples =
335                futures::stream::unfold(
336                    track,
337                    |t| async move { t.recv().await.ok().map(|s| (s, t)) },
338                )
339                .take_until(cancel_token.cancelled())
340                .boxed();
341
342            while let Some(sample) = samples.next().await {
343                if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
344                    if let Err(_) = tx.send(frame) {
345                        break;
346                    }
347                }
348            }
349        });
350    }
351
352    async fn process_audio_frame(
353        frame: rustrtc::media::frame::AudioFrame,
354        track_id: &TrackId,
355        packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
356        processor_chain: &mut ProcessorChain,
357        default_payload_type: u8,
358    ) {
359        let packet_sender = packet_sender.lock().await;
360        if let Some(sender) = packet_sender.as_ref() {
361            let payload_type = frame.payload_type.unwrap_or(default_payload_type);
362            let src_codec = match CodecType::try_from(payload_type) {
363                Ok(c) => c,
364                Err(_) => {
365                    debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
366                    return;
367                }
368            };
369
370            let mut af = AudioFrame {
371                track_id: track_id.clone(),
372                samples: crate::media::Samples::RTP {
373                    payload_type,
374                    payload: frame.data.to_vec(),
375                    sequence_number: frame.sequence_number.unwrap_or(0),
376                },
377                timestamp: crate::media::get_timestamp(),
378                sample_rate: src_codec.samplerate(),
379                channels: src_codec.channels(),
380            };
381            if let Err(e) = processor_chain.process_frame(&mut af) {
382                debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
383            }
384
385            sender.send(af).ok();
386        }
387    }
388
389    pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
390        use crate::media::negotiate::parse_rtpmap;
391        let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
392
393        if let Some(media) = sdp
394            .media_sections
395            .iter()
396            .find(|m| m.kind == MediaKind::Audio)
397        {
398            for attr in &media.attributes {
399                if attr.key == "rtpmap" {
400                    if let Some(value) = &attr.value {
401                        if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
402                            self.encoder.set_payload_type(pt, codec.clone());
403                            self.processor_chain.codec.set_payload_type(pt, codec);
404                        }
405                    }
406                }
407            }
408
409            // Negotiate primary audio codec
410            let mut negotiated = None;
411
412            // If we are the offerer (receiving an Answer), we prioritize our own preferred codec order
413            // that is also present in the answer.
414            if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
415                for preferred_codec in &self.rtc_config.codecs {
416                    if *preferred_codec == CodecType::TelephoneEvent {
417                        continue;
418                    }
419                    for fmt in &media.formats {
420                        if let Ok(pt) = fmt.parse::<u8>() {
421                            let codec = self
422                                .encoder
423                                .payload_type_map
424                                .get(&pt)
425                                .cloned()
426                                .or_else(|| CodecType::try_from(pt).ok());
427                            if let Some(c) = codec {
428                                if c == *preferred_codec {
429                                    negotiated = Some((pt, c));
430                                    break;
431                                }
432                            }
433                        }
434                    }
435                    if negotiated.is_some() {
436                        break;
437                    }
438                }
439            }
440
441            // Fallback: use the first codec in the SDP (matches offerer's preference if we are answerer)
442            if negotiated.is_none() {
443                for fmt in &media.formats {
444                    if let Ok(pt) = fmt.parse::<u8>() {
445                        let codec = self
446                            .encoder
447                            .payload_type_map
448                            .get(&pt)
449                            .cloned()
450                            .or_else(|| CodecType::try_from(pt).ok());
451
452                        if let Some(codec) = codec {
453                            if codec != CodecType::TelephoneEvent {
454                                negotiated = Some((pt, codec));
455                                break;
456                            }
457                        }
458                    }
459                }
460            }
461
462            if let Some((pt, codec)) = negotiated {
463                info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
464                self.payload_type = Some(pt);
465            }
466        }
467        Ok(())
468    }
469
470    fn normalize_sdp(sdp: &str) -> String {
471        sdp.lines()
472            .map(|line| {
473                if line.starts_with("o=") {
474                    let parts: Vec<&str> = line.split_whitespace().collect();
475                    if parts.len() >= 3 {
476                        return format!("o= {} {}", parts[1], parts[2]);
477                    }
478                }
479                line.to_string()
480            })
481            .filter(|line| {
482                !line.starts_with("t=") &&  // timing line can vary
483                !line.starts_with("a=ssrc:") &&  // SSRC attributes (but SSRC change shows in o= version)
484                !line.starts_with("a=msid:") &&  // media stream ID
485                !line.trim().is_empty()
486            })
487            .collect::<Vec<_>>()
488            .join("\n")
489    }
490
491    async fn update_remote_description_internal(
492        &mut self,
493        answer: &String,
494        force_update: bool,
495    ) -> Result<()> {
496        if let Some(pc) = &self.peer_connection {
497            if !force_update {
498                if let Some(ref last_sdp) = self.last_remote_sdp {
499                    if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
500                        debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
501                        return Ok(());
502                    }
503                }
504            } else {
505                debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
506            }
507
508            let is_first_remote_sdp = self.last_remote_sdp.is_none();
509
510            let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
511            match pc.set_remote_description(sdp_obj.clone()).await {
512                Ok(_) => {
513                    debug!(track_id=%self.track_id, "set_remote_description succeeded");
514                    self.last_remote_sdp = Some(answer.clone());
515                }
516                Err(e) => {
517                    if self.rtc_config.mode == TransportMode::Rtp {
518                        info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
519
520                        if let Some(current_local) = pc.local_description() {
521                            let sdp = current_local.to_sdp_string();
522                            for line in sdp.lines() {
523                                if line.starts_with("a=ssrc:") {
524                                    info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
525                                }
526                            }
527                        }
528
529                        let offer = pc.create_offer().await?;
530
531                        let sdp = offer.to_sdp_string();
532                        for line in sdp.lines() {
533                            if line.starts_with("a=ssrc:") {
534                                info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
535                            }
536                        }
537
538                        pc.set_local_description(offer)?;
539                        pc.set_remote_description(sdp_obj).await?;
540                        self.last_remote_sdp = Some(answer.clone());
541                        info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
542                    } else {
543                        return Err(e.into());
544                    }
545                }
546            }
547
548            if is_first_remote_sdp && self.rtc_config.mode != TransportMode::Rtp {
549                for transceiver in pc.get_transceivers() {
550                    if let Some(receiver) = transceiver.receiver() {
551                        let track = receiver.track();
552                        info!(track_id=%self.track_id, "WebRTC mode: manually starting receiver track handler after first answer");
553                        Self::spawn_track_handler(
554                            track,
555                            self.packet_sender.clone(),
556                            self.track_id.clone(),
557                            self.cancel_token.clone(),
558                            self.processor_chain.clone(),
559                            self.get_payload_type(),
560                        );
561                    }
562                }
563            }
564
565            // Extract negotiated payload types from SDP string
566            self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
567        }
568        Ok(())
569    }
570}
571
572#[async_trait]
573impl Track for RtcTrack {
574    fn ssrc(&self) -> u32 {
575        self.ssrc
576    }
577    fn id(&self) -> &TrackId {
578        &self.track_id
579    }
580    fn config(&self) -> &TrackConfig {
581        &self.track_config
582    }
583    fn processor_chain(&mut self) -> &mut ProcessorChain {
584        &mut self.processor_chain
585    }
586
587    async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
588        info!(track_id=%self.track_id, "rtc handshake start");
589        self.create().await?;
590
591        let pc = self.peer_connection.clone().ok_or_else(|| {
592            anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
593        })?;
594
595        debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
596        for (i, t) in pc.get_transceivers().iter().enumerate() {
597            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}", 
598                i, t.kind(), t.mid(), t.direction());
599        }
600
601        let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
602        pc.set_remote_description(sdp.clone()).await?;
603
604        debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
605        for (i, t) in pc.get_transceivers().iter().enumerate() {
606            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}", 
607                i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
608        }
609
610        // CRITICAL FIX: When server-initiated signaling (common WebRTC pattern),
611        // Track events fire when remote peer sends offer, not when we accept answer.
612        // Since we received remote offer here and added local track first,
613        // we must manually start receiver tracks as Track events won't fire.
614        if self.rtc_config.mode != TransportMode::Rtp {
615            for transceiver in pc.get_transceivers() {
616                if let Some(receiver) = transceiver.receiver() {
617                    let track = receiver.track();
618                    info!(track_id=%self.track_id, "WebRTC handshake: manually starting receiver track handler for browser audio");
619                    Self::spawn_track_handler(
620                        track,
621                        self.packet_sender.clone(),
622                        self.track_id.clone(),
623                        self.cancel_token.clone(),
624                        self.processor_chain.clone(),
625                        self.get_payload_type(),
626                    );
627                }
628            }
629        }
630
631        self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
632
633        let mut answer = pc.create_answer().await?;
634        crate::media::negotiate::intersect_answer(&sdp, &mut answer);
635
636        pc.set_local_description(answer.clone())?;
637
638        if self.rtc_config.mode != TransportMode::Rtp {
639            pc.wait_for_gathering_complete().await;
640        }
641
642        let final_answer = pc
643            .local_description()
644            .ok_or(anyhow::anyhow!("No local description"))?;
645
646        Ok(final_answer.to_sdp_string())
647    }
648
649    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
650        self.update_remote_description_internal(answer, false).await
651    }
652
653    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
654        self.update_remote_description_internal(answer, true).await
655    }
656
657    async fn start(
658        &mut self,
659        event_sender: EventSender,
660        packet_sender: TrackPacketSender,
661    ) -> Result<()> {
662        *self.packet_sender.lock().await = Some(packet_sender.clone());
663        let token_clone = self.cancel_token.clone();
664        let event_sender_clone = event_sender.clone();
665        let track_id = self.track_id.clone();
666        let ssrc = self.ssrc;
667
668        if self.rtc_config.mode != TransportMode::Rtp {
669            let start_time = crate::media::get_timestamp();
670            crate::spawn(async move {
671                token_clone.cancelled().await;
672                let _ = event_sender_clone.send(SessionEvent::TrackEnd {
673                    track_id,
674                    timestamp: crate::media::get_timestamp(),
675                    duration: crate::media::get_timestamp() - start_time,
676                    ssrc,
677                    play_id: None,
678                });
679            });
680        }
681
682        Ok(())
683    }
684
685    async fn stop(&self) -> Result<()> {
686        self.cancel_token.cancel();
687        if let Some(pc) = &self.peer_connection {
688            pc.close();
689        }
690        Ok(())
691    }
692
693    async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
694        let packet = packet.clone();
695
696        if let Some(source) = &self.local_source {
697            match &packet.samples {
698                crate::media::Samples::PCM { samples } => {
699                    let payload_type = self.get_payload_type();
700                    let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
701                    let target_codec = CodecType::try_from(payload_type)?;
702                    if !encoded.is_empty() {
703                        let clock_rate = target_codec.clock_rate();
704
705                        let now = Instant::now();
706                        if let Some(last_time) = self.last_packet_time {
707                            let elapsed = now.duration_since(last_time);
708                            if elapsed.as_millis() > 50 {
709                                let gap_increment =
710                                    (elapsed.as_millis() as u32 * clock_rate) / 1000;
711                                self.next_rtp_timestamp += gap_increment;
712                                self.need_marker = true;
713                            }
714                        }
715
716                        self.last_packet_time = Some(now);
717
718                        let timestamp_increment = (samples.len() as u64 * clock_rate as u64
719                            / packet.sample_rate as u64
720                            / self.track_config.channels as u64)
721                            as u32;
722                        let rtp_timestamp = self.next_rtp_timestamp;
723                        self.next_rtp_timestamp += timestamp_increment;
724                        let sequence_number = self.next_rtp_sequence_number;
725                        self.next_rtp_sequence_number += 1;
726
727                        let mut marker = false;
728                        if self.need_marker {
729                            marker = true;
730                            self.need_marker = false;
731                        }
732
733                        let frame = RtcAudioFrame {
734                            data: Bytes::from(encoded),
735                            clock_rate,
736                            payload_type: Some(payload_type),
737                            sequence_number: Some(sequence_number),
738                            rtp_timestamp,
739                            marker,
740                            ..Default::default()
741                        };
742                        source.send_audio(frame).await.ok();
743                    }
744                }
745                crate::media::Samples::RTP {
746                    payload,
747                    payload_type,
748                    sequence_number,
749                } => {
750                    let clock_rate = match *payload_type {
751                        0 | 8 | 9 | 18 => 8000,
752                        111 => 48000,
753                        _ => packet.sample_rate,
754                    };
755
756                    let now = Instant::now();
757                    if let Some(last_time) = self.last_packet_time {
758                        let elapsed = now.duration_since(last_time);
759                        if elapsed.as_millis() > 50 {
760                            let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
761                            self.next_rtp_timestamp += gap_increment;
762                            self.need_marker = true;
763                        }
764                    }
765                    self.last_packet_time = Some(now);
766
767                    let increment = match *payload_type {
768                        0 | 8 | 18 => payload.len() as u32,
769                        9 => payload.len() as u32,
770                        111 => (clock_rate / 50) as u32,
771                        _ => (clock_rate / 50) as u32,
772                    };
773
774                    let rtp_timestamp = self.next_rtp_timestamp;
775                    self.next_rtp_timestamp += increment;
776                    let sequence_number = *sequence_number;
777
778                    let mut marker = false;
779                    if self.need_marker {
780                        marker = true;
781                        self.need_marker = false;
782                    }
783
784                    let frame = RtcAudioFrame {
785                        data: Bytes::from(payload.clone()),
786                        clock_rate,
787                        payload_type: Some(*payload_type),
788                        sequence_number: Some(sequence_number),
789                        rtp_timestamp,
790                        marker,
791                        ..Default::default()
792                    };
793                    source.send_audio(frame).await.ok();
794                }
795                _ => {}
796            }
797        }
798        Ok(())
799    }
800}
801
802impl RtcTrack {
803    fn get_payload_type(&self) -> u8 {
804        if let Some(pt) = self.payload_type {
805            return pt;
806        }
807
808        self.rtc_config.payload_type.unwrap_or_else(|| {
809            match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
810                CodecType::PCMU => 0,
811                CodecType::PCMA => 8,
812                CodecType::Opus => 111,
813                CodecType::G722 => 9,
814                _ => 111,
815            }
816        })
817    }
818}
819
820#[cfg(test)]
821mod tests {
822    use super::*;
823    use crate::media::track::TrackConfig;
824
825    #[test]
826    fn test_parse_sdp_payload_types() {
827        let track_id = "test-track".to_string();
828        let cancel_token = CancellationToken::new();
829        let mut track = RtcTrack::new(
830            cancel_token,
831            track_id,
832            TrackConfig::default(),
833            RtcTrackConfig::default(),
834        );
835
836        // Case 1: Multiple audio codecs, telephone-event at the end. Primary should be PCMA (8)
837        let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
838        track
839            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
840            .expect("parse offer");
841        assert_eq!(track.get_payload_type(), 8);
842
843        // Case 2: telephone-event at the beginning, should skip it and pick PCMU (0)
844        let mut rtc_config = RtcTrackConfig::default();
845        rtc_config.preferred_codec = Some(CodecType::PCMU);
846        let mut track2 = RtcTrack::new(
847            CancellationToken::new(),
848            "test-track-2".to_string(),
849            TrackConfig::default(),
850            rtc_config,
851        );
852
853        let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
854        track2
855            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
856            .expect("parse offer");
857        assert_eq!(track2.get_payload_type(), 0);
858
859        // Case 3: Opus with dynamic payload type 111
860        let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
861        track
862            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
863            .expect("parse offer");
864        assert_eq!(track.get_payload_type(), 111);
865    }
866}