Skip to main content

active_call/media/track/
rtc.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::{
6        processor::ProcessorChain,
7        track::{Track, TrackConfig, TrackId, TrackPacketSender},
8    },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16    AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17    PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18    config::MediaCapabilities,
19    media::{
20        MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21        track::SampleStreamTrack,
22    },
23};
24use std::{
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34    pub mode: TransportMode,
35    pub ice_servers: Option<Vec<IceServer>>,
36    pub external_ip: Option<String>,
37    pub rtp_port_range: Option<(u16, u16)>,
38    pub bind_ip: Option<String>,
39    pub preferred_codec: Option<CodecType>,
40    pub codecs: Vec<CodecType>,
41    pub payload_type: Option<u8>,
42    pub enable_latching: Option<bool>,
43}
44
45impl Default for RtcTrackConfig {
46    fn default() -> Self {
47        Self {
48            mode: TransportMode::WebRtc, // Default WebRTC behavior
49            ice_servers: None,
50            external_ip: None,
51            rtp_port_range: None,
52            bind_ip: None,
53            preferred_codec: None,
54            codecs: Vec::new(),
55            payload_type: None,
56            enable_latching: None,
57        }
58    }
59}
60
61pub struct RtcTrack {
62    track_id: TrackId,
63    track_config: TrackConfig,
64    rtc_config: RtcTrackConfig,
65    processor_chain: ProcessorChain,
66    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
67    cancel_token: CancellationToken,
68    local_source: Option<Arc<SampleStreamSource>>,
69    encoder: TrackCodec,
70    ssrc: u32,
71    payload_type: Option<u8>,
72    pub peer_connection: Option<Arc<PeerConnection>>,
73    next_rtp_timestamp: u32,
74    next_rtp_sequence_number: u16,
75    last_packet_time: Option<Instant>,
76    last_remote_sdp: Option<String>,
77    need_marker: bool,
78}
79
80impl RtcTrack {
81    pub fn new(
82        cancel_token: CancellationToken,
83        id: TrackId,
84        track_config: TrackConfig,
85        rtc_config: RtcTrackConfig,
86    ) -> Self {
87        let processor_chain = ProcessorChain::new(track_config.samplerate);
88        Self {
89            track_id: id,
90            track_config,
91            rtc_config,
92            processor_chain,
93            packet_sender: Arc::new(Mutex::new(None)),
94            cancel_token,
95            local_source: None,
96            encoder: TrackCodec::new(),
97            ssrc: 0,
98            payload_type: None,
99            peer_connection: None,
100            next_rtp_timestamp: 0,
101            next_rtp_sequence_number: 0,
102            last_packet_time: None,
103            last_remote_sdp: None,
104            need_marker: false,
105        }
106    }
107
108    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
109        self.ssrc = ssrc;
110        self
111    }
112
113    pub fn create_audio_track(
114        _codec: CodecType,
115        _stream_id: Option<String>,
116    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
117        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
118        (Arc::new(source), track)
119    }
120
121    pub async fn local_description(&self) -> Result<String> {
122        let pc = self
123            .peer_connection
124            .as_ref()
125            .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
126        let offer = pc.create_offer().await?;
127        pc.set_local_description(offer.clone())?;
128        Ok(offer.to_sdp_string())
129    }
130
131    pub async fn create(&mut self) -> Result<()> {
132        if self.peer_connection.is_some() {
133            return Ok(());
134        }
135
136        let mut config = RtcConfiguration::default();
137        if self.ssrc != 0 {
138            config.ssrc_start = self.ssrc;
139        }
140        config.transport_mode = self.rtc_config.mode.clone();
141
142        if let Some(ice_servers) = &self.rtc_config.ice_servers {
143            config.ice_servers = ice_servers.clone();
144        }
145
146        if let Some(external_ip) = &self.rtc_config.external_ip {
147            config.external_ip = Some(external_ip.clone());
148        }
149        if let Some(bind_ip) = &self.rtc_config.bind_ip {
150            config.bind_ip = Some(bind_ip.clone());
151        }
152        if let Some((rtp_start_port, rtp_end_port)) = self.rtc_config.rtp_port_range {
153            config.rtp_start_port = Some(rtp_start_port);
154            config.rtp_end_port = Some(rtp_end_port);
155        }
156
157        config.enable_latching = self
158            .rtc_config
159            .enable_latching
160            .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
161
162        if !self.rtc_config.codecs.is_empty() {
163            let mut caps = MediaCapabilities::default();
164            caps.audio.clear();
165
166            for codec in &self.rtc_config.codecs {
167                let cap = match codec {
168                    CodecType::PCMU => AudioCapability::pcmu(),
169                    CodecType::PCMA => AudioCapability::pcma(),
170                    CodecType::G722 => AudioCapability::g722(),
171                    CodecType::G729 => AudioCapability::g729(),
172                    CodecType::TelephoneEvent => AudioCapability::telephone_event(),
173                    #[cfg(feature = "opus")]
174                    CodecType::Opus => AudioCapability::opus(),
175                };
176                caps.audio.push(cap);
177            }
178            config.media_capabilities = Some(caps);
179        }
180
181        let peer_connection = Arc::new(PeerConnection::new(config));
182        self.peer_connection = Some(peer_connection.clone());
183
184        let default_codec = CodecType::G722;
185        let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
186
187        let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
188        self.local_source = Some(source);
189
190        let payload_type = self
191            .rtc_config
192            .payload_type
193            .unwrap_or_else(|| codec.payload_type());
194
195        self.payload_type = Some(payload_type);
196
197        let params = RtpCodecParameters {
198            clock_rate: codec.clock_rate(),
199            channels: codec.channels() as u8,
200            payload_type,
201            ..Default::default()
202        };
203
204        peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
205
206        // Spawn Handler Logic
207        self.spawn_handlers(
208            peer_connection.clone(),
209            self.track_id.clone(),
210            self.processor_chain.clone(),
211            payload_type,
212        );
213
214        Ok(())
215    }
216
217    fn spawn_handlers(
218        &self,
219        pc: Arc<PeerConnection>,
220        track_id: TrackId,
221        processor_chain: ProcessorChain,
222        default_payload_type: u8,
223    ) {
224        let cancel_token = self.cancel_token.clone();
225        let packet_sender = self.packet_sender.clone();
226        let pc_event = pc.clone();
227        let pc_stats = pc.clone();
228        let pc_state = pc.clone();
229        let track_id_log = track_id.clone();
230        let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
231
232        crate::spawn(async move {
233            info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
234
235            let mut events = futures::stream::unfold(pc_event, |pc| async move {
236                pc.recv().await.map(|ev| (ev, pc))
237            })
238            .boxed();
239
240            let mut state_rx = if is_webrtc {
241                Some(pc_state.subscribe_peer_state())
242            } else {
243                None
244            };
245
246            let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
247            let mut event_count = 0;
248            let mut workers = FuturesUnordered::new();
249
250            loop {
251                tokio::select! {
252                    _ = cancel_token.cancelled() => {
253                        debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
254                        break;
255                    }
256
257                    Some(event) = events.next() => {
258                        event_count += 1;
259                        let event_type = match &event {
260                            PeerConnectionEvent::Track(_) => "Track",
261                            PeerConnectionEvent::DataChannel(_) => "DataChannel",
262                        };
263                        debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
264
265                        if let PeerConnectionEvent::Track(transceiver) = event {
266                            if let Some(receiver) = transceiver.receiver() {
267                                let track = receiver.track();
268                                info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
269
270                                let (f1, f2) = Self::create_track_workers(
271                                    track,
272                                    packet_sender.clone(),
273                                    track_id_log.clone(),
274                                    processor_chain.clone(),
275                                    default_payload_type,
276                                );
277                                workers.push(f1);
278                                workers.push(f2);
279                            }
280                        }
281                    }
282
283                    _ = workers.next(), if !workers.is_empty() => {}
284
285                    _ = stats_interval.tick() => {
286                        match pc_stats.get_stats().await {
287                            Ok(stats) => {
288                                info!(track_id=%track_id_log, %stats, "RTCP Stats");
289                            }
290                            Err(e) => {
291                                debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
292                            }
293                        }
294                    }
295
296                    // Handle State Changes (WebRTC Only)
297                    res = async {
298                        if let Some(rx) = state_rx.as_mut() {
299                            rx.changed().await
300                        } else {
301                            std::future::pending().await
302                        }
303                    } => {
304                        if res.is_ok() {
305                            if let Some(rx) = state_rx.as_ref() {
306                                let s = *rx.borrow();
307                                debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
308                                match s {
309                                    PeerConnectionState::Disconnected
310                                    | PeerConnectionState::Closed
311                                    | PeerConnectionState::Failed => {
312                                        info!(
313                                            track_id = %track_id_log,
314                                            "peer connection is {:?}, try to close", s
315                                        );
316                                        cancel_token.cancel();
317                                        pc_state.close();
318                                        break;
319                                    }
320                                    _ => {}
321                                }
322                            }
323                        }
324                    }
325                }
326            }
327            debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
328        });
329    }
330
331    fn create_track_workers(
332        track: Arc<SampleStreamTrack>,
333        packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
334        track_id: TrackId,
335        processor_chain: ProcessorChain,
336        default_payload_type: u8,
337    ) -> (
338        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
339        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
340    ) {
341        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
342
343        // Processing Worker
344        let track_id_proc = track_id.clone();
345        let packet_sender_proc = packet_sender_arc.clone();
346        let processor_chain_proc = processor_chain.clone();
347        let proc_fut = Self::run_processing_worker(
348            rx,
349            track_id_proc,
350            packet_sender_proc,
351            processor_chain_proc,
352            default_payload_type,
353        );
354
355        // Receiving Worker
356        let track_id_recv = track_id.clone();
357        let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
358
359        (proc_fut.boxed(), recv_fut.boxed())
360    }
361
362    async fn run_processing_worker(
363        mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
364        track_id: TrackId,
365        packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
366        mut processor_chain: ProcessorChain,
367        default_payload_type: u8,
368    ) {
369        info!(track_id=%track_id, "RtcTrack processing worker started");
370        while let Some(frame) = rx.recv().await {
371            let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
372                frame,
373                &track_id,
374                &packet_sender,
375                &mut processor_chain,
376                default_payload_type,
377            ))
378            .catch_unwind()
379            .await;
380
381            if let Err(cause) = res {
382                let msg = if let Some(s) = cause.downcast_ref::<&str>() {
383                    *s
384                } else if let Some(s) = cause.downcast_ref::<String>() {
385                    &s[..]
386                } else {
387                    "Unknown panic"
388                };
389                tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
390                break;
391            }
392        }
393        info!(track_id=%track_id, "RtcTrack processing worker stopped");
394    }
395
396    async fn run_receiving_worker(
397        track: Arc<SampleStreamTrack>,
398        tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
399        track_id: TrackId,
400    ) {
401        let mut samples =
402            futures::stream::unfold(
403                track,
404                |t| async move { t.recv().await.ok().map(|s| (s, t)) },
405            )
406            .boxed();
407
408        while let Some(sample) = samples.next().await {
409            if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
410                if let Err(_) = tx.send(frame) {
411                    break;
412                }
413            } else {
414                debug!(track_id=%track_id, "Received non-audio sample");
415            }
416        }
417        info!(track_id=%track_id, "RtcTrack receiving worker stopped");
418    }
419
420    async fn process_audio_frame(
421        frame: rustrtc::media::frame::AudioFrame,
422        track_id: &TrackId,
423        packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
424        processor_chain: &mut ProcessorChain,
425        default_payload_type: u8,
426    ) {
427        let packet_sender = packet_sender.lock().await;
428        if let Some(sender) = packet_sender.as_ref() {
429            let payload_type = frame.payload_type.unwrap_or(default_payload_type);
430            let src_codec = match CodecType::try_from(payload_type) {
431                Ok(c) => c,
432                Err(_) => {
433                    debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
434                    return;
435                }
436            };
437
438            let mut af = AudioFrame {
439                track_id: track_id.clone(),
440                samples: crate::media::Samples::RTP {
441                    payload_type,
442                    payload: frame.data.to_vec(),
443                    sequence_number: frame.sequence_number.unwrap_or(0),
444                },
445                timestamp: crate::media::get_timestamp(),
446                sample_rate: src_codec.samplerate(),
447                channels: src_codec.channels(),
448                ..Default::default()
449            };
450            if let Err(e) = processor_chain.process_frame(&mut af) {
451                debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
452            }
453
454            sender.send(af).ok();
455        }
456    }
457
458    pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
459        use crate::media::negotiate::parse_rtpmap;
460        let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
461
462        if let Some(media) = sdp
463            .media_sections
464            .iter()
465            .find(|m| m.kind == MediaKind::Audio)
466        {
467            for attr in &media.attributes {
468                if attr.key == "rtpmap" {
469                    if let Some(value) = &attr.value {
470                        if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
471                            self.encoder.set_payload_type(pt, codec.clone());
472                            self.processor_chain.codec.set_payload_type(pt, codec);
473                        }
474                    }
475                }
476            }
477
478            // Negotiate primary audio codec
479            let mut negotiated = None;
480
481            // If we are the offerer (receiving an Answer), we prioritize our own preferred codec order
482            // that is also present in the answer.
483            if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
484                for preferred_codec in &self.rtc_config.codecs {
485                    if *preferred_codec == CodecType::TelephoneEvent {
486                        continue;
487                    }
488                    for fmt in &media.formats {
489                        if let Ok(pt) = fmt.parse::<u8>() {
490                            let codec = self
491                                .encoder
492                                .payload_type_map
493                                .get(&pt)
494                                .cloned()
495                                .or_else(|| CodecType::try_from(pt).ok());
496                            if let Some(c) = codec {
497                                if c == *preferred_codec {
498                                    negotiated = Some((pt, c));
499                                    break;
500                                }
501                            }
502                        }
503                    }
504                    if negotiated.is_some() {
505                        break;
506                    }
507                }
508            }
509
510            // Fallback: use the first codec in the SDP (matches offerer's preference if we are answerer)
511            if negotiated.is_none() {
512                for fmt in &media.formats {
513                    if let Ok(pt) = fmt.parse::<u8>() {
514                        let codec = self
515                            .encoder
516                            .payload_type_map
517                            .get(&pt)
518                            .cloned()
519                            .or_else(|| CodecType::try_from(pt).ok());
520
521                        if let Some(codec) = codec {
522                            if codec != CodecType::TelephoneEvent {
523                                negotiated = Some((pt, codec));
524                                break;
525                            }
526                        }
527                    }
528                }
529            }
530
531            if let Some((pt, codec)) = negotiated {
532                info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
533                self.payload_type = Some(pt);
534            }
535        }
536        Ok(())
537    }
538
539    fn normalize_sdp(sdp: &str) -> String {
540        sdp.lines()
541            .map(|line| {
542                if line.starts_with("o=") {
543                    let parts: Vec<&str> = line.split_whitespace().collect();
544                    if parts.len() >= 3 {
545                        return format!("o= {} {}", parts[1], parts[2]);
546                    }
547                }
548                line.to_string()
549            })
550            .filter(|line| {
551                !line.starts_with("t=") &&  // timing line can vary
552                !line.starts_with("a=ssrc:") &&  // SSRC attributes (but SSRC change shows in o= version)
553                !line.starts_with("a=msid:") &&  // media stream ID
554                !line.trim().is_empty()
555            })
556            .collect::<Vec<_>>()
557            .join("\n")
558    }
559
560    async fn update_remote_description_internal(
561        &mut self,
562        answer: &String,
563        force_update: bool,
564    ) -> Result<()> {
565        info!(
566            track_id=%self.track_id,
567            "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
568            force_update,
569            self.last_remote_sdp.is_some(),
570            self.rtc_config.mode
571        );
572
573        if let Some(pc) = &self.peer_connection {
574            if !force_update {
575                if let Some(ref last_sdp) = self.last_remote_sdp {
576                    if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
577                        debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
578                        return Ok(());
579                    }
580                }
581            } else {
582                debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
583            }
584
585            let _is_first_remote_sdp = self.last_remote_sdp.is_none();
586
587            let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
588            match pc.set_remote_description(sdp_obj.clone()).await {
589                Ok(_) => {
590                    debug!(track_id=%self.track_id, "set_remote_description succeeded");
591                    self.last_remote_sdp = Some(answer.clone());
592                }
593                Err(e) => {
594                    if self.rtc_config.mode == TransportMode::Rtp {
595                        info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
596
597                        if let Some(current_local) = pc.local_description() {
598                            let sdp = current_local.to_sdp_string();
599                            for line in sdp.lines() {
600                                if line.starts_with("a=ssrc:") {
601                                    info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
602                                }
603                            }
604                        }
605
606                        let offer = pc.create_offer().await?;
607
608                        let sdp = offer.to_sdp_string();
609                        for line in sdp.lines() {
610                            if line.starts_with("a=ssrc:") {
611                                info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
612                            }
613                        }
614
615                        pc.set_local_description(offer)?;
616                        pc.set_remote_description(sdp_obj).await?;
617                        self.last_remote_sdp = Some(answer.clone());
618                        info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
619                    } else {
620                        return Err(e.into());
621                    }
622                }
623            }
624
625            // Track events will be handled by the event loop after SSRC latching
626
627            // Extract negotiated payload types from SDP string
628            self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
629        }
630        Ok(())
631    }
632}
633
634#[async_trait]
635impl Track for RtcTrack {
636    fn ssrc(&self) -> u32 {
637        self.ssrc
638    }
639    fn id(&self) -> &TrackId {
640        &self.track_id
641    }
642    fn config(&self) -> &TrackConfig {
643        &self.track_config
644    }
645    fn processor_chain(&mut self) -> &mut ProcessorChain {
646        &mut self.processor_chain
647    }
648
649    async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
650        info!(track_id=%self.track_id, "rtc handshake start");
651        self.create().await?;
652
653        let pc = self.peer_connection.clone().ok_or_else(|| {
654            anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
655        })?;
656
657        debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
658        for (i, t) in pc.get_transceivers().iter().enumerate() {
659            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
660                i, t.kind(), t.mid(), t.direction());
661        }
662
663        let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
664        pc.set_remote_description(sdp.clone()).await?;
665
666        debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
667        for (i, t) in pc.get_transceivers().iter().enumerate() {
668            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
669                i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
670        }
671
672        // For RTP mode: Wait for PeerConnectionEvent::Track after SSRC latching completes
673        // For WebRTC mode: The event loop will handle Track events
674        info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
675
676        self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
677
678        let mut answer = pc.create_answer().await?;
679        crate::media::negotiate::intersect_answer(&sdp, &mut answer);
680
681        pc.set_local_description(answer.clone())?;
682
683        if self.rtc_config.mode != TransportMode::Rtp {
684            pc.wait_for_gathering_complete().await;
685        }
686
687        let final_answer = pc
688            .local_description()
689            .ok_or(anyhow::anyhow!("No local description"))?;
690
691        Ok(final_answer.to_sdp_string())
692    }
693
694    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
695        self.update_remote_description_internal(answer, false).await
696    }
697
698    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
699        self.update_remote_description_internal(answer, true).await
700    }
701
702    async fn start(
703        &mut self,
704        event_sender: EventSender,
705        packet_sender: TrackPacketSender,
706    ) -> Result<()> {
707        *self.packet_sender.lock().await = Some(packet_sender.clone());
708        let token_clone = self.cancel_token.clone();
709        let event_sender_clone = event_sender.clone();
710        let track_id = self.track_id.clone();
711        let ssrc = self.ssrc;
712
713        if self.rtc_config.mode != TransportMode::Rtp {
714            let start_time = crate::media::get_timestamp();
715            crate::spawn(async move {
716                token_clone.cancelled().await;
717                let _ = event_sender_clone.send(SessionEvent::TrackEnd {
718                    track_id,
719                    timestamp: crate::media::get_timestamp(),
720                    duration: crate::media::get_timestamp() - start_time,
721                    ssrc,
722                    play_id: None,
723                });
724            });
725        }
726
727        Ok(())
728    }
729
730    async fn stop(&self) -> Result<()> {
731        self.cancel_token.cancel();
732        if let Some(pc) = &self.peer_connection {
733            pc.close();
734        }
735        Ok(())
736    }
737
738    async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
739        let packet = packet.clone();
740
741        if let Some(source) = &self.local_source {
742            match &packet.samples {
743                crate::media::Samples::PCM { samples } => {
744                    let payload_type = self.get_payload_type();
745                    let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
746                    let target_codec = CodecType::try_from(payload_type)?;
747                    if !encoded.is_empty() {
748                        let clock_rate = target_codec.clock_rate();
749
750                        let now = Instant::now();
751                        if let Some(last_time) = self.last_packet_time {
752                            let elapsed = now.duration_since(last_time);
753                            if elapsed.as_millis() > 50 {
754                                let gap_increment =
755                                    (elapsed.as_millis() as u32 * clock_rate) / 1000;
756                                self.next_rtp_timestamp += gap_increment;
757                                self.need_marker = true;
758                            }
759                        }
760
761                        self.last_packet_time = Some(now);
762
763                        let timestamp_increment = (samples.len() as u64 * clock_rate as u64
764                            / packet.sample_rate as u64
765                            / self.track_config.channels as u64)
766                            as u32;
767                        let rtp_timestamp = self.next_rtp_timestamp;
768                        self.next_rtp_timestamp += timestamp_increment;
769                        let sequence_number = self.next_rtp_sequence_number;
770                        self.next_rtp_sequence_number += 1;
771
772                        let mut marker = false;
773                        if self.need_marker {
774                            marker = true;
775                            self.need_marker = false;
776                        }
777
778                        let frame = RtcAudioFrame {
779                            data: Bytes::from(encoded),
780                            clock_rate,
781                            payload_type: Some(payload_type),
782                            sequence_number: Some(sequence_number),
783                            rtp_timestamp,
784                            marker,
785                            ..Default::default()
786                        };
787                        source.send_audio(frame).await.ok();
788                    }
789                }
790                crate::media::Samples::RTP {
791                    payload,
792                    payload_type,
793                    sequence_number,
794                } => {
795                    let clock_rate = match *payload_type {
796                        0 | 8 | 9 | 18 => 8000,
797                        111 => 48000,
798                        _ => packet.sample_rate,
799                    };
800
801                    let now = Instant::now();
802                    if let Some(last_time) = self.last_packet_time {
803                        let elapsed = now.duration_since(last_time);
804                        if elapsed.as_millis() > 50 {
805                            let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
806                            self.next_rtp_timestamp += gap_increment;
807                            self.need_marker = true;
808                        }
809                    }
810                    self.last_packet_time = Some(now);
811
812                    let increment = match *payload_type {
813                        0 | 8 | 18 => payload.len() as u32,
814                        9 => payload.len() as u32,
815                        111 => (clock_rate / 50) as u32,
816                        _ => (clock_rate / 50) as u32,
817                    };
818
819                    let rtp_timestamp = self.next_rtp_timestamp;
820                    self.next_rtp_timestamp += increment;
821                    let sequence_number = *sequence_number;
822
823                    let mut marker = false;
824                    if self.need_marker {
825                        marker = true;
826                        self.need_marker = false;
827                    }
828
829                    let frame = RtcAudioFrame {
830                        data: Bytes::from(payload.clone()),
831                        clock_rate,
832                        payload_type: Some(*payload_type),
833                        sequence_number: Some(sequence_number),
834                        rtp_timestamp,
835                        marker,
836                        ..Default::default()
837                    };
838                    source.send_audio(frame).await.ok();
839                }
840                _ => {}
841            }
842        }
843        Ok(())
844    }
845}
846
847impl RtcTrack {
848    fn get_payload_type(&self) -> u8 {
849        if let Some(pt) = self.payload_type {
850            return pt;
851        }
852
853        self.rtc_config.payload_type.unwrap_or_else(|| {
854            match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
855                CodecType::PCMU => 0,
856                CodecType::PCMA => 8,
857                CodecType::Opus => 111,
858                CodecType::G722 => 9,
859                _ => 111,
860            }
861        })
862    }
863}
864
865#[cfg(test)]
866mod tests {
867    use super::*;
868    use crate::media::track::TrackConfig;
869
870    #[test]
871    fn test_parse_sdp_payload_types() {
872        let track_id = "test-track".to_string();
873        let cancel_token = CancellationToken::new();
874        let mut track = RtcTrack::new(
875            cancel_token,
876            track_id,
877            TrackConfig::default(),
878            RtcTrackConfig::default(),
879        );
880
881        // Case 1: Multiple audio codecs, telephone-event at the end. Primary should be PCMA (8)
882        let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
883        track
884            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
885            .expect("parse offer");
886        assert_eq!(track.get_payload_type(), 8);
887
888        // Case 2: telephone-event at the beginning, should skip it and pick PCMU (0)
889        let mut rtc_config = RtcTrackConfig::default();
890        rtc_config.preferred_codec = Some(CodecType::PCMU);
891        let mut track2 = RtcTrack::new(
892            CancellationToken::new(),
893            "test-track-2".to_string(),
894            TrackConfig::default(),
895            rtc_config,
896        );
897
898        let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
899        track2
900            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
901            .expect("parse offer");
902        assert_eq!(track2.get_payload_type(), 0);
903
904        // Case 3: Opus with dynamic payload type 111
905        let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
906        track
907            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
908            .expect("parse offer");
909        assert_eq!(track.get_payload_type(), 111);
910    }
911
912    #[tokio::test]
913    async fn test_rtp_mode_handshake_spawns_handler() {
914        use rustrtc::TransportMode;
915
916        let track_id = "test-track-sip".to_string();
917        let cancel = CancellationToken::new();
918        let track_config = TrackConfig::default();
919        let mut rtc_config = RtcTrackConfig::default();
920        rtc_config.mode = TransportMode::Rtp;
921
922        let mut track = RtcTrack::new(cancel, track_id, track_config, rtc_config);
923
924        // Standard SIP/SDP offer
925        let offer = "v=0\r\n\
926o=- 123456 123456 IN IP4 172.0.0.1\r\n\
927s=-\r\n\
928c=IN IP4 172.0.0.1\r\n\
929t=0 0\r\n\
930m=audio 10000 RTP/AVP 0 101\r\n\
931a=rtpmap:0 PCMU/8000\r\n\
932a=rtpmap:101 telephone-event/8000\r\n\
933a=sendrecv\r\n";
934
935        // This should not panic and should set up the transceiver
936        let res = track.handshake(offer.to_string(), None).await;
937        assert!(res.is_ok());
938
939        // We can inspect the PeerConnection to ensure it has a transceiver with a receiver
940        if let Some(pc) = &track.peer_connection {
941            let transceivers = pc.get_transceivers();
942            // With the fix, we expect the logic to have iterated these transceivers.
943            // In RTP/Receive mode, we should have 1 transceiver with a receiver.
944            assert_eq!(transceivers.len(), 1);
945            assert!(transceivers[0].receiver().is_some());
946        } else {
947            panic!("PeerConnection not initialized");
948        }
949    }
950}