active_call/media/track/
rtc.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::{
6        processor::ProcessorChain,
7        track::{Track, TrackConfig, TrackId, TrackPacketSender},
8    },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::StreamExt;
15use rustrtc::{
16    AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17    PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18    config::MediaCapabilities,
19    media::{
20        MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21        track::SampleStreamTrack,
22    },
23};
24use std::{
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34    pub mode: TransportMode,
35    pub ice_servers: Option<Vec<IceServer>>,
36    pub external_ip: Option<String>,
37    pub rtp_port_range: Option<(u16, u16)>,
38    pub preferred_codec: Option<CodecType>,
39    pub codecs: Vec<CodecType>,
40    pub payload_type: Option<u8>,
41}
42
43impl Default for RtcTrackConfig {
44    fn default() -> Self {
45        Self {
46            mode: TransportMode::WebRtc, // Default WebRTC behavior
47            ice_servers: None,
48            external_ip: None,
49            rtp_port_range: None,
50            preferred_codec: None,
51            codecs: Vec::new(),
52            payload_type: None,
53        }
54    }
55}
56
57pub struct RtcTrack {
58    track_id: TrackId,
59    track_config: TrackConfig,
60    rtc_config: RtcTrackConfig,
61    processor_chain: ProcessorChain,
62    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
63    cancel_token: CancellationToken,
64    local_source: Option<Arc<SampleStreamSource>>,
65    encoder: TrackCodec,
66    ssrc: u32,
67    payload_type: u8,
68    pub peer_connection: Option<Arc<PeerConnection>>,
69    next_rtp_timestamp: u32,
70    next_rtp_sequence_number: u16,
71    last_packet_time: Option<Instant>,
72    last_remote_sdp: Option<String>,
73}
74
75impl RtcTrack {
76    pub fn new(
77        cancel_token: CancellationToken,
78        id: TrackId,
79        track_config: TrackConfig,
80        rtc_config: RtcTrackConfig,
81    ) -> Self {
82        let processor_chain = ProcessorChain::new(track_config.samplerate);
83        Self {
84            track_id: id,
85            track_config,
86            rtc_config,
87            processor_chain,
88            packet_sender: Arc::new(Mutex::new(None)),
89            cancel_token,
90            local_source: None,
91            encoder: TrackCodec::new(),
92            ssrc: 0,
93            payload_type: 0,
94            peer_connection: None,
95            next_rtp_timestamp: 0,
96            next_rtp_sequence_number: 0,
97            last_packet_time: None,
98            last_remote_sdp: None,
99        }
100    }
101
102    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
103        self.ssrc = ssrc;
104        self
105    }
106
107    pub fn create_audio_track(
108        _codec: CodecType,
109        _stream_id: Option<String>,
110    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
111        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
112        (Arc::new(source), track)
113    }
114
115    pub async fn local_description(&self) -> Result<String> {
116        let pc = self
117            .peer_connection
118            .as_ref()
119            .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
120        let offer = pc.create_offer().await?;
121        pc.set_local_description(offer.clone())?;
122        Ok(offer.to_sdp_string())
123    }
124
125    pub async fn create(&mut self) -> Result<()> {
126        if self.peer_connection.is_some() {
127            return Ok(());
128        }
129
130        let mut config = RtcConfiguration::default();
131        config.transport_mode = self.rtc_config.mode.clone();
132
133        if let Some(ice_servers) = &self.rtc_config.ice_servers {
134            config.ice_servers = ice_servers.clone();
135        }
136
137        if let Some(external_ip) = &self.rtc_config.external_ip {
138            config.external_ip = Some(external_ip.clone());
139        }
140
141        if !self.rtc_config.codecs.is_empty() {
142            let mut caps = MediaCapabilities::default();
143            caps.audio.clear();
144
145            for codec in &self.rtc_config.codecs {
146                let cap = match codec {
147                    CodecType::PCMU => AudioCapability::pcmu(),
148                    CodecType::PCMA => AudioCapability::pcma(),
149                    CodecType::G722 => AudioCapability::g722(),
150                    CodecType::G729 => AudioCapability::g729(),
151                    CodecType::TelephoneEvent => AudioCapability::telephone_event(),
152                    #[cfg(feature = "opus")]
153                    CodecType::Opus => AudioCapability::opus(),
154                };
155                caps.audio.push(cap);
156            }
157            config.media_capabilities = Some(caps);
158        }
159
160        let peer_connection = Arc::new(PeerConnection::new(config));
161        self.peer_connection = Some(peer_connection.clone());
162
163        let default_codec = CodecType::G722;
164        let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
165
166        let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
167        self.local_source = Some(source);
168
169        let payload_type = self
170            .rtc_config
171            .payload_type
172            .unwrap_or_else(|| codec.payload_type());
173
174        self.payload_type = payload_type;
175
176        let params = RtpCodecParameters {
177            clock_rate: codec.clock_rate(),
178            channels: codec.channels() as u8,
179            payload_type,
180            ..Default::default()
181        };
182
183        peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
184
185        // Spawn Handler Logic
186        self.spawn_handlers(
187            peer_connection.clone(),
188            self.track_id.clone(),
189            self.processor_chain.clone(),
190            self.payload_type,
191        );
192
193        if self.rtc_config.mode == TransportMode::Rtp {
194            for transceiver in peer_connection.get_transceivers() {
195                if let Some(receiver) = transceiver.receiver() {
196                    let track = receiver.track();
197                    info!(track_id=%self.track_id, "RTP mode: starting receiver track handler");
198                    Self::spawn_track_handler(
199                        track,
200                        self.packet_sender.clone(),
201                        self.track_id.clone(),
202                        self.cancel_token.clone(),
203                        self.processor_chain.clone(),
204                        self.payload_type,
205                    );
206                }
207            }
208        }
209
210        Ok(())
211    }
212
213    fn spawn_handlers(
214        &self,
215        pc: Arc<PeerConnection>,
216        track_id: TrackId,
217        processor_chain: ProcessorChain,
218        default_payload_type: u8,
219    ) {
220        let cancel_token = self.cancel_token.clone();
221        let packet_sender = self.packet_sender.clone();
222        let pc_clone = pc.clone();
223        let track_id_log = track_id.clone();
224        let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
225
226        // 1. Event Loop
227        crate::spawn(async move {
228            info!(track_id=%track_id_log, "RtcTrack event loop started");
229            let mut events = futures::stream::unfold(pc_clone.clone(), |pc| async move {
230                pc.recv().await.map(|ev| (ev, pc))
231            })
232            .take_until(cancel_token.cancelled())
233            .boxed();
234
235            let mut event_count = 0;
236            while let Some(event) = events.next().await {
237                event_count += 1;
238                let event_type = match &event {
239                    PeerConnectionEvent::Track(_) => "Track",
240                    PeerConnectionEvent::DataChannel(_) => "DataChannel",
241                };
242                debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
243
244                if let PeerConnectionEvent::Track(transceiver) = event {
245                    if let Some(receiver) = transceiver.receiver() {
246                        let track = receiver.track();
247                        info!(track_id=%track_id_log, "New track received");
248
249                        Self::spawn_track_handler(
250                            track,
251                            packet_sender.clone(),
252                            track_id_log.clone(),
253                            cancel_token.clone(),
254                            processor_chain.clone(),
255                            default_payload_type.clone(),
256                        );
257                    }
258                }
259            }
260            debug!(track_id=%track_id_log, "RtcTrack event loop ended, total events: {}", event_count);
261        });
262
263        // 2. State Monitoring
264        if is_webrtc {
265            let pc_state = pc.clone();
266            let cancel_token_state = self.cancel_token.clone();
267            let mut state_rx = pc_state.subscribe_peer_state();
268            let track_id_state = track_id.clone();
269
270            crate::spawn(async move {
271                while state_rx.changed().await.is_ok() {
272                    let s = *state_rx.borrow();
273                    debug!(track_id=%track_id_state, "peer connection state changed: {:?}", s);
274                    match s {
275                        PeerConnectionState::Disconnected
276                        | PeerConnectionState::Closed
277                        | PeerConnectionState::Failed => {
278                            info!(
279                                track_id = %track_id_state,
280                                "peer connection is {:?}, try to close", s
281                            );
282                            cancel_token_state.cancel();
283                            pc_state.close();
284                            break;
285                        }
286                        _ => {}
287                    }
288                }
289            });
290        }
291    }
292
293    fn spawn_track_handler(
294        track: Arc<SampleStreamTrack>,
295        packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
296        track_id: TrackId,
297        cancel_token: CancellationToken,
298        processor_chain: ProcessorChain,
299        default_payload_type: u8,
300    ) {
301        let (tx, mut rx) =
302            tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
303
304        // Processing Worker
305        let track_id_proc = track_id.clone();
306        let packet_sender_proc = packet_sender_arc.clone();
307        let mut processor_chain_proc = processor_chain.clone();
308        let cancel_token_proc = cancel_token.clone();
309        crate::spawn(async move {
310            info!(track_id=%track_id_proc, "RtcTrack processing worker started");
311            while let Some(frame) = rx.recv().await {
312                if cancel_token_proc.is_cancelled() {
313                    break;
314                }
315                Self::process_audio_frame(
316                    frame,
317                    &track_id_proc,
318                    &packet_sender_proc,
319                    &mut processor_chain_proc,
320                    default_payload_type,
321                )
322                .await;
323            }
324            info!(track_id=%track_id_proc, "RtcTrack processing worker stopped");
325        });
326
327        // Receiving Worker
328        crate::spawn(async move {
329            let mut samples =
330                futures::stream::unfold(
331                    track,
332                    |t| async move { t.recv().await.ok().map(|s| (s, t)) },
333                )
334                .take_until(cancel_token.cancelled())
335                .boxed();
336
337            while let Some(sample) = samples.next().await {
338                if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
339                    if let Err(_) = tx.send(frame) {
340                        break;
341                    }
342                }
343            }
344        });
345    }
346
347    async fn process_audio_frame(
348        frame: rustrtc::media::frame::AudioFrame,
349        track_id: &TrackId,
350        packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
351        processor_chain: &mut ProcessorChain,
352        default_payload_type: u8,
353    ) {
354        let packet_sender = packet_sender.lock().await;
355        if let Some(sender) = packet_sender.as_ref() {
356            let payload_type = frame.payload_type.unwrap_or(default_payload_type);
357            let src_codec = match CodecType::try_from(payload_type) {
358                Ok(c) => c,
359                Err(_) => {
360                    debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
361                    return;
362                }
363            };
364
365            let mut af = AudioFrame {
366                track_id: track_id.clone(),
367                samples: crate::media::Samples::RTP {
368                    payload_type,
369                    payload: frame.data.to_vec(),
370                    sequence_number: frame.sequence_number.unwrap_or(0),
371                },
372                timestamp: crate::media::get_timestamp(),
373                sample_rate: src_codec.samplerate(),
374                channels: src_codec.channels(),
375            };
376            if let Err(e) = processor_chain.process_frame(&mut af) {
377                debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
378            }
379
380            sender.send(af).ok();
381        }
382    }
383
384    pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
385        use crate::media::negotiate::parse_rtpmap;
386        let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
387
388        if let Some(media) = sdp
389            .media_sections
390            .iter()
391            .find(|m| m.kind == MediaKind::Audio)
392        {
393            for attr in &media.attributes {
394                if attr.key == "rtpmap" {
395                    if let Some(value) = &attr.value {
396                        if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
397                            self.encoder.set_payload_type(pt, codec.clone());
398                            self.processor_chain.codec.set_payload_type(pt, codec);
399                        }
400                    }
401                }
402            }
403
404            for fmt in &media.formats {
405                if let Ok(pt) = fmt.parse::<u8>() {
406                    let codec = self
407                        .encoder
408                        .payload_type_map
409                        .get(&pt)
410                        .cloned()
411                        .or_else(|| CodecType::try_from(pt).ok());
412
413                    if let Some(codec) = codec {
414                        if codec != CodecType::TelephoneEvent {
415                            info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
416                            self.payload_type = pt;
417                            break;
418                        }
419                    }
420                }
421            }
422        }
423        Ok(())
424    }
425
426    /// Normalize SDP for comparison by removing session-specific fields
427    fn normalize_sdp(sdp: &str) -> String {
428        sdp.lines()
429            .map(|line| {
430                // For origin line (o=), only keep session id and version for comparison
431                // Format: o=<username> <sess-id> <sess-version> <nettype> <addrtype> <unicast-address>
432                // Session version increment indicates session changes (RFC 4566)
433                if line.starts_with("o=") {
434                    // Extract session id and version (2nd and 3rd fields)
435                    let parts: Vec<&str> = line.split_whitespace().collect();
436                    if parts.len() >= 3 {
437                        return format!("o= {} {}", parts[1], parts[2]);
438                    }
439                }
440                line.to_string()
441            })
442            .filter(|line| {
443                // Filter out lines that can change between identical sessions
444                !line.starts_with("t=") &&  // timing line can vary
445                !line.starts_with("a=ssrc:") &&  // SSRC attributes (but SSRC change shows in o= version)
446                !line.starts_with("a=msid:") &&  // media stream ID
447                !line.trim().is_empty()
448            })
449            .collect::<Vec<_>>()
450            .join("\n")
451    }
452
453    async fn update_remote_description_internal(&mut self, answer: &String) -> Result<()> {
454        if let Some(pc) = &self.peer_connection {
455            // Check if SDP is the same as the last one we set
456            if let Some(ref last_sdp) = self.last_remote_sdp {
457                if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
458                    debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
459                    return Ok(());
460                }
461            }
462
463            let is_first_remote_sdp = self.last_remote_sdp.is_none();
464
465            let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
466            match pc.set_remote_description(sdp_obj.clone()).await {
467                Ok(_) => {
468                    debug!(track_id=%self.track_id, "set_remote_description succeeded");
469                    self.last_remote_sdp = Some(answer.clone());
470                }
471                Err(e) => {
472                    if self.rtc_config.mode == TransportMode::Rtp {
473                        info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
474                        // SIP 200 OK often sends a final answer after 183 Session Progress early answer.
475                        // WebRTC PeerConnection state machine requires:
476                        //   - stable -> setLocalDescription(offer) -> have-local-offer -> setRemoteDescription(answer) -> stable
477                        // After 183 with SDP, we're in stable state. We can't accept another answer (200 OK).
478                        // Solution: Create a new offer to transition to have-local-offer, then accept the answer.
479                        let offer = pc.create_offer().await?;
480                        pc.set_local_description(offer)?;
481                        pc.set_remote_description(sdp_obj).await?;
482                        self.last_remote_sdp = Some(answer.clone());
483                        info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
484                    } else {
485                        return Err(e.into());
486                    }
487                }
488            }
489
490            // CRITICAL FIX: When server initiates offer (common in SIP->WebRTC),
491            // Track events only fire when processing remote *offers*, not answers.
492            // Since we send offer and receive answer, we must manually start receiver tracks.
493            // Only do this on the first remote SDP (not on re-invites) and for WebRTC mode.
494            if is_first_remote_sdp && self.rtc_config.mode != TransportMode::Rtp {
495                for transceiver in pc.get_transceivers() {
496                    if let Some(receiver) = transceiver.receiver() {
497                        let track = receiver.track();
498                        info!(track_id=%self.track_id, "WebRTC mode: manually starting receiver track handler after first answer");
499                        Self::spawn_track_handler(
500                            track,
501                            self.packet_sender.clone(),
502                            self.track_id.clone(),
503                            self.cancel_token.clone(),
504                            self.processor_chain.clone(),
505                            self.payload_type,
506                        );
507                    }
508                }
509            }
510
511            // Extract negotiated payload types from SDP string
512            self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
513        }
514        Ok(())
515    }
516}
517
518#[async_trait]
519impl Track for RtcTrack {
520    fn ssrc(&self) -> u32 {
521        self.ssrc
522    }
523    fn id(&self) -> &TrackId {
524        &self.track_id
525    }
526    fn config(&self) -> &TrackConfig {
527        &self.track_config
528    }
529    fn processor_chain(&mut self) -> &mut ProcessorChain {
530        &mut self.processor_chain
531    }
532
533    async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
534        info!(track_id=%self.track_id, "rtc handshake start");
535        self.create().await?;
536
537        let pc = self.peer_connection.clone().ok_or_else(|| {
538            anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
539        })?;
540
541        debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
542        for (i, t) in pc.get_transceivers().iter().enumerate() {
543            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}", 
544                i, t.kind(), t.mid(), t.direction());
545        }
546
547        let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
548        pc.set_remote_description(sdp).await?;
549
550        debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
551        for (i, t) in pc.get_transceivers().iter().enumerate() {
552            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}", 
553                i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
554        }
555
556        // CRITICAL FIX: When server-initiated signaling (common WebRTC pattern),
557        // Track events fire when remote peer sends offer, not when we accept answer.
558        // Since we received remote offer here and added local track first,
559        // we must manually start receiver tracks as Track events won't fire.
560        if self.rtc_config.mode != TransportMode::Rtp {
561            for transceiver in pc.get_transceivers() {
562                if let Some(receiver) = transceiver.receiver() {
563                    let track = receiver.track();
564                    info!(track_id=%self.track_id, "WebRTC handshake: manually starting receiver track handler for browser audio");
565                    Self::spawn_track_handler(
566                        track,
567                        self.packet_sender.clone(),
568                        self.track_id.clone(),
569                        self.cancel_token.clone(),
570                        self.processor_chain.clone(),
571                        self.payload_type,
572                    );
573                }
574            }
575        }
576
577        self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
578
579        let answer = pc.create_answer().await?;
580        pc.set_local_description(answer.clone())?;
581
582        if self.rtc_config.mode != TransportMode::Rtp {
583            pc.wait_for_gathering_complete().await;
584        }
585
586        let final_answer = pc
587            .local_description()
588            .ok_or(anyhow::anyhow!("No local description"))?;
589
590        Ok(final_answer.to_sdp_string())
591    }
592
593    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
594        self.update_remote_description_internal(answer).await
595    }
596
597    async fn start(
598        &mut self,
599        event_sender: EventSender,
600        packet_sender: TrackPacketSender,
601    ) -> Result<()> {
602        *self.packet_sender.lock().await = Some(packet_sender.clone());
603        let token_clone = self.cancel_token.clone();
604        let event_sender_clone = event_sender.clone();
605        let track_id = self.track_id.clone();
606        let ssrc = self.ssrc;
607
608        if self.rtc_config.mode != TransportMode::Rtp {
609            let start_time = crate::media::get_timestamp();
610            crate::spawn(async move {
611                token_clone.cancelled().await;
612                let _ = event_sender_clone.send(SessionEvent::TrackEnd {
613                    track_id,
614                    timestamp: crate::media::get_timestamp(),
615                    duration: crate::media::get_timestamp() - start_time,
616                    ssrc,
617                    play_id: None,
618                });
619            });
620        }
621
622        Ok(())
623    }
624
625    async fn stop(&self) -> Result<()> {
626        self.cancel_token.cancel();
627        if let Some(pc) = &self.peer_connection {
628            pc.close();
629        }
630        Ok(())
631    }
632
633    async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
634        let packet = packet.clone();
635
636        if let Some(source) = &self.local_source {
637            match &packet.samples {
638                crate::media::Samples::PCM { samples } => {
639                    let payload_type = self.get_payload_type();
640                    let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
641                    let target_codec = CodecType::try_from(payload_type)?;
642                    if !encoded.is_empty() {
643                        let clock_rate = target_codec.clock_rate();
644
645                        let now = Instant::now();
646                        if let Some(last_time) = self.last_packet_time {
647                            let elapsed = now.duration_since(last_time);
648                            if elapsed.as_millis() > 50 {
649                                let gap_increment =
650                                    (elapsed.as_millis() as u32 * clock_rate) / 1000;
651                                self.next_rtp_timestamp += gap_increment;
652                            }
653                        }
654
655                        self.last_packet_time = Some(now);
656
657                        let timestamp_increment = (samples.len() as u64 * clock_rate as u64
658                            / packet.sample_rate as u64
659                            / self.track_config.channels as u64)
660                            as u32;
661                        let rtp_timestamp = self.next_rtp_timestamp;
662                        self.next_rtp_timestamp += timestamp_increment;
663                        let sequence_number = self.next_rtp_sequence_number;
664                        self.next_rtp_sequence_number += 1;
665
666                        let frame = RtcAudioFrame {
667                            data: Bytes::from(encoded),
668                            clock_rate,
669                            payload_type: Some(payload_type),
670                            sequence_number: Some(sequence_number),
671                            rtp_timestamp,
672                        };
673                        source.send_audio(frame).await.ok();
674                    }
675                }
676                crate::media::Samples::RTP {
677                    payload,
678                    payload_type,
679                    sequence_number,
680                } => {
681                    let clock_rate = match *payload_type {
682                        0 | 8 | 9 | 18 => 8000,
683                        111 => 48000,
684                        _ => packet.sample_rate,
685                    };
686
687                    let now = Instant::now();
688                    if let Some(last_time) = self.last_packet_time {
689                        let elapsed = now.duration_since(last_time);
690                        if elapsed.as_millis() > 50 {
691                            let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
692                            self.next_rtp_timestamp += gap_increment;
693                        }
694                    }
695                    self.last_packet_time = Some(now);
696
697                    let increment = match *payload_type {
698                        0 | 8 | 18 => payload.len() as u32,
699                        9 => payload.len() as u32,
700                        111 => (clock_rate / 50) as u32,
701                        _ => (clock_rate / 50) as u32,
702                    };
703
704                    let rtp_timestamp = self.next_rtp_timestamp;
705                    self.next_rtp_timestamp += increment;
706                    let sequence_number = *sequence_number;
707
708                    let frame = RtcAudioFrame {
709                        data: Bytes::from(payload.clone()),
710                        clock_rate,
711                        payload_type: Some(*payload_type),
712                        sequence_number: Some(sequence_number),
713                        rtp_timestamp,
714                    };
715                    source.send_audio(frame).await.ok();
716                }
717                _ => {}
718            }
719        }
720        Ok(())
721    }
722}
723
724impl RtcTrack {
725    fn get_payload_type(&self) -> u8 {
726        let pt = self.payload_type;
727        if pt != 0 {
728            return pt;
729        }
730
731        self.rtc_config.payload_type.unwrap_or_else(|| {
732            match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
733                CodecType::PCMU => 0,
734                CodecType::PCMA => 8,
735                CodecType::Opus => 111,
736                CodecType::G722 => 9,
737                _ => 111,
738            }
739        })
740    }
741}
742
743#[cfg(test)]
744mod tests {
745    use super::*;
746    use crate::media::track::TrackConfig;
747
748    #[test]
749    fn test_parse_sdp_payload_types() {
750        let track_id = "test-track".to_string();
751        let cancel_token = CancellationToken::new();
752        let mut track = RtcTrack::new(
753            cancel_token,
754            track_id,
755            TrackConfig::default(),
756            RtcTrackConfig::default(),
757        );
758
759        // Case 1: Multiple audio codecs, telephone-event at the end. Primary should be PCMA (8)
760        let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
761        track
762            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
763            .expect("parse offer");
764        assert_eq!(track.get_payload_type(), 8);
765
766        // Case 2: telephone-event at the beginning, should skip it and pick PCMU (0)
767        let mut rtc_config = RtcTrackConfig::default();
768        rtc_config.preferred_codec = Some(CodecType::PCMU);
769        let mut track2 = RtcTrack::new(
770            CancellationToken::new(),
771            "test-track-2".to_string(),
772            TrackConfig::default(),
773            rtc_config,
774        );
775
776        let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
777        track2
778            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
779            .expect("parse offer");
780        assert_eq!(track2.get_payload_type(), 0);
781
782        // Case 3: Opus with dynamic payload type 111
783        let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
784        track
785            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
786            .expect("parse offer");
787        assert_eq!(track.get_payload_type(), 111);
788    }
789}