Skip to main content

active_call/media/track/
rtc.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::{
6        processor::ProcessorChain,
7        track::{Track, TrackConfig, TrackId, TrackPacketSender},
8    },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16    AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17    PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18    config::MediaCapabilities,
19    media::{
20        MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21        track::SampleStreamTrack,
22    },
23};
24use std::{
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34    pub mode: TransportMode,
35    pub ice_servers: Option<Vec<IceServer>>,
36    pub external_ip: Option<String>,
37    pub rtp_port_range: Option<(u16, u16)>,
38    pub bind_ip: Option<String>,
39    pub preferred_codec: Option<CodecType>,
40    pub codecs: Vec<CodecType>,
41    pub payload_type: Option<u8>,
42    pub enable_latching: Option<bool>,
43}
44
45impl Default for RtcTrackConfig {
46    fn default() -> Self {
47        Self {
48            mode: TransportMode::WebRtc, // Default WebRTC behavior
49            ice_servers: None,
50            external_ip: None,
51            rtp_port_range: None,
52            bind_ip: None,
53            preferred_codec: None,
54            codecs: Vec::new(),
55            payload_type: None,
56            enable_latching: None,
57        }
58    }
59}
60
61pub struct RtcTrack {
62    track_id: TrackId,
63    track_config: TrackConfig,
64    rtc_config: RtcTrackConfig,
65    processor_chain: ProcessorChain,
66    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
67    cancel_token: CancellationToken,
68    local_source: Option<Arc<SampleStreamSource>>,
69    encoder: TrackCodec,
70    ssrc: u32,
71    payload_type: Option<u8>,
72    pub peer_connection: Option<Arc<PeerConnection>>,
73    next_rtp_timestamp: u32,
74    next_rtp_sequence_number: u16,
75    last_packet_time: Option<Instant>,
76    last_remote_sdp: Option<String>,
77    need_marker: bool,
78}
79
80impl RtcTrack {
81    pub fn new(
82        cancel_token: CancellationToken,
83        id: TrackId,
84        track_config: TrackConfig,
85        rtc_config: RtcTrackConfig,
86    ) -> Self {
87        let processor_chain = ProcessorChain::new(track_config.samplerate);
88        Self {
89            track_id: id,
90            track_config,
91            rtc_config,
92            processor_chain,
93            packet_sender: Arc::new(Mutex::new(None)),
94            cancel_token,
95            local_source: None,
96            encoder: TrackCodec::new(),
97            ssrc: 0,
98            payload_type: None,
99            peer_connection: None,
100            next_rtp_timestamp: 0,
101            next_rtp_sequence_number: 0,
102            last_packet_time: None,
103            last_remote_sdp: None,
104            need_marker: false,
105        }
106    }
107
108    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
109        self.ssrc = ssrc;
110        self
111    }
112
113    pub fn create_audio_track(
114        _codec: CodecType,
115        _stream_id: Option<String>,
116    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
117        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
118        (Arc::new(source), track)
119    }
120
121    pub async fn local_description(&self) -> Result<String> {
122        let pc = self
123            .peer_connection
124            .as_ref()
125            .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
126        let offer = pc.create_offer().await?;
127        pc.set_local_description(offer.clone())?;
128        Ok(offer.to_sdp_string())
129    }
130
131    pub async fn create(&mut self) -> Result<()> {
132        if self.peer_connection.is_some() {
133            return Ok(());
134        }
135
136        let mut config = RtcConfiguration::default();
137        if self.ssrc != 0 {
138            config.ssrc_start = self.ssrc;
139        }
140        config.transport_mode = self.rtc_config.mode.clone();
141
142        if let Some(ice_servers) = &self.rtc_config.ice_servers {
143            config.ice_servers = ice_servers.clone();
144        }
145
146        if let Some(external_ip) = &self.rtc_config.external_ip {
147            config.external_ip = Some(external_ip.clone());
148        }
149        if let Some(bind_ip) = &self.rtc_config.bind_ip {
150            config.bind_ip = Some(bind_ip.clone());
151        }
152        if let Some((rtp_start_port, rtp_end_port)) = self.rtc_config.rtp_port_range {
153            config.rtp_start_port = Some(rtp_start_port);
154            config.rtp_end_port = Some(rtp_end_port);
155        }
156
157        config.enable_latching = self
158            .rtc_config
159            .enable_latching
160            .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
161
162        if !self.rtc_config.codecs.is_empty() {
163            let mut caps = MediaCapabilities::default();
164            caps.audio.clear();
165
166            for codec in &self.rtc_config.codecs {
167                let cap = match codec {
168                    CodecType::PCMU => AudioCapability::pcmu(),
169                    CodecType::PCMA => AudioCapability::pcma(),
170                    CodecType::G722 => AudioCapability::g722(),
171                    CodecType::G729 => AudioCapability::g729(),
172                    CodecType::TelephoneEvent => AudioCapability::telephone_event(),
173                    #[cfg(feature = "opus")]
174                    CodecType::Opus => AudioCapability::opus(),
175                };
176                caps.audio.push(cap);
177            }
178            config.media_capabilities = Some(caps);
179        }
180
181        let peer_connection = Arc::new(PeerConnection::new(config));
182        self.peer_connection = Some(peer_connection.clone());
183
184        let default_codec = CodecType::G722;
185        let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
186
187        let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
188        self.local_source = Some(source);
189
190        let payload_type = self
191            .rtc_config
192            .payload_type
193            .unwrap_or_else(|| codec.payload_type());
194
195        self.payload_type = Some(payload_type);
196
197        let params = RtpCodecParameters {
198            clock_rate: codec.clock_rate(),
199            channels: codec.channels() as u8,
200            payload_type,
201            ..Default::default()
202        };
203
204        peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
205
206        // Spawn Handler Logic
207        self.spawn_handlers(
208            peer_connection.clone(),
209            self.track_id.clone(),
210            self.processor_chain.clone(),
211            payload_type,
212        );
213
214        Ok(())
215    }
216
217    fn spawn_handlers(
218        &self,
219        pc: Arc<PeerConnection>,
220        track_id: TrackId,
221        processor_chain: ProcessorChain,
222        default_payload_type: u8,
223    ) {
224        let cancel_token = self.cancel_token.clone();
225        let packet_sender = self.packet_sender.clone();
226        let pc_event = pc.clone();
227        let pc_stats = pc.clone();
228        let pc_state = pc.clone();
229        let track_id_log = track_id.clone();
230        let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
231
232        crate::spawn(async move {
233            info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
234
235            let mut events = futures::stream::unfold(pc_event, |pc| async move {
236                pc.recv().await.map(|ev| (ev, pc))
237            })
238            .boxed();
239
240            let mut state_rx = if is_webrtc {
241                Some(pc_state.subscribe_peer_state())
242            } else {
243                None
244            };
245
246            let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
247            let mut event_count = 0;
248            let mut workers = FuturesUnordered::new();
249
250            loop {
251                tokio::select! {
252                    _ = cancel_token.cancelled() => {
253                        debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
254                        break;
255                    }
256
257                    Some(event) = events.next() => {
258                        event_count += 1;
259                        let event_type = match &event {
260                            PeerConnectionEvent::Track(_) => "Track",
261                            PeerConnectionEvent::DataChannel(_) => "DataChannel",
262                        };
263                        debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
264
265                        if let PeerConnectionEvent::Track(transceiver) = event {
266                            if let Some(receiver) = transceiver.receiver() {
267                                let track = receiver.track();
268                                info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
269
270                                let (f1, f2) = Self::create_track_workers(
271                                    track,
272                                    packet_sender.clone(),
273                                    track_id_log.clone(),
274                                    processor_chain.clone(),
275                                    default_payload_type,
276                                );
277                                workers.push(f1);
278                                workers.push(f2);
279                            }
280                        }
281                    }
282
283                    _ = workers.next(), if !workers.is_empty() => {}
284
285                    _ = stats_interval.tick() => {
286                        match pc_stats.get_stats().await {
287                            Ok(stats) => {
288                                info!(track_id=%track_id_log, %stats, "RTCP Stats");
289                            }
290                            Err(e) => {
291                                debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
292                            }
293                        }
294                    }
295
296                    // Handle State Changes (WebRTC Only)
297                    res = async {
298                        if let Some(rx) = state_rx.as_mut() {
299                            rx.changed().await
300                        } else {
301                            std::future::pending().await
302                        }
303                    } => {
304                        if res.is_ok() {
305                            if let Some(rx) = state_rx.as_ref() {
306                                let s = *rx.borrow();
307                                debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
308                                match s {
309                                    PeerConnectionState::Disconnected
310                                    | PeerConnectionState::Closed
311                                    | PeerConnectionState::Failed => {
312                                        info!(
313                                            track_id = %track_id_log,
314                                            "peer connection is {:?}, try to close", s
315                                        );
316                                        cancel_token.cancel();
317                                        pc_state.close();
318                                        break;
319                                    }
320                                    _ => {}
321                                }
322                            }
323                        }
324                    }
325                }
326            }
327            debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
328        });
329    }
330
331    fn create_track_workers(
332        track: Arc<SampleStreamTrack>,
333        packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
334        track_id: TrackId,
335        processor_chain: ProcessorChain,
336        default_payload_type: u8,
337    ) -> (
338        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
339        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
340    ) {
341        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
342
343        // Processing Worker
344        let track_id_proc = track_id.clone();
345        let packet_sender_proc = packet_sender_arc.clone();
346        let processor_chain_proc = processor_chain.clone();
347        let proc_fut = Self::run_processing_worker(
348            rx,
349            track_id_proc,
350            packet_sender_proc,
351            processor_chain_proc,
352            default_payload_type,
353        );
354
355        // Receiving Worker
356        let track_id_recv = track_id.clone();
357        let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
358
359        (proc_fut.boxed(), recv_fut.boxed())
360    }
361
362    async fn run_processing_worker(
363        mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
364        track_id: TrackId,
365        packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
366        mut processor_chain: ProcessorChain,
367        default_payload_type: u8,
368    ) {
369        info!(track_id=%track_id, "RtcTrack processing worker started");
370        while let Some(frame) = rx.recv().await {
371            let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
372                frame,
373                &track_id,
374                &packet_sender,
375                &mut processor_chain,
376                default_payload_type,
377            ))
378            .catch_unwind()
379            .await;
380
381            if let Err(cause) = res {
382                let msg = if let Some(s) = cause.downcast_ref::<&str>() {
383                    *s
384                } else if let Some(s) = cause.downcast_ref::<String>() {
385                    &s[..]
386                } else {
387                    "Unknown panic"
388                };
389                tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
390                break;
391            }
392        }
393        info!(track_id=%track_id, "RtcTrack processing worker stopped");
394    }
395
396    async fn run_receiving_worker(
397        track: Arc<SampleStreamTrack>,
398        tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
399        track_id: TrackId,
400    ) {
401        let mut samples =
402            futures::stream::unfold(
403                track,
404                |t| async move { t.recv().await.ok().map(|s| (s, t)) },
405            )
406            .boxed();
407
408        while let Some(sample) = samples.next().await {
409            if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
410                if let Err(_) = tx.send(frame) {
411                    break;
412                }
413            } else {
414                debug!(track_id=%track_id, "Received non-audio sample");
415            }
416        }
417        info!(track_id=%track_id, "RtcTrack receiving worker stopped");
418    }
419
420    async fn process_audio_frame(
421        frame: rustrtc::media::frame::AudioFrame,
422        track_id: &TrackId,
423        packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
424        processor_chain: &mut ProcessorChain,
425        default_payload_type: u8,
426    ) {
427        let packet_sender = packet_sender.lock().await;
428        if let Some(sender) = packet_sender.as_ref() {
429            let payload_type = frame.payload_type.unwrap_or(default_payload_type);
430            let src_codec = match CodecType::try_from(payload_type) {
431                Ok(c) => c,
432                Err(_) => {
433                    debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
434                    return;
435                }
436            };
437
438            let mut af = AudioFrame {
439                track_id: track_id.clone(),
440                samples: crate::media::Samples::RTP {
441                    payload_type,
442                    payload: frame.data.to_vec(),
443                    sequence_number: frame.sequence_number.unwrap_or(0),
444                },
445                timestamp: crate::media::get_timestamp(),
446                sample_rate: src_codec.samplerate(),
447                channels: src_codec.channels(),
448            };
449            if let Err(e) = processor_chain.process_frame(&mut af) {
450                debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
451            }
452
453            sender.send(af).ok();
454        }
455    }
456
457    pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
458        use crate::media::negotiate::parse_rtpmap;
459        let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
460
461        if let Some(media) = sdp
462            .media_sections
463            .iter()
464            .find(|m| m.kind == MediaKind::Audio)
465        {
466            for attr in &media.attributes {
467                if attr.key == "rtpmap" {
468                    if let Some(value) = &attr.value {
469                        if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
470                            self.encoder.set_payload_type(pt, codec.clone());
471                            self.processor_chain.codec.set_payload_type(pt, codec);
472                        }
473                    }
474                }
475            }
476
477            // Negotiate primary audio codec
478            let mut negotiated = None;
479
480            // If we are the offerer (receiving an Answer), we prioritize our own preferred codec order
481            // that is also present in the answer.
482            if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
483                for preferred_codec in &self.rtc_config.codecs {
484                    if *preferred_codec == CodecType::TelephoneEvent {
485                        continue;
486                    }
487                    for fmt in &media.formats {
488                        if let Ok(pt) = fmt.parse::<u8>() {
489                            let codec = self
490                                .encoder
491                                .payload_type_map
492                                .get(&pt)
493                                .cloned()
494                                .or_else(|| CodecType::try_from(pt).ok());
495                            if let Some(c) = codec {
496                                if c == *preferred_codec {
497                                    negotiated = Some((pt, c));
498                                    break;
499                                }
500                            }
501                        }
502                    }
503                    if negotiated.is_some() {
504                        break;
505                    }
506                }
507            }
508
509            // Fallback: use the first codec in the SDP (matches offerer's preference if we are answerer)
510            if negotiated.is_none() {
511                for fmt in &media.formats {
512                    if let Ok(pt) = fmt.parse::<u8>() {
513                        let codec = self
514                            .encoder
515                            .payload_type_map
516                            .get(&pt)
517                            .cloned()
518                            .or_else(|| CodecType::try_from(pt).ok());
519
520                        if let Some(codec) = codec {
521                            if codec != CodecType::TelephoneEvent {
522                                negotiated = Some((pt, codec));
523                                break;
524                            }
525                        }
526                    }
527                }
528            }
529
530            if let Some((pt, codec)) = negotiated {
531                info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
532                self.payload_type = Some(pt);
533            }
534        }
535        Ok(())
536    }
537
538    fn normalize_sdp(sdp: &str) -> String {
539        sdp.lines()
540            .map(|line| {
541                if line.starts_with("o=") {
542                    let parts: Vec<&str> = line.split_whitespace().collect();
543                    if parts.len() >= 3 {
544                        return format!("o= {} {}", parts[1], parts[2]);
545                    }
546                }
547                line.to_string()
548            })
549            .filter(|line| {
550                !line.starts_with("t=") &&  // timing line can vary
551                !line.starts_with("a=ssrc:") &&  // SSRC attributes (but SSRC change shows in o= version)
552                !line.starts_with("a=msid:") &&  // media stream ID
553                !line.trim().is_empty()
554            })
555            .collect::<Vec<_>>()
556            .join("\n")
557    }
558
559    async fn update_remote_description_internal(
560        &mut self,
561        answer: &String,
562        force_update: bool,
563    ) -> Result<()> {
564        info!(
565            track_id=%self.track_id,
566            "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
567            force_update,
568            self.last_remote_sdp.is_some(),
569            self.rtc_config.mode
570        );
571
572        if let Some(pc) = &self.peer_connection {
573            if !force_update {
574                if let Some(ref last_sdp) = self.last_remote_sdp {
575                    if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
576                        debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
577                        return Ok(());
578                    }
579                }
580            } else {
581                debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
582            }
583
584            let _is_first_remote_sdp = self.last_remote_sdp.is_none();
585
586            let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
587            match pc.set_remote_description(sdp_obj.clone()).await {
588                Ok(_) => {
589                    debug!(track_id=%self.track_id, "set_remote_description succeeded");
590                    self.last_remote_sdp = Some(answer.clone());
591                }
592                Err(e) => {
593                    if self.rtc_config.mode == TransportMode::Rtp {
594                        info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
595
596                        if let Some(current_local) = pc.local_description() {
597                            let sdp = current_local.to_sdp_string();
598                            for line in sdp.lines() {
599                                if line.starts_with("a=ssrc:") {
600                                    info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
601                                }
602                            }
603                        }
604
605                        let offer = pc.create_offer().await?;
606
607                        let sdp = offer.to_sdp_string();
608                        for line in sdp.lines() {
609                            if line.starts_with("a=ssrc:") {
610                                info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
611                            }
612                        }
613
614                        pc.set_local_description(offer)?;
615                        pc.set_remote_description(sdp_obj).await?;
616                        self.last_remote_sdp = Some(answer.clone());
617                        info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
618                    } else {
619                        return Err(e.into());
620                    }
621                }
622            }
623
624            // Track events will be handled by the event loop after SSRC latching
625
626            // Extract negotiated payload types from SDP string
627            self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
628        }
629        Ok(())
630    }
631}
632
633#[async_trait]
634impl Track for RtcTrack {
635    fn ssrc(&self) -> u32 {
636        self.ssrc
637    }
638    fn id(&self) -> &TrackId {
639        &self.track_id
640    }
641    fn config(&self) -> &TrackConfig {
642        &self.track_config
643    }
644    fn processor_chain(&mut self) -> &mut ProcessorChain {
645        &mut self.processor_chain
646    }
647
648    async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
649        info!(track_id=%self.track_id, "rtc handshake start");
650        self.create().await?;
651
652        let pc = self.peer_connection.clone().ok_or_else(|| {
653            anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
654        })?;
655
656        debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
657        for (i, t) in pc.get_transceivers().iter().enumerate() {
658            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
659                i, t.kind(), t.mid(), t.direction());
660        }
661
662        let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
663        pc.set_remote_description(sdp.clone()).await?;
664
665        debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
666        for (i, t) in pc.get_transceivers().iter().enumerate() {
667            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
668                i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
669        }
670
671        // For RTP mode: Wait for PeerConnectionEvent::Track after SSRC latching completes
672        // For WebRTC mode: The event loop will handle Track events
673        info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
674
675        self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
676
677        let mut answer = pc.create_answer().await?;
678        crate::media::negotiate::intersect_answer(&sdp, &mut answer);
679
680        pc.set_local_description(answer.clone())?;
681
682        if self.rtc_config.mode != TransportMode::Rtp {
683            pc.wait_for_gathering_complete().await;
684        }
685
686        let final_answer = pc
687            .local_description()
688            .ok_or(anyhow::anyhow!("No local description"))?;
689
690        Ok(final_answer.to_sdp_string())
691    }
692
693    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
694        self.update_remote_description_internal(answer, false).await
695    }
696
697    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
698        self.update_remote_description_internal(answer, true).await
699    }
700
701    async fn start(
702        &mut self,
703        event_sender: EventSender,
704        packet_sender: TrackPacketSender,
705    ) -> Result<()> {
706        *self.packet_sender.lock().await = Some(packet_sender.clone());
707        let token_clone = self.cancel_token.clone();
708        let event_sender_clone = event_sender.clone();
709        let track_id = self.track_id.clone();
710        let ssrc = self.ssrc;
711
712        if self.rtc_config.mode != TransportMode::Rtp {
713            let start_time = crate::media::get_timestamp();
714            crate::spawn(async move {
715                token_clone.cancelled().await;
716                let _ = event_sender_clone.send(SessionEvent::TrackEnd {
717                    track_id,
718                    timestamp: crate::media::get_timestamp(),
719                    duration: crate::media::get_timestamp() - start_time,
720                    ssrc,
721                    play_id: None,
722                });
723            });
724        }
725
726        Ok(())
727    }
728
729    async fn stop(&self) -> Result<()> {
730        self.cancel_token.cancel();
731        if let Some(pc) = &self.peer_connection {
732            pc.close();
733        }
734        Ok(())
735    }
736
737    async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
738        let packet = packet.clone();
739
740        if let Some(source) = &self.local_source {
741            match &packet.samples {
742                crate::media::Samples::PCM { samples } => {
743                    let payload_type = self.get_payload_type();
744                    let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
745                    let target_codec = CodecType::try_from(payload_type)?;
746                    if !encoded.is_empty() {
747                        let clock_rate = target_codec.clock_rate();
748
749                        let now = Instant::now();
750                        if let Some(last_time) = self.last_packet_time {
751                            let elapsed = now.duration_since(last_time);
752                            if elapsed.as_millis() > 50 {
753                                let gap_increment =
754                                    (elapsed.as_millis() as u32 * clock_rate) / 1000;
755                                self.next_rtp_timestamp += gap_increment;
756                                self.need_marker = true;
757                            }
758                        }
759
760                        self.last_packet_time = Some(now);
761
762                        let timestamp_increment = (samples.len() as u64 * clock_rate as u64
763                            / packet.sample_rate as u64
764                            / self.track_config.channels as u64)
765                            as u32;
766                        let rtp_timestamp = self.next_rtp_timestamp;
767                        self.next_rtp_timestamp += timestamp_increment;
768                        let sequence_number = self.next_rtp_sequence_number;
769                        self.next_rtp_sequence_number += 1;
770
771                        let mut marker = false;
772                        if self.need_marker {
773                            marker = true;
774                            self.need_marker = false;
775                        }
776
777                        let frame = RtcAudioFrame {
778                            data: Bytes::from(encoded),
779                            clock_rate,
780                            payload_type: Some(payload_type),
781                            sequence_number: Some(sequence_number),
782                            rtp_timestamp,
783                            marker,
784                            ..Default::default()
785                        };
786                        source.send_audio(frame).await.ok();
787                    }
788                }
789                crate::media::Samples::RTP {
790                    payload,
791                    payload_type,
792                    sequence_number,
793                } => {
794                    let clock_rate = match *payload_type {
795                        0 | 8 | 9 | 18 => 8000,
796                        111 => 48000,
797                        _ => packet.sample_rate,
798                    };
799
800                    let now = Instant::now();
801                    if let Some(last_time) = self.last_packet_time {
802                        let elapsed = now.duration_since(last_time);
803                        if elapsed.as_millis() > 50 {
804                            let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
805                            self.next_rtp_timestamp += gap_increment;
806                            self.need_marker = true;
807                        }
808                    }
809                    self.last_packet_time = Some(now);
810
811                    let increment = match *payload_type {
812                        0 | 8 | 18 => payload.len() as u32,
813                        9 => payload.len() as u32,
814                        111 => (clock_rate / 50) as u32,
815                        _ => (clock_rate / 50) as u32,
816                    };
817
818                    let rtp_timestamp = self.next_rtp_timestamp;
819                    self.next_rtp_timestamp += increment;
820                    let sequence_number = *sequence_number;
821
822                    let mut marker = false;
823                    if self.need_marker {
824                        marker = true;
825                        self.need_marker = false;
826                    }
827
828                    let frame = RtcAudioFrame {
829                        data: Bytes::from(payload.clone()),
830                        clock_rate,
831                        payload_type: Some(*payload_type),
832                        sequence_number: Some(sequence_number),
833                        rtp_timestamp,
834                        marker,
835                        ..Default::default()
836                    };
837                    source.send_audio(frame).await.ok();
838                }
839                _ => {}
840            }
841        }
842        Ok(())
843    }
844}
845
846impl RtcTrack {
847    fn get_payload_type(&self) -> u8 {
848        if let Some(pt) = self.payload_type {
849            return pt;
850        }
851
852        self.rtc_config.payload_type.unwrap_or_else(|| {
853            match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
854                CodecType::PCMU => 0,
855                CodecType::PCMA => 8,
856                CodecType::Opus => 111,
857                CodecType::G722 => 9,
858                _ => 111,
859            }
860        })
861    }
862}
863
864#[cfg(test)]
865mod tests {
866    use super::*;
867    use crate::media::track::TrackConfig;
868
869    #[test]
870    fn test_parse_sdp_payload_types() {
871        let track_id = "test-track".to_string();
872        let cancel_token = CancellationToken::new();
873        let mut track = RtcTrack::new(
874            cancel_token,
875            track_id,
876            TrackConfig::default(),
877            RtcTrackConfig::default(),
878        );
879
880        // Case 1: Multiple audio codecs, telephone-event at the end. Primary should be PCMA (8)
881        let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
882        track
883            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp1)
884            .expect("parse offer");
885        assert_eq!(track.get_payload_type(), 8);
886
887        // Case 2: telephone-event at the beginning, should skip it and pick PCMU (0)
888        let mut rtc_config = RtcTrackConfig::default();
889        rtc_config.preferred_codec = Some(CodecType::PCMU);
890        let mut track2 = RtcTrack::new(
891            CancellationToken::new(),
892            "test-track-2".to_string(),
893            TrackConfig::default(),
894            rtc_config,
895        );
896
897        let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
898        track2
899            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp2)
900            .expect("parse offer");
901        assert_eq!(track2.get_payload_type(), 0);
902
903        // Case 3: Opus with dynamic payload type 111
904        let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
905        track
906            .parse_sdp_payload_types(rustrtc::SdpType::Offer, sdp3)
907            .expect("parse offer");
908        assert_eq!(track.get_payload_type(), 111);
909    }
910
911    #[tokio::test]
912    async fn test_rtp_mode_handshake_spawns_handler() {
913        use rustrtc::TransportMode;
914
915        let track_id = "test-track-sip".to_string();
916        let cancel = CancellationToken::new();
917        let track_config = TrackConfig::default();
918        let mut rtc_config = RtcTrackConfig::default();
919        rtc_config.mode = TransportMode::Rtp;
920
921        let mut track = RtcTrack::new(cancel, track_id, track_config, rtc_config);
922
923        // Standard SIP/SDP offer
924        let offer = "v=0\r\n\
925o=- 123456 123456 IN IP4 172.0.0.1\r\n\
926s=-\r\n\
927c=IN IP4 172.0.0.1\r\n\
928t=0 0\r\n\
929m=audio 10000 RTP/AVP 0 101\r\n\
930a=rtpmap:0 PCMU/8000\r\n\
931a=rtpmap:101 telephone-event/8000\r\n\
932a=sendrecv\r\n";
933
934        // This should not panic and should set up the transceiver
935        let res = track.handshake(offer.to_string(), None).await;
936        assert!(res.is_ok());
937
938        // We can inspect the PeerConnection to ensure it has a transceiver with a receiver
939        if let Some(pc) = &track.peer_connection {
940            let transceivers = pc.get_transceivers();
941            // With the fix, we expect the logic to have iterated these transceivers.
942            // In RTP/Receive mode, we should have 1 transceiver with a receiver.
943            assert_eq!(transceivers.len(), 1);
944            assert!(transceivers[0].receiver().is_some());
945        } else {
946            panic!("PeerConnection not initialized");
947        }
948    }
949}