Skip to main content

active_call/media/track/
rtc.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::{
6        processor::ProcessorChain,
7        track::{Track, TrackConfig, TrackId, TrackPacketSender},
8    },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16    AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17    PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18    config::MediaCapabilities,
19    media::{
20        MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21        track::SampleStreamTrack,
22    },
23};
24use std::{
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
32#[derive(Clone)]
33pub struct RtcTrackConfig {
34    pub mode: TransportMode,
35    pub ice_servers: Option<Vec<IceServer>>,
36    pub external_ip: Option<String>,
37    pub rtp_port_range: Option<(u16, u16)>,
38    pub bind_ip: Option<String>,
39    pub preferred_codec: Option<CodecType>,
40    pub codecs: Vec<CodecType>,
41    pub payload_type: Option<u8>,
42    pub enable_latching: Option<bool>,
43    pub enable_ice_lite: Option<bool>,
44}
45
46impl Default for RtcTrackConfig {
47    fn default() -> Self {
48        Self {
49            mode: TransportMode::WebRtc, // Default WebRTC behavior
50            ice_servers: None,
51            external_ip: None,
52            rtp_port_range: None,
53            bind_ip: None,
54            preferred_codec: None,
55            codecs: Vec::new(),
56            payload_type: None,
57            enable_latching: None,
58            enable_ice_lite: None,
59        }
60    }
61}
62
63pub struct RtcTrack {
64    track_id: TrackId,
65    track_config: TrackConfig,
66    rtc_config: RtcTrackConfig,
67    processor_chain: ProcessorChain,
68    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
69    cancel_token: CancellationToken,
70    local_source: Option<Arc<SampleStreamSource>>,
71    encoder: TrackCodec,
72    ssrc: u32,
73    payload_type: Option<u8>,
74    pub peer_connection: Option<Arc<PeerConnection>>,
75    next_rtp_timestamp: u32,
76    next_rtp_sequence_number: u16,
77    last_packet_time: Option<Instant>,
78    last_remote_sdp: Option<String>,
79    need_marker: bool,
80}
81
82impl RtcTrack {
83    pub fn new(
84        cancel_token: CancellationToken,
85        id: TrackId,
86        track_config: TrackConfig,
87        rtc_config: RtcTrackConfig,
88    ) -> Self {
89        let processor_chain = ProcessorChain::new(track_config.samplerate);
90        Self {
91            track_id: id,
92            track_config,
93            rtc_config,
94            processor_chain,
95            packet_sender: Arc::new(Mutex::new(None)),
96            cancel_token,
97            local_source: None,
98            encoder: TrackCodec::new(),
99            ssrc: 0,
100            payload_type: None,
101            peer_connection: None,
102            next_rtp_timestamp: 0,
103            next_rtp_sequence_number: 0,
104            last_packet_time: None,
105            last_remote_sdp: None,
106            need_marker: false,
107        }
108    }
109
110    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
111        self.ssrc = ssrc;
112        self
113    }
114
115    pub fn create_audio_track(
116        _codec: CodecType,
117        _stream_id: Option<String>,
118    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
119        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
120        (Arc::new(source), track)
121    }
122
123    pub async fn local_description(&self) -> Result<String> {
124        let pc = self
125            .peer_connection
126            .as_ref()
127            .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
128        let offer = pc.create_offer().await?;
129        pc.set_local_description(offer.clone())?;
130        Ok(offer.to_sdp_string())
131    }
132
133    pub async fn create(&mut self) -> Result<()> {
134        if self.peer_connection.is_some() {
135            return Ok(());
136        }
137
138        let mut config = RtcConfiguration::default();
139        if self.ssrc != 0 {
140            config.ssrc_start = self.ssrc;
141        }
142        config.transport_mode = self.rtc_config.mode.clone();
143
144        if let Some(ice_servers) = &self.rtc_config.ice_servers {
145            config.ice_servers = ice_servers.clone();
146        }
147
148        if let Some(external_ip) = &self.rtc_config.external_ip {
149            config.external_ip = Some(external_ip.clone());
150        }
151        if let Some(bind_ip) = &self.rtc_config.bind_ip {
152            config.bind_ip = Some(bind_ip.clone());
153        }
154        if let Some((rtp_start_port, rtp_end_port)) = self.rtc_config.rtp_port_range {
155            config.rtp_start_port = Some(rtp_start_port);
156            config.rtp_end_port = Some(rtp_end_port);
157        }
158        config.enable_ice_lite = self.rtc_config.enable_ice_lite.unwrap_or(false);
159        config.enable_latching = self
160            .rtc_config
161            .enable_latching
162            .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
163
164        if !self.rtc_config.codecs.is_empty() {
165            let mut caps = MediaCapabilities::default();
166            caps.audio.clear();
167
168            for codec in &self.rtc_config.codecs {
169                let cap = match codec {
170                    CodecType::PCMU => AudioCapability::pcmu(),
171                    CodecType::PCMA => AudioCapability::pcma(),
172                    CodecType::G722 => AudioCapability::g722(),
173                    CodecType::G729 => AudioCapability::g729(),
174                    CodecType::TelephoneEvent => AudioCapability::telephone_event(),
175                    #[cfg(feature = "opus")]
176                    CodecType::Opus => AudioCapability::opus(),
177                };
178                caps.audio.push(cap);
179            }
180            config.media_capabilities = Some(caps);
181        }
182
183        let peer_connection = Arc::new(PeerConnection::new(config));
184        self.peer_connection = Some(peer_connection.clone());
185
186        let default_codec = CodecType::G722;
187        let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
188
189        let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
190        self.local_source = Some(source);
191
192        let payload_type = self
193            .rtc_config
194            .payload_type
195            .unwrap_or_else(|| codec.payload_type());
196
197        self.payload_type = Some(payload_type);
198
199        let params = RtpCodecParameters {
200            clock_rate: codec.clock_rate(),
201            channels: codec.channels() as u8,
202            payload_type,
203            ..Default::default()
204        };
205
206        peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
207
208        // Spawn Handler Logic
209        self.spawn_handlers(
210            peer_connection.clone(),
211            self.track_id.clone(),
212            self.processor_chain.clone(),
213            payload_type,
214        );
215
216        Ok(())
217    }
218
219    fn spawn_handlers(
220        &self,
221        pc: Arc<PeerConnection>,
222        track_id: TrackId,
223        processor_chain: ProcessorChain,
224        default_payload_type: u8,
225    ) {
226        let cancel_token = self.cancel_token.clone();
227        let packet_sender = self.packet_sender.clone();
228        let pc_event = pc.clone();
229        let pc_stats = pc.clone();
230        let pc_state = pc.clone();
231        let track_id_log = track_id.clone();
232        let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
233
234        crate::spawn(async move {
235            info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
236
237            let mut events = futures::stream::unfold(pc_event, |pc| async move {
238                pc.recv().await.map(|ev| (ev, pc))
239            })
240            .boxed();
241
242            let mut state_rx = if is_webrtc {
243                Some(pc_state.subscribe_peer_state())
244            } else {
245                None
246            };
247
248            let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
249            let mut event_count = 0;
250            let mut workers = FuturesUnordered::new();
251
252            loop {
253                tokio::select! {
254                    _ = cancel_token.cancelled() => {
255                        debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
256                        break;
257                    }
258
259                    Some(event) = events.next() => {
260                        event_count += 1;
261                        let event_type = match &event {
262                            PeerConnectionEvent::Track(_) => "Track",
263                            PeerConnectionEvent::DataChannel(_) => "DataChannel",
264                        };
265                        debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
266
267                        if let PeerConnectionEvent::Track(transceiver) = event {
268                            if let Some(receiver) = transceiver.receiver() {
269                                let track = receiver.track();
270                                info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
271
272                                let (f1, f2) = Self::create_track_workers(
273                                    track,
274                                    packet_sender.clone(),
275                                    track_id_log.clone(),
276                                    processor_chain.clone(),
277                                    default_payload_type,
278                                );
279                                workers.push(f1);
280                                workers.push(f2);
281                            }
282                        }
283                    }
284
285                    _ = workers.next(), if !workers.is_empty() => {}
286
287                    _ = stats_interval.tick() => {
288                        match pc_stats.get_stats().await {
289                            Ok(stats) => {
290                                info!(track_id=%track_id_log, %stats, "RTCP Stats");
291                            }
292                            Err(e) => {
293                                debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
294                            }
295                        }
296                    }
297
298                    // Handle State Changes (WebRTC Only)
299                    res = async {
300                        if let Some(rx) = state_rx.as_mut() {
301                            rx.changed().await
302                        } else {
303                            std::future::pending().await
304                        }
305                    } => {
306                        if res.is_ok() {
307                            if let Some(rx) = state_rx.as_ref() {
308                                let s = *rx.borrow();
309                                debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
310                                match s {
311                                    PeerConnectionState::Disconnected
312                                    | PeerConnectionState::Closed
313                                    | PeerConnectionState::Failed => {
314                                        info!(
315                                            track_id = %track_id_log,
316                                            "peer connection is {:?}, try to close", s
317                                        );
318                                        cancel_token.cancel();
319                                        pc_state.close();
320                                        break;
321                                    }
322                                    _ => {}
323                                }
324                            }
325                        }
326                    }
327                }
328            }
329            debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
330        });
331    }
332
333    fn create_track_workers(
334        track: Arc<SampleStreamTrack>,
335        packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
336        track_id: TrackId,
337        processor_chain: ProcessorChain,
338        default_payload_type: u8,
339    ) -> (
340        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
341        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
342    ) {
343        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
344
345        // Processing Worker
346        let track_id_proc = track_id.clone();
347        let packet_sender_proc = packet_sender_arc.clone();
348        let processor_chain_proc = processor_chain.clone();
349        let proc_fut = Self::run_processing_worker(
350            rx,
351            track_id_proc,
352            packet_sender_proc,
353            processor_chain_proc,
354            default_payload_type,
355        );
356
357        // Receiving Worker
358        let track_id_recv = track_id.clone();
359        let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
360
361        (proc_fut.boxed(), recv_fut.boxed())
362    }
363
364    async fn run_processing_worker(
365        mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
366        track_id: TrackId,
367        packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
368        mut processor_chain: ProcessorChain,
369        default_payload_type: u8,
370    ) {
371        info!(track_id=%track_id, "RtcTrack processing worker started");
372        while let Some(frame) = rx.recv().await {
373            let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
374                frame,
375                &track_id,
376                &packet_sender,
377                &mut processor_chain,
378                default_payload_type,
379            ))
380            .catch_unwind()
381            .await;
382
383            if let Err(cause) = res {
384                let msg = if let Some(s) = cause.downcast_ref::<&str>() {
385                    *s
386                } else if let Some(s) = cause.downcast_ref::<String>() {
387                    &s[..]
388                } else {
389                    "Unknown panic"
390                };
391                tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
392                break;
393            }
394        }
395        info!(track_id=%track_id, "RtcTrack processing worker stopped");
396    }
397
398    async fn run_receiving_worker(
399        track: Arc<SampleStreamTrack>,
400        tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
401        track_id: TrackId,
402    ) {
403        let mut samples =
404            futures::stream::unfold(
405                track,
406                |t| async move { t.recv().await.ok().map(|s| (s, t)) },
407            )
408            .boxed();
409
410        while let Some(sample) = samples.next().await {
411            if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
412                if let Err(_) = tx.send(frame) {
413                    break;
414                }
415            } else {
416                debug!(track_id=%track_id, "Received non-audio sample");
417            }
418        }
419        info!(track_id=%track_id, "RtcTrack receiving worker stopped");
420    }
421
422    async fn process_audio_frame(
423        frame: rustrtc::media::frame::AudioFrame,
424        track_id: &TrackId,
425        packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
426        processor_chain: &mut ProcessorChain,
427        default_payload_type: u8,
428    ) {
429        let packet_sender = packet_sender.lock().await;
430        if let Some(sender) = packet_sender.as_ref() {
431            let payload_type = frame.payload_type.unwrap_or(default_payload_type);
432            let src_codec = match CodecType::try_from(payload_type) {
433                Ok(c) => c,
434                Err(_) => {
435                    debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
436                    return;
437                }
438            };
439
440            let mut af = AudioFrame {
441                track_id: track_id.clone(),
442                samples: crate::media::Samples::RTP {
443                    payload_type,
444                    payload: frame.data.to_vec(),
445                    sequence_number: frame.sequence_number.unwrap_or(0),
446                },
447                timestamp: crate::media::get_timestamp(),
448                sample_rate: src_codec.samplerate(),
449                channels: src_codec.channels(),
450                ..Default::default()
451            };
452            if let Err(e) = processor_chain.process_frame(&mut af) {
453                debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
454            }
455
456            sender.send(af).ok();
457        }
458    }
459
460    pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
461        use crate::media::negotiate::parse_rtpmap;
462        let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
463
464        if let Some(media) = sdp
465            .media_sections
466            .iter()
467            .find(|m| m.kind == MediaKind::Audio)
468        {
469            for attr in &media.attributes {
470                if attr.key == "rtpmap" {
471                    if let Some(value) = &attr.value {
472                        if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
473                            self.encoder.set_payload_type(pt, codec.clone());
474                            self.processor_chain.codec.set_payload_type(pt, codec);
475                        }
476                    }
477                }
478            }
479
480            // Negotiate primary audio codec
481            let mut negotiated = None;
482
483            // If we are the offerer (receiving an Answer), we prioritize our own preferred codec order
484            // that is also present in the answer.
485            if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
486                for preferred_codec in &self.rtc_config.codecs {
487                    if *preferred_codec == CodecType::TelephoneEvent {
488                        continue;
489                    }
490                    for fmt in &media.formats {
491                        if let Ok(pt) = fmt.parse::<u8>() {
492                            let codec = self
493                                .encoder
494                                .payload_type_map
495                                .get(&pt)
496                                .cloned()
497                                .or_else(|| CodecType::try_from(pt).ok());
498                            if let Some(c) = codec {
499                                if c == *preferred_codec {
500                                    negotiated = Some((pt, c));
501                                    break;
502                                }
503                            }
504                        }
505                    }
506                    if negotiated.is_some() {
507                        break;
508                    }
509                }
510            }
511
512            // Fallback: use the first codec in the SDP (matches offerer's preference if we are answerer)
513            if negotiated.is_none() {
514                for fmt in &media.formats {
515                    if let Ok(pt) = fmt.parse::<u8>() {
516                        let codec = self
517                            .encoder
518                            .payload_type_map
519                            .get(&pt)
520                            .cloned()
521                            .or_else(|| CodecType::try_from(pt).ok());
522
523                        if let Some(codec) = codec {
524                            if codec != CodecType::TelephoneEvent {
525                                negotiated = Some((pt, codec));
526                                break;
527                            }
528                        }
529                    }
530                }
531            }
532
533            if let Some((pt, codec)) = negotiated {
534                info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
535                self.payload_type = Some(pt);
536            }
537        }
538        Ok(())
539    }
540
541    fn normalize_sdp(sdp: &str) -> String {
542        sdp.lines()
543            .map(|line| {
544                if line.starts_with("o=") {
545                    let parts: Vec<&str> = line.split_whitespace().collect();
546                    if parts.len() >= 3 {
547                        return format!("o= {} {}", parts[1], parts[2]);
548                    }
549                }
550                line.to_string()
551            })
552            .filter(|line| {
553                !line.starts_with("t=") &&  // timing line can vary
554                !line.starts_with("a=ssrc:") &&  // SSRC attributes (but SSRC change shows in o= version)
555                !line.starts_with("a=msid:") &&  // media stream ID
556                !line.trim().is_empty()
557            })
558            .collect::<Vec<_>>()
559            .join("\n")
560    }
561
562    async fn update_remote_description_internal(
563        &mut self,
564        answer: &String,
565        force_update: bool,
566    ) -> Result<()> {
567        info!(
568            track_id=%self.track_id,
569            "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
570            force_update,
571            self.last_remote_sdp.is_some(),
572            self.rtc_config.mode
573        );
574
575        if let Some(pc) = &self.peer_connection {
576            if !force_update {
577                if let Some(ref last_sdp) = self.last_remote_sdp {
578                    if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
579                        debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
580                        return Ok(());
581                    }
582                }
583            } else {
584                debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
585            }
586
587            let _is_first_remote_sdp = self.last_remote_sdp.is_none();
588
589            let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
590            match pc.set_remote_description(sdp_obj.clone()).await {
591                Ok(_) => {
592                    debug!(track_id=%self.track_id, "set_remote_description succeeded");
593                    self.last_remote_sdp = Some(answer.clone());
594                }
595                Err(e) => {
596                    if self.rtc_config.mode == TransportMode::Rtp {
597                        info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
598
599                        if let Some(current_local) = pc.local_description() {
600                            let sdp = current_local.to_sdp_string();
601                            for line in sdp.lines() {
602                                if line.starts_with("a=ssrc:") {
603                                    info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
604                                }
605                            }
606                        }
607
608                        let offer = pc.create_offer().await?;
609
610                        let sdp = offer.to_sdp_string();
611                        for line in sdp.lines() {
612                            if line.starts_with("a=ssrc:") {
613                                info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
614                            }
615                        }
616
617                        pc.set_local_description(offer)?;
618                        pc.set_remote_description(sdp_obj).await?;
619                        self.last_remote_sdp = Some(answer.clone());
620                        info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
621                    } else {
622                        return Err(e.into());
623                    }
624                }
625            }
626
627            // Track events will be handled by the event loop after SSRC latching
628
629            // Extract negotiated payload types from SDP string
630            self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
631        }
632        Ok(())
633    }
634}
635
636#[async_trait]
637impl Track for RtcTrack {
638    fn ssrc(&self) -> u32 {
639        self.ssrc
640    }
641    fn id(&self) -> &TrackId {
642        &self.track_id
643    }
644    fn config(&self) -> &TrackConfig {
645        &self.track_config
646    }
647    fn processor_chain(&mut self) -> &mut ProcessorChain {
648        &mut self.processor_chain
649    }
650
651    async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
652        info!(track_id=%self.track_id, "rtc handshake start");
653        self.create().await?;
654
655        let pc = self.peer_connection.clone().ok_or_else(|| {
656            anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
657        })?;
658
659        debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
660        for (i, t) in pc.get_transceivers().iter().enumerate() {
661            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
662                i, t.kind(), t.mid(), t.direction());
663        }
664
665        let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
666        pc.set_remote_description(sdp.clone()).await?;
667
668        debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
669        for (i, t) in pc.get_transceivers().iter().enumerate() {
670            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
671                i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
672        }
673
674        // For RTP mode: Wait for PeerConnectionEvent::Track after SSRC latching completes
675        // For WebRTC mode: The event loop will handle Track events
676        info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
677
678        self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
679
680        let mut answer = pc.create_answer().await?;
681        crate::media::negotiate::intersect_answer(&sdp, &mut answer);
682
683        pc.set_local_description(answer.clone())?;
684
685        if self.rtc_config.mode != TransportMode::Rtp {
686            pc.wait_for_gathering_complete().await;
687        }
688
689        let final_answer = pc
690            .local_description()
691            .ok_or(anyhow::anyhow!("No local description"))?;
692
693        Ok(final_answer.to_sdp_string())
694    }
695
696    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
697        self.update_remote_description_internal(answer, false).await
698    }
699
700    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
701        self.update_remote_description_internal(answer, true).await
702    }
703
704    async fn start(
705        &mut self,
706        event_sender: EventSender,
707        packet_sender: TrackPacketSender,
708    ) -> Result<()> {
709        *self.packet_sender.lock().await = Some(packet_sender.clone());
710        let token_clone = self.cancel_token.clone();
711        let event_sender_clone = event_sender.clone();
712        let track_id = self.track_id.clone();
713        let ssrc = self.ssrc;
714
715        if self.rtc_config.mode != TransportMode::Rtp {
716            let start_time = crate::media::get_timestamp();
717            crate::spawn(async move {
718                token_clone.cancelled().await;
719                let _ = event_sender_clone.send(SessionEvent::TrackEnd {
720                    track_id,
721                    timestamp: crate::media::get_timestamp(),
722                    duration: crate::media::get_timestamp() - start_time,
723                    ssrc,
724                    play_id: None,
725                });
726            });
727        }
728
729        Ok(())
730    }
731
732    async fn stop(&self) -> Result<()> {
733        self.cancel_token.cancel();
734        if let Some(pc) = &self.peer_connection {
735            pc.close();
736        }
737        Ok(())
738    }
739
740    async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
741        let packet = packet.clone();
742
743        if let Some(source) = &self.local_source {
744            match &packet.samples {
745                crate::media::Samples::PCM { samples } => {
746                    let payload_type = self.get_payload_type();
747                    let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
748                    let target_codec = self
749                        .encoder
750                        .payload_type_map
751                        .get(&payload_type)
752                        .cloned()
753                        .or_else(|| CodecType::try_from(payload_type).ok())
754                        .ok_or_else(|| anyhow::anyhow!("Invalid codec type: {}", payload_type))?;
755                    if !encoded.is_empty() {
756                        let clock_rate = target_codec.clock_rate();
757
758                        let now = Instant::now();
759                        if let Some(last_time) = self.last_packet_time {
760                            let elapsed = now.duration_since(last_time);
761                            if elapsed.as_millis() > 50 {
762                                let gap_increment =
763                                    (elapsed.as_millis() as u32 * clock_rate) / 1000;
764                                self.next_rtp_timestamp += gap_increment;
765                                self.need_marker = true;
766                            }
767                        }
768
769                        self.last_packet_time = Some(now);
770
771                        let timestamp_increment = (samples.len() as u64 * clock_rate as u64
772                            / packet.sample_rate as u64
773                            / self.track_config.channels as u64)
774                            as u32;
775                        let rtp_timestamp = self.next_rtp_timestamp;
776                        self.next_rtp_timestamp += timestamp_increment;
777                        let sequence_number = self.next_rtp_sequence_number;
778                        self.next_rtp_sequence_number += 1;
779
780                        let mut marker = false;
781                        if self.need_marker {
782                            marker = true;
783                            self.need_marker = false;
784                        }
785
786                        let frame = RtcAudioFrame {
787                            data: Bytes::from(encoded),
788                            clock_rate,
789                            payload_type: Some(payload_type),
790                            sequence_number: Some(sequence_number),
791                            rtp_timestamp,
792                            marker,
793                            ..Default::default()
794                        };
795                        source.try_send_audio(frame).ok();
796                    }
797                }
798                crate::media::Samples::RTP {
799                    payload,
800                    payload_type,
801                    sequence_number,
802                } => {
803                    let target_codec = self
804                        .encoder
805                        .payload_type_map
806                        .get(payload_type)
807                        .cloned()
808                        .or_else(|| CodecType::try_from(*payload_type).ok())
809                        .ok_or_else(|| anyhow::anyhow!("Invalid codec type: {}", payload_type))?;
810                    let clock_rate = target_codec.clock_rate();
811
812                    let now = Instant::now();
813                    if let Some(last_time) = self.last_packet_time {
814                        let elapsed = now.duration_since(last_time);
815                        if elapsed.as_millis() > 50 {
816                            let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
817                            self.next_rtp_timestamp += gap_increment;
818                            self.need_marker = true;
819                        }
820                    }
821                    self.last_packet_time = Some(now);
822
823                    let increment = match *payload_type {
824                        0 | 8 | 18 => payload.len() as u32,
825                        9 => payload.len() as u32,
826                        111 => (clock_rate / 50) as u32,
827                        _ => (clock_rate / 50) as u32,
828                    };
829
830                    let rtp_timestamp = self.next_rtp_timestamp;
831                    self.next_rtp_timestamp += increment;
832                    let sequence_number = *sequence_number;
833
834                    let mut marker = false;
835                    if self.need_marker {
836                        marker = true;
837                        self.need_marker = false;
838                    }
839
840                    let frame = RtcAudioFrame {
841                        data: Bytes::from(payload.clone()),
842                        clock_rate,
843                        payload_type: Some(*payload_type),
844                        sequence_number: Some(sequence_number),
845                        rtp_timestamp,
846                        marker,
847                        ..Default::default()
848                    };
849                    source.try_send_audio(frame).ok();
850                }
851                _ => {}
852            }
853        }
854        Ok(())
855    }
856}
857
858impl RtcTrack {
859    fn get_payload_type(&self) -> u8 {
860        if let Some(pt) = self.payload_type {
861            return pt;
862        }
863
864        self.rtc_config.payload_type.unwrap_or_else(|| {
865            match self.rtc_config.preferred_codec.unwrap_or(CodecType::G722) {
866                CodecType::PCMU => 0,
867                CodecType::PCMA => 8,
868                #[cfg(feature = "opus")]
869                CodecType::Opus => 111,
870                CodecType::G722 => 9,
871                CodecType::G729 => 18,
872                _ => 111,
873            }
874        })
875    }
876}
877
#[cfg(test)]
mod tests {
    // `use super::*` also re-exports the parent module's imports
    // (`TrackConfig`, `TransportMode`, `SdpType`, `CodecType`, ...).
    use super::*;

    /// Checks the payload type reported by `get_payload_type` after
    /// `parse_sdp_payload_types` processes various remote offers.
    #[test]
    fn test_parse_sdp_payload_types() {
        let mut track = RtcTrack::new(
            CancellationToken::new(),
            "test-track".to_string(),
            TrackConfig::default(),
            RtcTrackConfig::default(),
        );

        // Case 1: Multiple audio codecs, telephone-event at the end. Primary should be PCMA (8)
        let sdp1 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
        track
            .parse_sdp_payload_types(SdpType::Offer, sdp1)
            .expect("parse offer");
        assert_eq!(track.get_payload_type(), 8);

        // Case 2: telephone-event at the beginning, should skip it and pick PCMU (0)
        let rtc_config = RtcTrackConfig {
            preferred_codec: Some(CodecType::PCMU),
            ..Default::default()
        };
        let mut track2 = RtcTrack::new(
            CancellationToken::new(),
            "test-track-2".to_string(),
            TrackConfig::default(),
            rtc_config,
        );

        let sdp2 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
        track2
            .parse_sdp_payload_types(SdpType::Offer, sdp2)
            .expect("parse offer");
        assert_eq!(track2.get_payload_type(), 0);

        // Case 3: Opus with dynamic payload type 111. This reuses `track` from
        // case 1, so it also checks that a later offer replaces the earlier result.
        let sdp3 = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";
        track
            .parse_sdp_payload_types(SdpType::Offer, sdp3)
            .expect("parse offer");
        assert_eq!(track.get_payload_type(), 111);
    }

    /// In `TransportMode::Rtp`, a handshake against a plain SIP-style offer
    /// must succeed and leave exactly one transceiver that has a receiver.
    #[tokio::test]
    async fn test_rtp_mode_handshake_spawns_handler() {
        let rtc_config = RtcTrackConfig {
            mode: TransportMode::Rtp,
            ..Default::default()
        };
        let mut track = RtcTrack::new(
            CancellationToken::new(),
            "test-track-sip".to_string(),
            TrackConfig::default(),
            rtc_config,
        );

        // Standard SIP/SDP offer. The trailing `\` continuations strip the
        // leading indentation, so the string content is unaffected.
        let offer = "v=0\r\n\
            o=- 123456 123456 IN IP4 172.0.0.1\r\n\
            s=-\r\n\
            c=IN IP4 172.0.0.1\r\n\
            t=0 0\r\n\
            m=audio 10000 RTP/AVP 0 101\r\n\
            a=rtpmap:0 PCMU/8000\r\n\
            a=rtpmap:101 telephone-event/8000\r\n\
            a=sendrecv\r\n";

        // Surface the handshake error in the failure message instead of a bare
        // `assertion failed: res.is_ok()`.
        let res = track.handshake(offer.to_string(), None).await;
        assert!(res.is_ok(), "handshake failed: {:?}", res.err());

        // The single audio m-line should yield one transceiver with a receiver.
        let pc = track
            .peer_connection
            .as_ref()
            .expect("PeerConnection not initialized");
        let transceivers = pc.get_transceivers();
        assert_eq!(transceivers.len(), 1);
        assert!(transceivers[0].receiver().is_some());
    }
}