Skip to main content

active_call/media/track/
rtc.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::{
6        processor::ProcessorChain,
7        track::{Track, TrackConfig, TrackId, TrackPacketSender},
8    },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16    AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17    PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18    config::MediaCapabilities,
19    media::{
20        MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21        track::SampleStreamTrack,
22    },
23};
24use std::{
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
/// Transport-level settings used when building the `PeerConnection` behind an [`RtcTrack`].
#[derive(Clone)]
pub struct RtcTrackConfig {
    /// Transport mode copied into `RtcConfiguration::transport_mode`.
    pub mode: TransportMode,
    /// ICE servers; when `Some`, they replace the configuration's default list.
    pub ice_servers: Option<Vec<IceServer>>,
    /// External IP forwarded to the `RtcConfiguration` when set.
    pub external_ip: Option<String>,
    /// RTP port range, presumably `(min, max)`; not consumed by the code visible
    /// in this file section — TODO confirm where it is applied.
    pub rtp_port_range: Option<(u16, u16)>,
    /// Codec used for the local audio track; `RtcTrack::create` falls back to G722 when `None`.
    pub preferred_codec: Option<CodecType>,
    /// Audio codecs to advertise; when non-empty they replace the default audio capabilities.
    pub codecs: Vec<CodecType>,
    /// Payload type override for the local track; defaults to the codec's own payload type.
    pub payload_type: Option<u8>,
    /// Latching override; when `None`, latching is enabled only for `TransportMode::Rtp`.
    pub enable_latching: Option<bool>,
}
43
44impl Default for RtcTrackConfig {
45    fn default() -> Self {
46        Self {
47            mode: TransportMode::WebRtc, // Default WebRTC behavior
48            ice_servers: None,
49            external_ip: None,
50            rtp_port_range: None,
51            preferred_codec: None,
52            codecs: Vec::new(),
53            payload_type: None,
54            enable_latching: None,
55        }
56    }
57}
58
/// Audio track backed by a `rustrtc` `PeerConnection`.
///
/// Inbound samples are pushed through the processor chain and forwarded to the
/// packet sender installed by `start`; outbound frames are written to `local_source`.
pub struct RtcTrack {
    /// Identifier of this track; also used as the stream id of the local media track.
    track_id: TrackId,
    /// Generic track configuration (sample rate, channels, ...).
    track_config: TrackConfig,
    /// Transport / codec configuration used when the peer connection is created.
    rtc_config: RtcTrackConfig,
    /// Processor chain; a clone is handed to the inbound processing worker.
    processor_chain: ProcessorChain,
    /// Destination for inbound frames; `None` until `start` installs a sender.
    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
    /// Cancels the background event loop and signals the end of the track.
    cancel_token: CancellationToken,
    /// Source side of the local sample track; set by `create`.
    local_source: Option<Arc<SampleStreamSource>>,
    /// Codec helper used for outbound encoding; its payload-type map is filled from SDP.
    encoder: TrackCodec,
    /// When non-zero, used as `ssrc_start` of the peer connection configuration.
    ssrc: u32,
    /// Primary audio payload type; set by `create` and updated by SDP negotiation.
    payload_type: Option<u8>,
    /// Underlying peer connection; `None` until `create` has run.
    pub peer_connection: Option<Arc<PeerConnection>>,
    /// RTP timestamp to stamp on the next outbound frame.
    next_rtp_timestamp: u32,
    /// RTP sequence number to stamp on the next outbound frame.
    next_rtp_sequence_number: u16,
    /// Time the last outbound frame was sent; used to detect gaps in the outbound stream.
    last_packet_time: Option<Instant>,
    /// Last remote SDP that was applied, kept to skip unchanged updates.
    last_remote_sdp: Option<String>,
    /// Set after a send gap so the next outbound frame carries the RTP marker bit.
    need_marker: bool,
}
77
78impl RtcTrack {
79    pub fn new(
80        cancel_token: CancellationToken,
81        id: TrackId,
82        track_config: TrackConfig,
83        rtc_config: RtcTrackConfig,
84    ) -> Self {
85        let processor_chain = ProcessorChain::new(track_config.samplerate);
86        Self {
87            track_id: id,
88            track_config,
89            rtc_config,
90            processor_chain,
91            packet_sender: Arc::new(Mutex::new(None)),
92            cancel_token,
93            local_source: None,
94            encoder: TrackCodec::new(),
95            ssrc: 0,
96            payload_type: None,
97            peer_connection: None,
98            next_rtp_timestamp: 0,
99            next_rtp_sequence_number: 0,
100            last_packet_time: None,
101            last_remote_sdp: None,
102            need_marker: false,
103        }
104    }
105
106    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
107        self.ssrc = ssrc;
108        self
109    }
110
111    pub fn create_audio_track(
112        _codec: CodecType,
113        _stream_id: Option<String>,
114    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
115        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
116        (Arc::new(source), track)
117    }
118
119    pub async fn local_description(&self) -> Result<String> {
120        let pc = self
121            .peer_connection
122            .as_ref()
123            .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
124        let offer = pc.create_offer().await?;
125        pc.set_local_description(offer.clone())?;
126        Ok(offer.to_sdp_string())
127    }
128
129    pub async fn create(&mut self) -> Result<()> {
130        if self.peer_connection.is_some() {
131            return Ok(());
132        }
133
134        let mut config = RtcConfiguration::default();
135        if self.ssrc != 0 {
136            config.ssrc_start = self.ssrc;
137        }
138        config.transport_mode = self.rtc_config.mode.clone();
139
140        if let Some(ice_servers) = &self.rtc_config.ice_servers {
141            config.ice_servers = ice_servers.clone();
142        }
143
144        if let Some(external_ip) = &self.rtc_config.external_ip {
145            config.external_ip = Some(external_ip.clone());
146        }
147
148        config.enable_latching = self
149            .rtc_config
150            .enable_latching
151            .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
152
153        if !self.rtc_config.codecs.is_empty() {
154            let mut caps = MediaCapabilities::default();
155            caps.audio.clear();
156
157            for codec in &self.rtc_config.codecs {
158                let cap = match codec {
159                    CodecType::PCMU => AudioCapability::pcmu(),
160                    CodecType::PCMA => AudioCapability::pcma(),
161                    CodecType::G722 => AudioCapability::g722(),
162                    CodecType::G729 => AudioCapability::g729(),
163                    CodecType::TelephoneEvent => AudioCapability::telephone_event(),
164                    #[cfg(feature = "opus")]
165                    CodecType::Opus => AudioCapability::opus(),
166                };
167                caps.audio.push(cap);
168            }
169            config.media_capabilities = Some(caps);
170        }
171
172        let peer_connection = Arc::new(PeerConnection::new(config));
173        self.peer_connection = Some(peer_connection.clone());
174
175        let default_codec = CodecType::G722;
176        let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
177
178        let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
179        self.local_source = Some(source);
180
181        let payload_type = self
182            .rtc_config
183            .payload_type
184            .unwrap_or_else(|| codec.payload_type());
185
186        self.payload_type = Some(payload_type);
187
188        let params = RtpCodecParameters {
189            clock_rate: codec.clock_rate(),
190            channels: codec.channels() as u8,
191            payload_type,
192            ..Default::default()
193        };
194
195        peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
196
197        // Spawn Handler Logic
198        self.spawn_handlers(
199            peer_connection.clone(),
200            self.track_id.clone(),
201            self.processor_chain.clone(),
202            payload_type,
203        );
204
205        Ok(())
206    }
207
208    fn spawn_handlers(
209        &self,
210        pc: Arc<PeerConnection>,
211        track_id: TrackId,
212        processor_chain: ProcessorChain,
213        default_payload_type: u8,
214    ) {
215        let cancel_token = self.cancel_token.clone();
216        let packet_sender = self.packet_sender.clone();
217        let pc_event = pc.clone();
218        let pc_stats = pc.clone();
219        let pc_state = pc.clone();
220        let track_id_log = track_id.clone();
221        let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
222
223        crate::spawn(async move {
224            info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
225
226            let mut events = futures::stream::unfold(pc_event, |pc| async move {
227                pc.recv().await.map(|ev| (ev, pc))
228            })
229            .boxed();
230
231            let mut state_rx = if is_webrtc {
232                Some(pc_state.subscribe_peer_state())
233            } else {
234                None
235            };
236
237            let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
238            let mut event_count = 0;
239            let mut workers = FuturesUnordered::new();
240
241            loop {
242                tokio::select! {
243                    _ = cancel_token.cancelled() => {
244                        debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
245                        break;
246                    }
247
248                    Some(event) = events.next() => {
249                        event_count += 1;
250                        let event_type = match &event {
251                            PeerConnectionEvent::Track(_) => "Track",
252                            PeerConnectionEvent::DataChannel(_) => "DataChannel",
253                        };
254                        debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
255
256                        if let PeerConnectionEvent::Track(transceiver) = event {
257                            if let Some(receiver) = transceiver.receiver() {
258                                let track = receiver.track();
259                                info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
260
261                                let (f1, f2) = Self::create_track_workers(
262                                    track,
263                                    packet_sender.clone(),
264                                    track_id_log.clone(),
265                                    processor_chain.clone(),
266                                    default_payload_type,
267                                );
268                                workers.push(f1);
269                                workers.push(f2);
270                            }
271                        }
272                    }
273
274                    _ = workers.next(), if !workers.is_empty() => {}
275
276                    _ = stats_interval.tick() => {
277                        match pc_stats.get_stats().await {
278                            Ok(stats) => {
279                                info!(track_id=%track_id_log, %stats, "RTCP Stats");
280                            }
281                            Err(e) => {
282                                debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
283                            }
284                        }
285                    }
286
287                    // Handle State Changes (WebRTC Only)
288                    res = async {
289                        if let Some(rx) = state_rx.as_mut() {
290                            rx.changed().await
291                        } else {
292                            std::future::pending().await
293                        }
294                    } => {
295                        if res.is_ok() {
296                            if let Some(rx) = state_rx.as_ref() {
297                                let s = *rx.borrow();
298                                debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
299                                match s {
300                                    PeerConnectionState::Disconnected
301                                    | PeerConnectionState::Closed
302                                    | PeerConnectionState::Failed => {
303                                        info!(
304                                            track_id = %track_id_log,
305                                            "peer connection is {:?}, try to close", s
306                                        );
307                                        cancel_token.cancel();
308                                        pc_state.close();
309                                        break;
310                                    }
311                                    _ => {}
312                                }
313                            }
314                        }
315                    }
316                }
317            }
318            debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
319        });
320    }
321
322    fn create_track_workers(
323        track: Arc<SampleStreamTrack>,
324        packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
325        track_id: TrackId,
326        processor_chain: ProcessorChain,
327        default_payload_type: u8,
328    ) -> (
329        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
330        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
331    ) {
332        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
333
334        // Processing Worker
335        let track_id_proc = track_id.clone();
336        let packet_sender_proc = packet_sender_arc.clone();
337        let processor_chain_proc = processor_chain.clone();
338        let proc_fut = Self::run_processing_worker(
339            rx,
340            track_id_proc,
341            packet_sender_proc,
342            processor_chain_proc,
343            default_payload_type,
344        );
345
346        // Receiving Worker
347        let track_id_recv = track_id.clone();
348        let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
349
350        (proc_fut.boxed(), recv_fut.boxed())
351    }
352
353    async fn run_processing_worker(
354        mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
355        track_id: TrackId,
356        packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
357        mut processor_chain: ProcessorChain,
358        default_payload_type: u8,
359    ) {
360        info!(track_id=%track_id, "RtcTrack processing worker started");
361        while let Some(frame) = rx.recv().await {
362            let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
363                frame,
364                &track_id,
365                &packet_sender,
366                &mut processor_chain,
367                default_payload_type,
368            ))
369            .catch_unwind()
370            .await;
371
372            if let Err(cause) = res {
373                let msg = if let Some(s) = cause.downcast_ref::<&str>() {
374                    *s
375                } else if let Some(s) = cause.downcast_ref::<String>() {
376                    &s[..]
377                } else {
378                    "Unknown panic"
379                };
380                tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
381                break;
382            }
383        }
384        info!(track_id=%track_id, "RtcTrack processing worker stopped");
385    }
386
387    async fn run_receiving_worker(
388        track: Arc<SampleStreamTrack>,
389        tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
390        track_id: TrackId,
391    ) {
392        let mut samples =
393            futures::stream::unfold(
394                track,
395                |t| async move { t.recv().await.ok().map(|s| (s, t)) },
396            )
397            .boxed();
398
399        while let Some(sample) = samples.next().await {
400            if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
401                if let Err(_) = tx.send(frame) {
402                    break;
403                }
404            } else {
405                debug!(track_id=%track_id, "Received non-audio sample");
406            }
407        }
408        info!(track_id=%track_id, "RtcTrack receiving worker stopped");
409    }
410
411    async fn process_audio_frame(
412        frame: rustrtc::media::frame::AudioFrame,
413        track_id: &TrackId,
414        packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
415        processor_chain: &mut ProcessorChain,
416        default_payload_type: u8,
417    ) {
418        let packet_sender = packet_sender.lock().await;
419        if let Some(sender) = packet_sender.as_ref() {
420            let payload_type = frame.payload_type.unwrap_or(default_payload_type);
421            let src_codec = match CodecType::try_from(payload_type) {
422                Ok(c) => c,
423                Err(_) => {
424                    debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
425                    return;
426                }
427            };
428
429            let mut af = AudioFrame {
430                track_id: track_id.clone(),
431                samples: crate::media::Samples::RTP {
432                    payload_type,
433                    payload: frame.data.to_vec(),
434                    sequence_number: frame.sequence_number.unwrap_or(0),
435                },
436                timestamp: crate::media::get_timestamp(),
437                sample_rate: src_codec.samplerate(),
438                channels: src_codec.channels(),
439            };
440            if let Err(e) = processor_chain.process_frame(&mut af) {
441                debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
442            }
443
444            sender.send(af).ok();
445        }
446    }
447
448    pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
449        use crate::media::negotiate::parse_rtpmap;
450        let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
451
452        if let Some(media) = sdp
453            .media_sections
454            .iter()
455            .find(|m| m.kind == MediaKind::Audio)
456        {
457            for attr in &media.attributes {
458                if attr.key == "rtpmap" {
459                    if let Some(value) = &attr.value {
460                        if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
461                            self.encoder.set_payload_type(pt, codec.clone());
462                            self.processor_chain.codec.set_payload_type(pt, codec);
463                        }
464                    }
465                }
466            }
467
468            // Negotiate primary audio codec
469            let mut negotiated = None;
470
471            // If we are the offerer (receiving an Answer), we prioritize our own preferred codec order
472            // that is also present in the answer.
473            if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
474                for preferred_codec in &self.rtc_config.codecs {
475                    if *preferred_codec == CodecType::TelephoneEvent {
476                        continue;
477                    }
478                    for fmt in &media.formats {
479                        if let Ok(pt) = fmt.parse::<u8>() {
480                            let codec = self
481                                .encoder
482                                .payload_type_map
483                                .get(&pt)
484                                .cloned()
485                                .or_else(|| CodecType::try_from(pt).ok());
486                            if let Some(c) = codec {
487                                if c == *preferred_codec {
488                                    negotiated = Some((pt, c));
489                                    break;
490                                }
491                            }
492                        }
493                    }
494                    if negotiated.is_some() {
495                        break;
496                    }
497                }
498            }
499
500            // Fallback: use the first codec in the SDP (matches offerer's preference if we are answerer)
501            if negotiated.is_none() {
502                for fmt in &media.formats {
503                    if let Ok(pt) = fmt.parse::<u8>() {
504                        let codec = self
505                            .encoder
506                            .payload_type_map
507                            .get(&pt)
508                            .cloned()
509                            .or_else(|| CodecType::try_from(pt).ok());
510
511                        if let Some(codec) = codec {
512                            if codec != CodecType::TelephoneEvent {
513                                negotiated = Some((pt, codec));
514                                break;
515                            }
516                        }
517                    }
518                }
519            }
520
521            if let Some((pt, codec)) = negotiated {
522                info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
523                self.payload_type = Some(pt);
524            }
525        }
526        Ok(())
527    }
528
529    fn normalize_sdp(sdp: &str) -> String {
530        sdp.lines()
531            .map(|line| {
532                if line.starts_with("o=") {
533                    let parts: Vec<&str> = line.split_whitespace().collect();
534                    if parts.len() >= 3 {
535                        return format!("o= {} {}", parts[1], parts[2]);
536                    }
537                }
538                line.to_string()
539            })
540            .filter(|line| {
541                !line.starts_with("t=") &&  // timing line can vary
542                !line.starts_with("a=ssrc:") &&  // SSRC attributes (but SSRC change shows in o= version)
543                !line.starts_with("a=msid:") &&  // media stream ID
544                !line.trim().is_empty()
545            })
546            .collect::<Vec<_>>()
547            .join("\n")
548    }
549
550    async fn update_remote_description_internal(
551        &mut self,
552        answer: &String,
553        force_update: bool,
554    ) -> Result<()> {
555        info!(
556            track_id=%self.track_id,
557            "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
558            force_update,
559            self.last_remote_sdp.is_some(),
560            self.rtc_config.mode
561        );
562
563        if let Some(pc) = &self.peer_connection {
564            if !force_update {
565                if let Some(ref last_sdp) = self.last_remote_sdp {
566                    if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
567                        debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
568                        return Ok(());
569                    }
570                }
571            } else {
572                debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
573            }
574
575            let _is_first_remote_sdp = self.last_remote_sdp.is_none();
576
577            let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
578            match pc.set_remote_description(sdp_obj.clone()).await {
579                Ok(_) => {
580                    debug!(track_id=%self.track_id, "set_remote_description succeeded");
581                    self.last_remote_sdp = Some(answer.clone());
582                }
583                Err(e) => {
584                    if self.rtc_config.mode == TransportMode::Rtp {
585                        info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
586
587                        if let Some(current_local) = pc.local_description() {
588                            let sdp = current_local.to_sdp_string();
589                            for line in sdp.lines() {
590                                if line.starts_with("a=ssrc:") {
591                                    info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
592                                }
593                            }
594                        }
595
596                        let offer = pc.create_offer().await?;
597
598                        let sdp = offer.to_sdp_string();
599                        for line in sdp.lines() {
600                            if line.starts_with("a=ssrc:") {
601                                info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
602                            }
603                        }
604
605                        pc.set_local_description(offer)?;
606                        pc.set_remote_description(sdp_obj).await?;
607                        self.last_remote_sdp = Some(answer.clone());
608                        info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
609                    } else {
610                        return Err(e.into());
611                    }
612                }
613            }
614
615            // Track events will be handled by the event loop after SSRC latching
616
617            // Extract negotiated payload types from SDP string
618            self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
619        }
620        Ok(())
621    }
622}
623
624#[async_trait]
625impl Track for RtcTrack {
    /// Returns the SSRC configured for this track (0 when none was set).
    fn ssrc(&self) -> u32 {
        self.ssrc
    }
    /// Returns the identifier of this track.
    fn id(&self) -> &TrackId {
        &self.track_id
    }
    /// Returns the generic track configuration this track was created with.
    fn config(&self) -> &TrackConfig {
        &self.track_config
    }
    /// Returns mutable access to this track's processor chain.
    fn processor_chain(&mut self) -> &mut ProcessorChain {
        &mut self.processor_chain
    }
638
639    async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
640        info!(track_id=%self.track_id, "rtc handshake start");
641        self.create().await?;
642
643        let pc = self.peer_connection.clone().ok_or_else(|| {
644            anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
645        })?;
646
647        debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
648        for (i, t) in pc.get_transceivers().iter().enumerate() {
649            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}", 
650                i, t.kind(), t.mid(), t.direction());
651        }
652
653        let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
654        pc.set_remote_description(sdp.clone()).await?;
655
656        debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
657        for (i, t) in pc.get_transceivers().iter().enumerate() {
658            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}", 
659                i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
660        }
661
662        // For RTP mode: Wait for PeerConnectionEvent::Track after SSRC latching completes
663        // For WebRTC mode: The event loop will handle Track events
664        info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
665
666        self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
667
668        let mut answer = pc.create_answer().await?;
669        crate::media::negotiate::intersect_answer(&sdp, &mut answer);
670
671        pc.set_local_description(answer.clone())?;
672
673        if self.rtc_config.mode != TransportMode::Rtp {
674            pc.wait_for_gathering_complete().await;
675        }
676
677        let final_answer = pc
678            .local_description()
679            .ok_or(anyhow::anyhow!("No local description"))?;
680
681        Ok(final_answer.to_sdp_string())
682    }
683
    /// Applies a remote SDP answer, skipping the update when it matches the
    /// last applied SDP after normalization (delegates with `force_update = false`).
    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, false).await
    }
687
    /// Applies a remote SDP answer unconditionally, bypassing the
    /// unchanged-SDP check (delegates with `force_update = true`).
    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, true).await
    }
691
692    async fn start(
693        &mut self,
694        event_sender: EventSender,
695        packet_sender: TrackPacketSender,
696    ) -> Result<()> {
697        *self.packet_sender.lock().await = Some(packet_sender.clone());
698        let token_clone = self.cancel_token.clone();
699        let event_sender_clone = event_sender.clone();
700        let track_id = self.track_id.clone();
701        let ssrc = self.ssrc;
702
703        if self.rtc_config.mode != TransportMode::Rtp {
704            let start_time = crate::media::get_timestamp();
705            crate::spawn(async move {
706                token_clone.cancelled().await;
707                let _ = event_sender_clone.send(SessionEvent::TrackEnd {
708                    track_id,
709                    timestamp: crate::media::get_timestamp(),
710                    duration: crate::media::get_timestamp() - start_time,
711                    ssrc,
712                    play_id: None,
713                });
714            });
715        }
716
717        Ok(())
718    }
719
720    async fn stop(&self) -> Result<()> {
721        self.cancel_token.cancel();
722        if let Some(pc) = &self.peer_connection {
723            pc.close();
724        }
725        Ok(())
726    }
727
728    async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
729        let packet = packet.clone();
730
731        if let Some(source) = &self.local_source {
732            match &packet.samples {
733                crate::media::Samples::PCM { samples } => {
734                    let payload_type = self.get_payload_type();
735                    let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
736                    let target_codec = CodecType::try_from(payload_type)?;
737                    if !encoded.is_empty() {
738                        let clock_rate = target_codec.clock_rate();
739
740                        let now = Instant::now();
741                        if let Some(last_time) = self.last_packet_time {
742                            let elapsed = now.duration_since(last_time);
743                            if elapsed.as_millis() > 50 {
744                                let gap_increment =
745                                    (elapsed.as_millis() as u32 * clock_rate) / 1000;
746                                self.next_rtp_timestamp += gap_increment;
747                                self.need_marker = true;
748                            }
749                        }
750
751                        self.last_packet_time = Some(now);
752
753                        let timestamp_increment = (samples.len() as u64 * clock_rate as u64
754                            / packet.sample_rate as u64
755                            / self.track_config.channels as u64)
756                            as u32;
757                        let rtp_timestamp = self.next_rtp_timestamp;
758                        self.next_rtp_timestamp += timestamp_increment;
759                        let sequence_number = self.next_rtp_sequence_number;
760                        self.next_rtp_sequence_number += 1;
761
762                        let mut marker = false;
763                        if self.need_marker {
764                            marker = true;
765                            self.need_marker = false;
766                        }
767
768                        let frame = RtcAudioFrame {
769                            data: Bytes::from(encoded),
770                            clock_rate,
771                            payload_type: Some(payload_type),
772                            sequence_number: Some(sequence_number),
773                            rtp_timestamp,
774                            marker,
775                            ..Default::default()
776                        };
777                        source.send_audio(frame).await.ok();
778                    }
779                }
780                crate::media::Samples::RTP {
781                    payload,
782                    payload_type,
783                    sequence_number,
784                } => {
785                    let clock_rate = match *payload_type {
786                        0 | 8 | 9 | 18 => 8000,
787                        111 => 48000,
788                        _ => packet.sample_rate,
789                    };
790
791                    let now = Instant::now();
792                    if let Some(last_time) = self.last_packet_time {
793                        let elapsed = now.duration_since(last_time);
794                        if elapsed.as_millis() > 50 {
795                            let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
796                            self.next_rtp_timestamp += gap_increment;
797                            self.need_marker = true;
798                        }
799                    }
800                    self.last_packet_time = Some(now);
801
802                    let increment = match *payload_type {
803                        0 | 8 | 18 => payload.len() as u32,
804                        9 => payload.len() as u32,
805                        111 => (clock_rate / 50) as u32,
806                        _ => (clock_rate / 50) as u32,
807                    };
808
809                    let rtp_timestamp = self.next_rtp_timestamp;
810                    self.next_rtp_timestamp += increment;
811                    let sequence_number = *sequence_number;
812
813                    let mut marker = false;
814                    if self.need_marker {
815                        marker = true;
816                        self.need_marker = false;
817                    }
818
819                    let frame = RtcAudioFrame {
820                        data: Bytes::from(payload.clone()),
821                        clock_rate,
822                        payload_type: Some(*payload_type),
823                        sequence_number: Some(sequence_number),
824                        rtp_timestamp,
825                        marker,
826                        ..Default::default()
827                    };
828                    source.send_audio(frame).await.ok();
829                }
830                _ => {}
831            }
832        }
833        Ok(())
834    }
835}
836
837impl RtcTrack {
838    fn get_payload_type(&self) -> u8 {
839        if let Some(pt) = self.payload_type {
840            return pt;
841        }
842
843        self.rtc_config.payload_type.unwrap_or_else(|| {
844            match self.rtc_config.preferred_codec.unwrap_or(CodecType::Opus) {
845                CodecType::PCMU => 0,
846                CodecType::PCMA => 8,
847                CodecType::Opus => 111,
848                CodecType::G722 => 9,
849                _ => 111,
850            }
851        })
852    }
853}
854
#[cfg(test)]
mod tests {
    use super::*;
    use crate::media::track::TrackConfig;

    /// Creates an `RtcTrack` with a fresh cancellation token and the default
    /// `TrackConfig`, so each test only spells out what differs.
    fn build_track(id: &str, rtc_config: RtcTrackConfig) -> RtcTrack {
        RtcTrack::new(
            CancellationToken::new(),
            id.to_string(),
            TrackConfig::default(),
            rtc_config,
        )
    }

    /// Checks which payload type `get_payload_type` reports after an SDP
    /// offer has been run through `parse_sdp_payload_types`.
    #[test]
    fn test_parse_sdp_payload_types() {
        let mut first = build_track("test-track", RtcTrackConfig::default());

        // Case 1: several audio codecs with telephone-event listed last;
        // the reported payload type must be PCMA (8).
        let pcma_first_offer = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
        first
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, pcma_first_offer)
            .expect("parse offer");
        assert_eq!(first.get_payload_type(), 8);

        // Case 2: telephone-event listed first on a track that prefers PCMU;
        // the DTMF entry must be skipped and PCMU (0) reported.
        let pcmu_config = RtcTrackConfig {
            preferred_codec: Some(CodecType::PCMU),
            ..RtcTrackConfig::default()
        };
        let mut second = build_track("test-track-2", pcmu_config);

        let dtmf_first_offer = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
        second
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, dtmf_first_offer)
            .expect("parse offer");
        assert_eq!(second.get_payload_type(), 0);

        // Case 3: Opus on dynamic payload type 111, parsed on the track that
        // already handled case 1.
        let opus_offer = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
        first
            .parse_sdp_payload_types(rustrtc::SdpType::Offer, opus_offer)
            .expect("parse offer");
        assert_eq!(first.get_payload_type(), 111);
    }

    /// Runs a handshake in plain RTP mode against a SIP-style offer and
    /// checks the transceiver layout of the resulting peer connection.
    #[tokio::test]
    async fn test_rtp_mode_handshake_spawns_handler() {
        let rtp_config = RtcTrackConfig {
            mode: rustrtc::TransportMode::Rtp,
            ..RtcTrackConfig::default()
        };
        let mut track = build_track("test-track-sip", rtp_config);

        // Standard SIP/SDP offer
        let offer = "v=0\r\n\
o=- 123456 123456 IN IP4 172.0.0.1\r\n\
s=-\r\n\
c=IN IP4 172.0.0.1\r\n\
t=0 0\r\n\
m=audio 10000 RTP/AVP 0 101\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=sendrecv\r\n";

        // The handshake must complete without panicking and report success.
        let outcome = track.handshake(offer.to_string(), None).await;
        assert!(outcome.is_ok());

        // A peer connection must exist after the handshake.
        let pc = match &track.peer_connection {
            Some(pc) => pc,
            None => panic!("PeerConnection not initialized"),
        };

        // Exactly one transceiver is expected, and it must have a receiver.
        let transceivers = pc.get_transceivers();
        assert_eq!(transceivers.len(), 1);
        assert!(transceivers[0].receiver().is_some());
    }
}