active_call/media/track/
rtc.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::{
6        processor::ProcessorChain,
7        track::{Track, TrackConfig, TrackId, TrackPacketSender},
8    },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::StreamExt;
15use rustrtc::{
16    AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17    PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18    config::MediaCapabilities,
19    media::{
20        MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21        track::SampleStreamTrack,
22    },
23};
24use std::{
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34    pub mode: TransportMode,
35    pub ice_servers: Option<Vec<IceServer>>,
36    pub external_ip: Option<String>,
37    pub rtp_port_range: Option<(u16, u16)>,
38    pub preferred_codec: Option<CodecType>,
39    pub codecs: Vec<CodecType>,
40    pub payload_type: Option<u8>,
41    pub enable_latching: Option<bool>,
42}
43
44impl Default for RtcTrackConfig {
45    fn default() -> Self {
46        Self {
47            mode: TransportMode::WebRtc, // Default WebRTC behavior
48            ice_servers: None,
49            external_ip: None,
50            rtp_port_range: None,
51            preferred_codec: None,
52            codecs: Vec::new(),
53            payload_type: None,
54            enable_latching: None,
55        }
56    }
57}
58
59pub struct RtcTrack {
60    track_id: TrackId,
61    track_config: TrackConfig,
62    rtc_config: RtcTrackConfig,
63    processor_chain: ProcessorChain,
64    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
65    cancel_token: CancellationToken,
66    local_source: Option<Arc<SampleStreamSource>>,
67    encoder: TrackCodec,
68    ssrc: u32,
69    payload_type: Option<u8>,
70    pub peer_connection: Option<Arc<PeerConnection>>,
71    next_rtp_timestamp: u32,
72    next_rtp_sequence_number: u16,
73    last_packet_time: Option<Instant>,
74    last_remote_sdp: Option<String>,
75    need_marker: bool,
76}
77
78impl RtcTrack {
79    pub fn new(
80        cancel_token: CancellationToken,
81        id: TrackId,
82        track_config: TrackConfig,
83        rtc_config: RtcTrackConfig,
84    ) -> Self {
85        let processor_chain = ProcessorChain::new(track_config.samplerate);
86        Self {
87            track_id: id,
88            track_config,
89            rtc_config,
90            processor_chain,
91            packet_sender: Arc::new(Mutex::new(None)),
92            cancel_token,
93            local_source: None,
94            encoder: TrackCodec::new(),
95            ssrc: 0,
96            payload_type: None,
97            peer_connection: None,
98            next_rtp_timestamp: 0,
99            next_rtp_sequence_number: 0,
100            last_packet_time: None,
101            last_remote_sdp: None,
102            need_marker: false,
103        }
104    }
105
106    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
107        self.ssrc = ssrc;
108        self
109    }
110
111    pub fn create_audio_track(
112        _codec: CodecType,
113        _stream_id: Option<String>,
114    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
115        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
116        (Arc::new(source), track)
117    }
118
119    pub async fn local_description(&self) -> Result<String> {
120        let pc = self
121            .peer_connection
122            .as_ref()
123            .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
124        let offer = pc.create_offer().await?;
125        pc.set_local_description(offer.clone())?;
126        Ok(offer.to_sdp_string())
127    }
128
129    pub async fn create(&mut self) -> Result<()> {
130        if self.peer_connection.is_some() {
131            return Ok(());
132        }
133
134        let mut config = RtcConfiguration::default();
135        if self.ssrc != 0 {
136            config.ssrc_start = self.ssrc;
137        }
138        config.transport_mode = self.rtc_config.mode.clone();
139        config.enable_latching = self
140            .rtc_config
141            .enable_latching
142            .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
143
144        if let Some(ice_servers) = &self.rtc_config.ice_servers {
145            config.ice_servers = ice_servers.clone();
146        }
147
148        if let Some(external_ip) = &self.rtc_config.external_ip {
149            config.external_ip = Some(external_ip.clone());
150        }
151
152        if !self.rtc_config.codecs.is_empty() {
153            let mut caps = MediaCapabilities::default();
154            caps.audio.clear();
155
156            for codec in &self.rtc_config.codecs {
157                let cap = match codec {
158                    CodecType::PCMU => AudioCapability::pcmu(),
159                    CodecType::PCMA => AudioCapability::pcma(),
160                    CodecType::G722 => AudioCapability::g722(),
161                    CodecType::G729 => AudioCapability::g729(),
162                    CodecType::TelephoneEvent => AudioCapability::telephone_event(),
163                    #[cfg(feature = "opus")]
164                    CodecType::Opus => AudioCapability::opus(),
165                };
166                caps.audio.push(cap);
167            }
168            config.media_capabilities = Some(caps);
169        }
170
171        let peer_connection = Arc::new(PeerConnection::new(config));
172        self.peer_connection = Some(peer_connection.clone());
173
174        let default_codec = CodecType::G722;
175        let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
176
177        let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
178        self.local_source = Some(source);
179
180        let payload_type = self
181            .rtc_config
182            .payload_type
183            .unwrap_or_else(|| codec.payload_type());
184
185        self.payload_type = Some(payload_type);
186
187        let params = RtpCodecParameters {
188            clock_rate: codec.clock_rate(),
189            channels: codec.channels() as u8,
190            payload_type,
191            ..Default::default()
192        };
193
194        peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
195
196        // Spawn Handler Logic
197        self.spawn_handlers(
198            peer_connection.clone(),
199            self.track_id.clone(),
200            self.processor_chain.clone(),
201            payload_type,
202        );
203
204        if self.rtc_config.mode == TransportMode::Rtp {
205            for transceiver in peer_connection.get_transceivers() {
206                if let Some(receiver) = transceiver.receiver() {
207                    let track = receiver.track();
208                    info!(track_id=%self.track_id, "RTP mode: starting receiver track handler");
209                    Self::spawn_track_handler(
210                        track,
211                        self.packet_sender.clone(),
212                        self.track_id.clone(),
213                        self.cancel_token.clone(),
214                        self.processor_chain.clone(),
215                        self.get_payload_type(),
216                    );
217                }
218            }
219        }
220
221        Ok(())
222    }
223
224    fn spawn_handlers(
225        &self,
226        pc: Arc<PeerConnection>,
227        track_id: TrackId,
228        processor_chain: ProcessorChain,
229        default_payload_type: u8,
230    ) {
231        let cancel_token = self.cancel_token.clone();
232        let packet_sender = self.packet_sender.clone();
233        let pc_clone = pc.clone();
234        let track_id_log = track_id.clone();
235        let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
236
237        // 1. Event Loop
238        crate::spawn(async move {
239            info!(track_id=%track_id_log, "RtcTrack event loop started");
240            let mut events = futures::stream::unfold(pc_clone.clone(), |pc| async move {
241                pc.recv().await.map(|ev| (ev, pc))
242            })
243            .take_until(cancel_token.cancelled())
244            .boxed();
245
246            let mut event_count = 0;
247            while let Some(event) = events.next().await {
248                event_count += 1;
249                let event_type = match &event {
250                    PeerConnectionEvent::Track(_) => "Track",
251                    PeerConnectionEvent::DataChannel(_) => "DataChannel",
252                };
253                debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
254
255                if let PeerConnectionEvent::Track(transceiver) = event {
256                    if let Some(receiver) = transceiver.receiver() {
257                        let track = receiver.track();
258                        info!(track_id=%track_id_log, "New track received");
259
260                        Self::spawn_track_handler(
261                            track,
262                            packet_sender.clone(),
263                            track_id_log.clone(),
264                            cancel_token.clone(),
265                            processor_chain.clone(),
266                            default_payload_type.clone(),
267                        );
268                    }
269                }
270            }
271            debug!(track_id=%track_id_log, "RtcTrack event loop ended, total events: {}", event_count);
272        });
273
274        // 2. State Monitoring
275        if is_webrtc {
276            let pc_state = pc.clone();
277            let cancel_token_state = self.cancel_token.clone();
278            let mut state_rx = pc_state.subscribe_peer_state();
279            let track_id_state = track_id.clone();
280
281            crate::spawn(async move {
282                while state_rx.changed().await.is_ok() {
283                    let s = *state_rx.borrow();
284                    debug!(track_id=%track_id_state, "peer connection state changed: {:?}", s);
285                    match s {
286                        PeerConnectionState::Disconnected
287                        | PeerConnectionState::Closed
288                        | PeerConnectionState::Failed => {
289                            info!(
290                                track_id = %track_id_state,
291                                "peer connection is {:?}, try to close", s
292                            );
293                            cancel_token_state.cancel();
294                            pc_state.close();
295                            break;
296                        }
297                        _ => {}
298                    }
299                }
300            });
301        }
302    }
303
304    fn spawn_track_handler(
305        track: Arc<SampleStreamTrack>,
306        packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
307        track_id: TrackId,
308        cancel_token: CancellationToken,
309        processor_chain: ProcessorChain,
310        default_payload_type: u8,
311    ) {
312        let (tx, mut rx) =
313            tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
314
315        // Processing Worker
316        let track_id_proc = track_id.clone();
317        let packet_sender_proc = packet_sender_arc.clone();
318        let mut processor_chain_proc = processor_chain.clone();
319        let cancel_token_proc = cancel_token.clone();
320        crate::spawn(async move {
321            info!(track_id=%track_id_proc, "RtcTrack processing worker started");
322            while let Some(frame) = rx.recv().await {
323                if cancel_token_proc.is_cancelled() {
324                    break;
325                }
326                Self::process_audio_frame(
327                    frame,
328                    &track_id_proc,
329                    &packet_sender_proc,
330                    &mut processor_chain_proc,
331                    default_payload_type,
332                )
333                .await;
334            }
335            info!(track_id=%track_id_proc, "RtcTrack processing worker stopped");
336        });
337
338        // Receiving Worker
339        crate::spawn(async move {
340            let mut samples =
341                futures::stream::unfold(
342                    track,
343                    |t| async move { t.recv().await.ok().map(|s| (s, t)) },
344                )
345                .take_until(cancel_token.cancelled())
346                .boxed();
347
348            while let Some(sample) = samples.next().await {
349                if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
350                    if let Err(_) = tx.send(frame) {
351                        break;
352                    }
353                }
354            }
355        });
356    }
357
358    async fn process_audio_frame(
359        frame: rustrtc::media::frame::AudioFrame,
360        track_id: &TrackId,
361        packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
362        processor_chain: &mut ProcessorChain,
363        default_payload_type: u8,
364    ) {
365        let packet_sender = packet_sender.lock().await;
366        if let Some(sender) = packet_sender.as_ref() {
367            let payload_type = frame.payload_type.unwrap_or(default_payload_type);
368            let src_codec = match CodecType::try_from(payload_type) {
369                Ok(c) => c,
370                Err(_) => {
371                    debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
372                    return;
373                }
374            };
375
376            let mut af = AudioFrame {
377                track_id: track_id.clone(),
378                samples: crate::media::Samples::RTP {
379                    payload_type,
380                    payload: frame.data.to_vec(),
381                    sequence_number: frame.sequence_number.unwrap_or(0),
382                },
383                timestamp: crate::media::get_timestamp(),
384                sample_rate: src_codec.samplerate(),
385                channels: src_codec.channels(),
386            };
387            if let Err(e) = processor_chain.process_frame(&mut af) {
388                debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
389            }
390
391            sender.send(af).ok();
392        }
393    }
394
395    pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
396        use crate::media::negotiate::parse_rtpmap;
397        let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
398
399        if let Some(media) = sdp
400            .media_sections
401            .iter()
402            .find(|m| m.kind == MediaKind::Audio)
403        {
404            for attr in &media.attributes {
405                if attr.key == "rtpmap" {
406                    if let Some(value) = &attr.value {
407                        if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
408                            self.encoder.set_payload_type(pt, codec.clone());
409                            self.processor_chain.codec.set_payload_type(pt, codec);
410                        }
411                    }
412                }
413            }
414
415            // Negotiate primary audio codec
416            let mut negotiated = None;
417
418            // If we are the offerer (receiving an Answer), we prioritize our own preferred codec order
419            // that is also present in the answer.
420            if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
421                for preferred_codec in &self.rtc_config.codecs {
422                    if *preferred_codec == CodecType::TelephoneEvent {
423                        continue;
424                    }
425                    for fmt in &media.formats {
426                        if let Ok(pt) = fmt.parse::<u8>() {
427                            let codec = self
428                                .encoder
429                                .payload_type_map
430                                .get(&pt)
431                                .cloned()
432                                .or_else(|| CodecType::try_from(pt).ok());
433                            if let Some(c) = codec {
434                                if c == *preferred_codec {
435                                    negotiated = Some((pt, c));
436                                    break;
437                                }
438                            }
439                        }
440                    }
441                    if negotiated.is_some() {
442                        break;
443                    }
444                }
445            }
446
447            // Fallback: use the first codec in the SDP (matches offerer's preference if we are answerer)
448            if negotiated.is_none() {
449                for fmt in &media.formats {
450                    if let Ok(pt) = fmt.parse::<u8>() {
451                        let codec = self
452                            .encoder
453                            .payload_type_map
454                            .get(&pt)
455                            .cloned()
456                            .or_else(|| CodecType::try_from(pt).ok());
457
458                        if let Some(codec) = codec {
459                            if codec != CodecType::TelephoneEvent {
460                                negotiated = Some((pt, codec));
461                                break;
462                            }
463                        }
464                    }
465                }
466            }
467
468            if let Some((pt, codec)) = negotiated {
469                info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
470                self.payload_type = Some(pt);
471            }
472        }
473        Ok(())
474    }
475
476    fn normalize_sdp(sdp: &str) -> String {
477        sdp.lines()
478            .map(|line| {
479                if line.starts_with("o=") {
480                    let parts: Vec<&str> = line.split_whitespace().collect();
481                    if parts.len() >= 3 {
482                        return format!("o= {} {}", parts[1], parts[2]);
483                    }
484                }
485                line.to_string()
486            })
487            .filter(|line| {
488                !line.starts_with("t=") &&  // timing line can vary
489                !line.starts_with("a=ssrc:") &&  // SSRC attributes (but SSRC change shows in o= version)
490                !line.starts_with("a=msid:") &&  // media stream ID
491                !line.trim().is_empty()
492            })
493            .collect::<Vec<_>>()
494            .join("\n")
495    }
496
497    async fn update_remote_description_internal(
498        &mut self,
499        answer: &String,
500        force_update: bool,
501    ) -> Result<()> {
502        if let Some(pc) = &self.peer_connection {
503            if !force_update {
504                if let Some(ref last_sdp) = self.last_remote_sdp {
505                    if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
506                        debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
507                        return Ok(());
508                    }
509                }
510            } else {
511                debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
512            }
513
514            let is_first_remote_sdp = self.last_remote_sdp.is_none();
515
516            let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
517            match pc.set_remote_description(sdp_obj.clone()).await {
518                Ok(_) => {
519                    debug!(track_id=%self.track_id, "set_remote_description succeeded");
520                    self.last_remote_sdp = Some(answer.clone());
521                }
522                Err(e) => {
523                    if self.rtc_config.mode == TransportMode::Rtp {
524                        info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
525
526                        if let Some(current_local) = pc.local_description() {
527                            let sdp = current_local.to_sdp_string();
528                            for line in sdp.lines() {
529                                if line.starts_with("a=ssrc:") {
530                                    info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
531                                }
532                            }
533                        }
534
535                        let offer = pc.create_offer().await?;
536
537                        let sdp = offer.to_sdp_string();
538                        for line in sdp.lines() {
539                            if line.starts_with("a=ssrc:") {
540                                info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
541                            }
542                        }
543
544                        pc.set_local_description(offer)?;
545                        pc.set_remote_description(sdp_obj).await?;
546                        self.last_remote_sdp = Some(answer.clone());
547                        info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
548                    } else {
549                        return Err(e.into());
550                    }
551                }
552            }
553
554            if is_first_remote_sdp && self.rtc_config.mode != TransportMode::Rtp {
555                for transceiver in pc.get_transceivers() {
556                    if let Some(receiver) = transceiver.receiver() {
557                        let track = receiver.track();
558                        info!(track_id=%self.track_id, "WebRTC mode: manually starting receiver track handler after first answer");
559                        Self::spawn_track_handler(
560                            track,
561                            self.packet_sender.clone(),
562                            self.track_id.clone(),
563                            self.cancel_token.clone(),
564                            self.processor_chain.clone(),
565                            self.get_payload_type(),
566                        );
567                    }
568                }
569            }
570
571            // Extract negotiated payload types from SDP string
572            self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
573        }
574        Ok(())
575    }
576}
577
578#[async_trait]
579impl Track for RtcTrack {
580    fn ssrc(&self) -> u32 {
581        self.ssrc
582    }
583    fn id(&self) -> &TrackId {
584        &self.track_id
585    }
586    fn config(&self) -> &TrackConfig {
587        &self.track_config
588    }
589    fn processor_chain(&mut self) -> &mut ProcessorChain {
590        &mut self.processor_chain
591    }
592
593    async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
594        info!(track_id=%self.track_id, "rtc handshake start");
595        self.create().await?;
596
597        let pc = self.peer_connection.clone().ok_or_else(|| {
598            anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
599        })?;
600
601        debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
602        for (i, t) in pc.get_transceivers().iter().enumerate() {
603            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}", 
604                i, t.kind(), t.mid(), t.direction());
605        }
606
607        let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
608        pc.set_remote_description(sdp.clone()).await?;
609
610        debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
611        for (i, t) in pc.get_transceivers().iter().enumerate() {
612            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}", 
613                i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
614        }
615
616        // CRITICAL FIX: When server-initiated signaling (common WebRTC pattern),
617        // Track events fire when remote peer sends offer, not when we accept answer.
618        // Since we received remote offer here and added local track first,
619        // we must manually start receiver tracks as Track events won't fire.
620        if self.rtc_config.mode != TransportMode::Rtp {
621            for transceiver in pc.get_transceivers() {
622                if let Some(receiver) = transceiver.receiver() {
623                    let track = receiver.track();
624                    info!(track_id=%self.track_id, "WebRTC handshake: manually starting receiver track handler for browser audio");
625                    Self::spawn_track_handler(
626                        track,
627                        self.packet_sender.clone(),
628                        self.track_id.clone(),
629                        self.cancel_token.clone(),
630                        self.processor_chain.clone(),
631                        self.get_payload_type(),
632                    );
633                }
634            }
635        }
636
637        self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
638
639        let mut answer = pc.create_answer().await?;
640        crate::media::negotiate::intersect_answer(&sdp, &mut answer);
641
642        pc.set_local_description(answer.clone())?;
643
644        if self.rtc_config.mode != TransportMode::Rtp {
645            pc.wait_for_gathering_complete().await;
646        }
647
648        let final_answer = pc
649            .local_description()
650            .ok_or(anyhow::anyhow!("No local description"))?;
651
652        Ok(final_answer.to_sdp_string())
653    }
654
655    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
656        self.update_remote_description_internal(answer, false).await
657    }
658
659    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
660        self.update_remote_description_internal(answer, true).await
661    }
662
663    async fn start(
664        &mut self,
665        event_sender: EventSender,
666        packet_sender: TrackPacketSender,
667    ) -> Result<()> {
668        *self.packet_sender.lock().await = Some(packet_sender.clone());
669        let token_clone = self.cancel_token.clone();
670        let event_sender_clone = event_sender.clone();
671        let track_id = self.track_id.clone();
672        let ssrc = self.ssrc;
673
674        if self.rtc_config.mode != TransportMode::Rtp {
675            let start_time = crate::media::get_timestamp();
676            crate::spawn(async move {
677                token_clone.cancelled().await;
678                let _ = event_sender_clone.send(SessionEvent::TrackEnd {
679                    track_id,
680                    timestamp: crate::media::get_timestamp(),
681                    duration: crate::media::get_timestamp() - start_time,
682                    ssrc,
683                    play_id: None,
684                });
685            });
686        }
687
688        Ok(())
689    }
690
691    async fn stop(&self) -> Result<()> {
692        self.cancel_token.cancel();
693        if let Some(pc) = &self.peer_connection {
694            pc.close();
695        }
696        Ok(())
697    }
698
699    async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
700        let packet = packet.clone();
701
702        if let Some(source) = &self.local_source {
703            match &packet.samples {
704                crate::media::Samples::PCM { samples } => {
705                    let payload_type = self.get_payload_type();
706                    let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
707                    let target_codec = CodecType::try_from(payload_type)?;
708                    if !encoded.is_empty() {
709                        let clock_rate = target_codec.clock_rate();
710
711                        let now = Instant::now();
712                        if let Some(last_time) = self.last_packet_time {
713                            let elapsed = now.duration_since(last_time);
714                            if elapsed.as_millis() > 50 {
715                                let gap_increment =
716                                    (elapsed.as_millis() as u32 * clock_rate) / 1000;
717                                self.next_rtp_timestamp += gap_increment;
718                                self.need_marker = true;
719                            }
720                        }
721
722                        self.last_packet_time = Some(now);
723
724                        let timestamp_increment = (samples.len() as u64 * clock_rate as u64
725                            / packet.sample_rate as u64
726                            / self.track_config.channels as u64)
727                            as u32;
728                        let rtp_timestamp = self.next_rtp_timestamp;
729                        self.next_rtp_timestamp += timestamp_increment;
730                        let sequence_number = self.next_rtp_sequence_number;
731                        self.next_rtp_sequence_number += 1;
732
733                        let mut marker = false;
734                        if self.need_marker {
735                            marker = true;
736                            self.need_marker = false;
737                        }
738
739                        let frame = RtcAudioFrame {
740                            data: Bytes::from(encoded),
741                            clock_rate,
742                            payload_type: Some(payload_type),
743                            sequence_number: Some(sequence_number),
744                            rtp_timestamp,
745                            marker,
746                            ..Default::default()
747                        };
748                        source.send_audio(frame).await.ok();
749                    }
750                }
751                crate::media::Samples::RTP {
752                    payload,
753                    payload_type,
754                    sequence_number,
755                } => {
756                    let clock_rate = match *payload_type {
757                        0 | 8 | 9 | 18 => 8000,
758                        111 => 48000,
759                        _ => packet.sample_rate,
760                    };
761
762                    let now = Instant::now();
763                    if let Some(last_time) = self.last_packet_time {
764                        let elapsed = now.duration_since(last_time);
765                        if elapsed.as_millis() > 50 {
766                            let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
767                            self.next_rtp_timestamp += gap_increment;
768                            self.need_marker = true;
769                        }
770                    }
771                    self.last_packet_time = Some(now);
772
773                    let increment = match *payload_type {
774                        0 | 8 | 18 => payload.len() as u32,
775                        9 => payload.len() as u32,
776                        111 => (clock_rate / 50) as u32,
777                        _ => (clock_rate / 50) as u32,
778                    };
779
780                    let rtp_timestamp = self.next_rtp_timestamp;
781                    self.next_rtp_timestamp += increment;
782                    let sequence_number = *sequence_number;
783
784                    let mut marker = false;
785                    if self.need_marker {
786                        marker = true;
787                        self.need_marker = false;
788                    }
789
790                    let frame = RtcAudioFrame {
791                        data: Bytes::from(payload.clone()),
792                        clock_rate,
793                        payload_type: Some(*payload_type),
794                        sequence_number: Some(sequence_number),
795                        rtp_timestamp,
796                        marker,
797                        ..Default::default()
798                    };
799                    source.send_audio(frame).await.ok();
800                }
801                _ => {}
802            }
803        }
804        Ok(())
805    }
806}
807
808impl RtcTrack {
809    fn get_payload_type(&self) -> u8 {
810        if let Some(pt) = self.payload_type {
811            return pt;
812        }
813
814        self.rtc_config.payload_type.unwrap_or_else(|| {
815            match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
816                CodecType::PCMU => 0,
817                CodecType::PCMA => 8,
818                CodecType::Opus => 111,
819                CodecType::G722 => 9,
820                _ => 111,
821            }
822        })
823    }
824}
825
#[cfg(test)]
mod tests {
    use super::*;
    use crate::media::track::TrackConfig;

    /// Offer listing PCMA first, then PCMU, with telephone-event last.
    const OFFER_PCMA_FIRST: &str = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";

    /// Offer listing telephone-event first, followed by PCMU and PCMA.
    const OFFER_DTMF_FIRST: &str = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";

    /// Offer carrying Opus on dynamic payload type 111 plus telephone-event.
    const OFFER_OPUS: &str = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";

    /// Creates a track with a fresh cancellation token, a default `TrackConfig`
    /// and the supplied RTC settings.
    fn make_track(id: &str, rtc_config: RtcTrackConfig) -> RtcTrack {
        RtcTrack::new(
            CancellationToken::new(),
            id.to_string(),
            TrackConfig::default(),
            rtc_config,
        )
    }

    /// Parses `sdp` as an offer on `track` and returns the payload type the
    /// track reports afterwards.
    fn payload_type_after_offer(track: &mut RtcTrack, sdp: &str) -> u8 {
        track
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp)
            .expect("parse offer");
        track.get_payload_type()
    }

    #[test]
    fn test_parse_sdp_payload_types() {
        // Case 1: several audio codecs with telephone-event last; PCMA (8) is primary.
        let mut first = make_track("test-track", RtcTrackConfig::default());
        assert_eq!(payload_type_after_offer(&mut first, OFFER_PCMA_FIRST), 8);

        // Case 2: telephone-event leads the list; it is skipped and PCMU (0) is chosen.
        let pcmu_preferred = RtcTrackConfig {
            preferred_codec: Some(CodecType::PCMU),
            ..RtcTrackConfig::default()
        };
        let mut second = make_track("test-track-2", pcmu_preferred);
        assert_eq!(payload_type_after_offer(&mut second, OFFER_DTMF_FIRST), 0);

        // Case 3: Opus on dynamic payload type 111, parsed on the track from case 1.
        assert_eq!(payload_type_after_offer(&mut first, OFFER_OPUS), 111);
    }
}