Skip to main content

active_call/media/track/
rtc.rs

1use super::track_codec::TrackCodec;
2use crate::{
3    event::{EventSender, SessionEvent},
4    media::AudioFrame,
5    media::{
6        processor::ProcessorChain,
7        track::{Track, TrackConfig, TrackId, TrackPacketSender},
8    },
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use audio_codec::CodecType;
13use bytes::Bytes;
14use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
15use rustrtc::{
16    AudioCapability, IceServer, MediaKind, PeerConnection, PeerConnectionEvent,
17    PeerConnectionState, RtcConfiguration, RtpCodecParameters, SdpType, TransportMode,
18    config::MediaCapabilities,
19    media::{
20        MediaStreamTrack, SampleStreamSource, frame::AudioFrame as RtcAudioFrame, sample_track,
21        track::SampleStreamTrack,
22    },
23};
24use std::{
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info};
31
/// Transport and codec settings that `RtcTrack::create` copies into the
/// `RtcConfiguration` of the underlying `PeerConnection`.
#[derive(Clone)]
pub struct RtcTrackConfig {
    /// Transport mode assigned to `RtcConfiguration::transport_mode`.
    pub mode: TransportMode,
    /// ICE servers; when `None` the `RtcConfiguration` default is left untouched.
    pub ice_servers: Option<Vec<IceServer>>,
    /// Forwarded to `RtcConfiguration::external_ip` when set.
    pub external_ip: Option<String>,
    /// `(start, end)` pair forwarded to `rtp_start_port` / `rtp_end_port` when set.
    pub rtp_port_range: Option<(u16, u16)>,
    /// Forwarded to `RtcConfiguration::bind_ip` when set.
    pub bind_ip: Option<String>,
    /// Codec used for the local audio track; `create` falls back to G722 when `None`.
    pub preferred_codec: Option<CodecType>,
    /// When non-empty, replaces the default audio capabilities (in this order) and
    /// is used as the preference order when an SDP answer is parsed.
    pub codecs: Vec<CodecType>,
    /// Overrides the payload type of the local track; defaults to the codec's own.
    pub payload_type: Option<u8>,
    /// Latching switch; when `None`, `create` enables it only for `TransportMode::Rtp`.
    pub enable_latching: Option<bool>,
    /// ICE-lite switch; `create` treats `None` as `false`.
    pub enable_ice_lite: Option<bool>,
}
45
46impl Default for RtcTrackConfig {
47    fn default() -> Self {
48        Self {
49            mode: TransportMode::WebRtc, // Default WebRTC behavior
50            ice_servers: None,
51            external_ip: None,
52            rtp_port_range: None,
53            bind_ip: None,
54            preferred_codec: None,
55            codecs: Vec::new(),
56            payload_type: None,
57            enable_latching: None,
58            enable_ice_lite: None,
59        }
60    }
61}
62
/// A media track backed by a `rustrtc` `PeerConnection`.
///
/// Inbound audio is handed to background workers that push frames through the
/// processor chain into `packet_sender`; outbound audio is written to
/// `local_source` by `send_packet`.
pub struct RtcTrack {
    /// Track identifier; also used as the stream id of the local track and as
    /// the `track_id` field of log records.
    track_id: TrackId,
    /// Generic track settings; `samplerate` seeds the processor chain.
    track_config: TrackConfig,
    /// Transport/codec settings applied in `create`.
    rtc_config: RtcTrackConfig,
    /// Processor chain; a clone is handed to the inbound processing workers.
    processor_chain: ProcessorChain,
    /// Destination for inbound frames. Set by `start`; inbound frames are
    /// dropped while this is still `None`.
    packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
    /// Cancelling this token ends the background event loop.
    cancel_token: CancellationToken,
    /// Source side of the local audio track, created in `create`.
    local_source: Option<Arc<SampleStreamSource>>,
    /// Encoder used for outbound PCM; also holds the payload-type → codec map
    /// filled from `rtpmap` attributes.
    encoder: TrackCodec,
    /// When non-zero, used as `RtcConfiguration::ssrc_start`.
    ssrc: u32,
    /// Primary audio payload type: set in `create`, overwritten when an SDP is parsed.
    payload_type: Option<u8>,
    /// The peer connection, present once `create` has run.
    pub peer_connection: Option<Arc<PeerConnection>>,
    /// RTP timestamp assigned to the next outbound frame.
    next_rtp_timestamp: u32,
    /// RTP sequence number assigned to the next outbound PCM frame.
    next_rtp_sequence_number: u16,
    /// Time of the previous outbound frame, used to detect gaps longer than 50 ms.
    last_packet_time: Option<Instant>,
    /// Last remote SDP that was applied, used to skip redundant updates.
    last_remote_sdp: Option<String>,
    /// Set after a send gap so that the next outbound frame carries the marker bit.
    need_marker: bool,
}
81
82impl RtcTrack {
83    pub fn new(
84        cancel_token: CancellationToken,
85        id: TrackId,
86        track_config: TrackConfig,
87        rtc_config: RtcTrackConfig,
88    ) -> Self {
89        let processor_chain = ProcessorChain::new(track_config.samplerate);
90        Self {
91            track_id: id,
92            track_config,
93            rtc_config,
94            processor_chain,
95            packet_sender: Arc::new(Mutex::new(None)),
96            cancel_token,
97            local_source: None,
98            encoder: TrackCodec::new(),
99            ssrc: 0,
100            payload_type: None,
101            peer_connection: None,
102            next_rtp_timestamp: 0,
103            next_rtp_sequence_number: 0,
104            last_packet_time: None,
105            last_remote_sdp: None,
106            need_marker: false,
107        }
108    }
109
    /// Builder-style setter for the SSRC.
    ///
    /// A non-zero value is later used by `create` as `RtcConfiguration::ssrc_start`.
    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
        self.ssrc = ssrc;
        self
    }
114
115    pub fn create_audio_track(
116        _codec: CodecType,
117        _stream_id: Option<String>,
118    ) -> (Arc<SampleStreamSource>, Arc<SampleStreamTrack>) {
119        let (source, track, _) = sample_track(rustrtc::media::MediaKind::Audio, 100);
120        (Arc::new(source), track)
121    }
122
123    pub async fn local_description(&self) -> Result<String> {
124        let pc = self
125            .peer_connection
126            .as_ref()
127            .ok_or_else(|| anyhow::anyhow!("No PeerConnection"))?;
128        let offer = pc.create_offer().await?;
129        pc.set_local_description(offer.clone())?;
130        Ok(offer.to_sdp_string())
131    }
132
133    pub async fn create(&mut self) -> Result<()> {
134        if self.peer_connection.is_some() {
135            return Ok(());
136        }
137
138        let mut config = RtcConfiguration::default();
139        if self.ssrc != 0 {
140            config.ssrc_start = self.ssrc;
141        }
142        config.transport_mode = self.rtc_config.mode.clone();
143
144        if let Some(ice_servers) = &self.rtc_config.ice_servers {
145            config.ice_servers = ice_servers.clone();
146        }
147
148        if let Some(external_ip) = &self.rtc_config.external_ip {
149            config.external_ip = Some(external_ip.clone());
150        }
151        if let Some(bind_ip) = &self.rtc_config.bind_ip {
152            config.bind_ip = Some(bind_ip.clone());
153        }
154        if let Some((rtp_start_port, rtp_end_port)) = self.rtc_config.rtp_port_range {
155            config.rtp_start_port = Some(rtp_start_port);
156            config.rtp_end_port = Some(rtp_end_port);
157        }
158        config.enable_ice_lite = self.rtc_config.enable_ice_lite.unwrap_or(false);
159        config.enable_latching = self
160            .rtc_config
161            .enable_latching
162            .unwrap_or_else(|| self.rtc_config.mode == TransportMode::Rtp);
163
164        if !self.rtc_config.codecs.is_empty() {
165            let mut caps = MediaCapabilities::default();
166            caps.audio.clear();
167
168            for codec in &self.rtc_config.codecs {
169                let cap = match codec {
170                    CodecType::PCMU => AudioCapability::pcmu(),
171                    CodecType::PCMA => AudioCapability::pcma(),
172                    CodecType::G722 => AudioCapability::g722(),
173                    CodecType::G729 => AudioCapability::g729(),
174                    CodecType::TelephoneEvent => AudioCapability::telephone_event(),
175                    #[cfg(feature = "opus")]
176                    CodecType::Opus => AudioCapability::opus(),
177                };
178                caps.audio.push(cap);
179            }
180            config.media_capabilities = Some(caps);
181        }
182
183        let peer_connection = Arc::new(PeerConnection::new(config));
184        self.peer_connection = Some(peer_connection.clone());
185
186        let default_codec = CodecType::G722;
187        let codec = self.rtc_config.preferred_codec.unwrap_or(default_codec);
188
189        let (source, track) = Self::create_audio_track(codec, Some(self.track_id.clone()));
190        self.local_source = Some(source);
191
192        let payload_type = self
193            .rtc_config
194            .payload_type
195            .unwrap_or_else(|| codec.payload_type());
196
197        self.payload_type = Some(payload_type);
198
199        let params = RtpCodecParameters {
200            clock_rate: codec.clock_rate(),
201            channels: codec.channels() as u8,
202            payload_type,
203            ..Default::default()
204        };
205
206        peer_connection.add_track_with_stream_id(track, self.track_id.clone(), params)?;
207
208        // Spawn Handler Logic
209        self.spawn_handlers(
210            peer_connection.clone(),
211            self.track_id.clone(),
212            self.processor_chain.clone(),
213            payload_type,
214        );
215
216        Ok(())
217    }
218
219    fn spawn_handlers(
220        &self,
221        pc: Arc<PeerConnection>,
222        track_id: TrackId,
223        processor_chain: ProcessorChain,
224        default_payload_type: u8,
225    ) {
226        let cancel_token = self.cancel_token.clone();
227        let packet_sender = self.packet_sender.clone();
228        let pc_event = pc.clone();
229        let pc_stats = pc.clone();
230        let pc_state = pc.clone();
231        let track_id_log = track_id.clone();
232        let is_webrtc = self.rtc_config.mode != TransportMode::Rtp;
233
234        crate::spawn(async move {
235            info!(track_id=%track_id_log, "RtcTrack event/stats loop started");
236
237            let mut events = futures::stream::unfold(pc_event, |pc| async move {
238                pc.recv().await.map(|ev| (ev, pc))
239            })
240            .boxed();
241
242            let mut state_rx = if is_webrtc {
243                Some(pc_state.subscribe_peer_state())
244            } else {
245                None
246            };
247
248            let mut stats_interval = tokio::time::interval(Duration::from_secs(5));
249            let mut event_count = 0;
250            let mut workers = FuturesUnordered::new();
251
252            loop {
253                tokio::select! {
254                    _ = cancel_token.cancelled() => {
255                        debug!(track_id=%track_id_log, "RtcTrack loop cancelled");
256                        break;
257                    }
258
259                    Some(event) = events.next() => {
260                        event_count += 1;
261                        let event_type = match &event {
262                            PeerConnectionEvent::Track(_) => "Track",
263                            PeerConnectionEvent::DataChannel(_) => "DataChannel",
264                        };
265                        debug!(track_id=%track_id_log, "Received PeerConnectionEvent #{}: {}", event_count, event_type);
266
267                        if let PeerConnectionEvent::Track(transceiver) = event {
268                            if let Some(receiver) = transceiver.receiver() {
269                                let track = receiver.track();
270                                info!(track_id=%track_id_log, "New track received (SSRC latching complete)");
271
272                                let (f1, f2) = Self::create_track_workers(
273                                    track,
274                                    packet_sender.clone(),
275                                    track_id_log.clone(),
276                                    processor_chain.clone(),
277                                    default_payload_type,
278                                );
279                                workers.push(f1);
280                                workers.push(f2);
281                            }
282                        }
283                    }
284
285                    _ = workers.next(), if !workers.is_empty() => {}
286
287                    _ = stats_interval.tick() => {
288                        match pc_stats.get_stats().await {
289                            Ok(stats) => {
290                                info!(track_id=%track_id_log, %stats, "RTCP Stats");
291                            }
292                            Err(e) => {
293                                debug!(track_id=%track_id_log, "Failed to get stats: {:?}", e);
294                            }
295                        }
296                    }
297
298                    // Handle State Changes (WebRTC Only)
299                    res = async {
300                        if let Some(rx) = state_rx.as_mut() {
301                            rx.changed().await
302                        } else {
303                            std::future::pending().await
304                        }
305                    } => {
306                        if res.is_ok() {
307                            if let Some(rx) = state_rx.as_ref() {
308                                let s = *rx.borrow();
309                                debug!(track_id=%track_id_log, "peer connection state changed: {:?}", s);
310                                match s {
311                                    PeerConnectionState::Disconnected
312                                    | PeerConnectionState::Closed
313                                    | PeerConnectionState::Failed => {
314                                        info!(
315                                            track_id = %track_id_log,
316                                            "peer connection is {:?}, try to close", s
317                                        );
318                                        cancel_token.cancel();
319                                        pc_state.close();
320                                        break;
321                                    }
322                                    _ => {}
323                                }
324                            }
325                        }
326                    }
327                }
328            }
329            debug!(track_id=%track_id_log, "RtcTrack event/stats loop ended, total events: {}", event_count);
330        });
331    }
332
333    fn create_track_workers(
334        track: Arc<SampleStreamTrack>,
335        packet_sender_arc: Arc<Mutex<Option<TrackPacketSender>>>,
336        track_id: TrackId,
337        processor_chain: ProcessorChain,
338        default_payload_type: u8,
339    ) -> (
340        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
341        std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
342    ) {
343        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<rustrtc::media::frame::AudioFrame>();
344
345        // Processing Worker
346        let track_id_proc = track_id.clone();
347        let packet_sender_proc = packet_sender_arc.clone();
348        let processor_chain_proc = processor_chain.clone();
349        let proc_fut = Self::run_processing_worker(
350            rx,
351            track_id_proc,
352            packet_sender_proc,
353            processor_chain_proc,
354            default_payload_type,
355        );
356
357        // Receiving Worker
358        let track_id_recv = track_id.clone();
359        let recv_fut = Self::run_receiving_worker(track, tx, track_id_recv);
360
361        (proc_fut.boxed(), recv_fut.boxed())
362    }
363
364    async fn run_processing_worker(
365        mut rx: tokio::sync::mpsc::UnboundedReceiver<rustrtc::media::frame::AudioFrame>,
366        track_id: TrackId,
367        packet_sender: Arc<Mutex<Option<TrackPacketSender>>>,
368        mut processor_chain: ProcessorChain,
369        default_payload_type: u8,
370    ) {
371        info!(track_id=%track_id, "RtcTrack processing worker started");
372        while let Some(frame) = rx.recv().await {
373            let res = std::panic::AssertUnwindSafe(Self::process_audio_frame(
374                frame,
375                &track_id,
376                &packet_sender,
377                &mut processor_chain,
378                default_payload_type,
379            ))
380            .catch_unwind()
381            .await;
382
383            if let Err(cause) = res {
384                let msg = if let Some(s) = cause.downcast_ref::<&str>() {
385                    *s
386                } else if let Some(s) = cause.downcast_ref::<String>() {
387                    &s[..]
388                } else {
389                    "Unknown panic"
390                };
391                tracing::error!(track_id=%track_id, "RtcTrack processing worker PANIC: {}", msg);
392                break;
393            }
394        }
395        info!(track_id=%track_id, "RtcTrack processing worker stopped");
396    }
397
398    async fn run_receiving_worker(
399        track: Arc<SampleStreamTrack>,
400        tx: tokio::sync::mpsc::UnboundedSender<rustrtc::media::frame::AudioFrame>,
401        track_id: TrackId,
402    ) {
403        let mut samples =
404            futures::stream::unfold(
405                track,
406                |t| async move { t.recv().await.ok().map(|s| (s, t)) },
407            )
408            .boxed();
409
410        while let Some(sample) = samples.next().await {
411            if let rustrtc::media::frame::MediaSample::Audio(frame) = sample {
412                if let Err(_) = tx.send(frame) {
413                    break;
414                }
415            } else {
416                debug!(track_id=%track_id, "Received non-audio sample");
417            }
418        }
419        info!(track_id=%track_id, "RtcTrack receiving worker stopped");
420    }
421
422    async fn process_audio_frame(
423        frame: rustrtc::media::frame::AudioFrame,
424        track_id: &TrackId,
425        packet_sender: &Arc<Mutex<Option<TrackPacketSender>>>,
426        processor_chain: &mut ProcessorChain,
427        default_payload_type: u8,
428    ) {
429        let packet_sender = packet_sender.lock().await;
430        if let Some(sender) = packet_sender.as_ref() {
431            let payload_type = frame.payload_type.unwrap_or(default_payload_type);
432            let src_codec = match CodecType::try_from(payload_type) {
433                Ok(c) => c,
434                Err(_) => {
435                    debug!(track_id=%track_id, "Unknown payload type {}, skipping frame", payload_type);
436                    return;
437                }
438            };
439
440            let mut af = AudioFrame {
441                track_id: track_id.clone(),
442                samples: crate::media::Samples::RTP {
443                    payload_type,
444                    payload: frame.data.to_vec(),
445                    sequence_number: frame.sequence_number.unwrap_or(0),
446                },
447                timestamp: crate::media::get_timestamp(),
448                sample_rate: src_codec.samplerate(),
449                channels: src_codec.channels(),
450                ..Default::default()
451            };
452            if let Err(e) = processor_chain.process_frame(&mut af) {
453                debug!(track_id=%track_id, "processor_chain process_frame error: {:?}", e);
454            }
455
456            sender.send(af).ok();
457        }
458    }
459
460    pub fn parse_sdp_payload_types(&mut self, sdp_type: SdpType, sdp_str: &str) -> Result<()> {
461        use crate::media::negotiate::parse_rtpmap;
462        let sdp = rustrtc::SessionDescription::parse(sdp_type, sdp_str)?;
463
464        if let Some(media) = sdp
465            .media_sections
466            .iter()
467            .find(|m| m.kind == MediaKind::Audio)
468        {
469            for attr in &media.attributes {
470                if attr.key == "rtpmap" {
471                    if let Some(value) = &attr.value {
472                        if let Ok((pt, codec, _, _)) = parse_rtpmap(value) {
473                            self.encoder.set_payload_type(pt, codec.clone());
474                            self.processor_chain.codec.set_payload_type(pt, codec);
475                        }
476                    }
477                }
478            }
479
480            // Negotiate primary audio codec
481            let mut negotiated = None;
482
483            // If we are the offerer (receiving an Answer), we prioritize our own preferred codec order
484            // that is also present in the answer.
485            if sdp_type == rustrtc::sdp::SdpType::Answer && !self.rtc_config.codecs.is_empty() {
486                for preferred_codec in &self.rtc_config.codecs {
487                    if *preferred_codec == CodecType::TelephoneEvent {
488                        continue;
489                    }
490                    for fmt in &media.formats {
491                        if let Ok(pt) = fmt.parse::<u8>() {
492                            let codec = self
493                                .encoder
494                                .payload_type_map
495                                .get(&pt)
496                                .cloned()
497                                .or_else(|| CodecType::try_from(pt).ok());
498                            if let Some(c) = codec {
499                                if c == *preferred_codec {
500                                    negotiated = Some((pt, c));
501                                    break;
502                                }
503                            }
504                        }
505                    }
506                    if negotiated.is_some() {
507                        break;
508                    }
509                }
510            }
511
512            // Fallback: use the first codec in the SDP (matches offerer's preference if we are answerer)
513            if negotiated.is_none() {
514                for fmt in &media.formats {
515                    if let Ok(pt) = fmt.parse::<u8>() {
516                        let codec = self
517                            .encoder
518                            .payload_type_map
519                            .get(&pt)
520                            .cloned()
521                            .or_else(|| CodecType::try_from(pt).ok());
522
523                        if let Some(codec) = codec {
524                            if codec != CodecType::TelephoneEvent {
525                                negotiated = Some((pt, codec));
526                                break;
527                            }
528                        }
529                    }
530                }
531            }
532
533            if let Some((pt, codec)) = negotiated {
534                info!(track_id=%self.track_id, "Negotiated primary audio PT {} ({:?})", pt, codec);
535                self.payload_type = Some(pt);
536            }
537        }
538        Ok(())
539    }
540
541    fn normalize_sdp(sdp: &str) -> String {
542        sdp.lines()
543            .map(|line| {
544                if line.starts_with("o=") {
545                    let parts: Vec<&str> = line.split_whitespace().collect();
546                    if parts.len() >= 3 {
547                        return format!("o= {} {}", parts[1], parts[2]);
548                    }
549                }
550                line.to_string()
551            })
552            .filter(|line| {
553                !line.starts_with("t=") &&  // timing line can vary
554                !line.starts_with("a=ssrc:") &&  // SSRC attributes (but SSRC change shows in o= version)
555                !line.starts_with("a=msid:") &&  // media stream ID
556                !line.trim().is_empty()
557            })
558            .collect::<Vec<_>>()
559            .join("\n")
560    }
561
562    async fn update_remote_description_internal(
563        &mut self,
564        answer: &String,
565        force_update: bool,
566    ) -> Result<()> {
567        info!(
568            track_id=%self.track_id,
569            "update_remote_description_internal called. force={}, last_sdp_is_some={}, mode={:?}",
570            force_update,
571            self.last_remote_sdp.is_some(),
572            self.rtc_config.mode
573        );
574
575        if let Some(pc) = &self.peer_connection {
576            if !force_update {
577                if let Some(ref last_sdp) = self.last_remote_sdp {
578                    if Self::normalize_sdp(last_sdp) == Self::normalize_sdp(answer) {
579                        debug!(track_id=%self.track_id, "SDP unchanged, skipping update_remote_description");
580                        return Ok(());
581                    }
582                }
583            } else {
584                debug!(track_id=%self.track_id, "Force update requested, skipping SDP comparison");
585            }
586
587            let _is_first_remote_sdp = self.last_remote_sdp.is_none();
588
589            let sdp_obj = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, answer)?;
590            match pc.set_remote_description(sdp_obj.clone()).await {
591                Ok(_) => {
592                    debug!(track_id=%self.track_id, "set_remote_description succeeded");
593                    self.last_remote_sdp = Some(answer.clone());
594                }
595                Err(e) => {
596                    if self.rtc_config.mode == TransportMode::Rtp {
597                        info!(track_id=%self.track_id, "set_remote_description failed ({}), attempting to re-sync state for SIP update", e);
598
599                        if let Some(current_local) = pc.local_description() {
600                            let sdp = current_local.to_sdp_string();
601                            for line in sdp.lines() {
602                                if line.starts_with("a=ssrc:") {
603                                    info!(track_id=%self.track_id, "SSRC before re-sync: {}", line);
604                                }
605                            }
606                        }
607
608                        let offer = pc.create_offer().await?;
609
610                        let sdp = offer.to_sdp_string();
611                        for line in sdp.lines() {
612                            if line.starts_with("a=ssrc:") {
613                                info!(track_id=%self.track_id, "SSRC in new offer (re-sync): {}", line);
614                            }
615                        }
616
617                        pc.set_local_description(offer)?;
618                        pc.set_remote_description(sdp_obj).await?;
619                        self.last_remote_sdp = Some(answer.clone());
620                        info!(track_id=%self.track_id, "successfully re-synced WebRTC state for SIP update");
621                    } else {
622                        return Err(e.into());
623                    }
624                }
625            }
626
627            // Track events will be handled by the event loop after SSRC latching
628
629            // Extract negotiated payload types from SDP string
630            self.parse_sdp_payload_types(rustrtc::SdpType::Answer, answer)?;
631        }
632        Ok(())
633    }
634}
635
636#[async_trait]
637impl Track for RtcTrack {
    /// Returns the value of the `ssrc` field.
    fn ssrc(&self) -> u32 {
        self.ssrc
    }
    /// Returns this track's identifier.
    fn id(&self) -> &TrackId {
        &self.track_id
    }
    /// Returns the generic track configuration this track was created with.
    fn config(&self) -> &TrackConfig {
        &self.track_config
    }
    /// Gives mutable access to this track's own processor chain.
    ///
    /// NOTE(review): background workers receive a *clone* of the chain taken in
    /// `create`; whether changes made here reach those clones depends on how
    /// `ProcessorChain: Clone` shares state — confirm.
    fn processor_chain(&mut self) -> &mut ProcessorChain {
        &mut self.processor_chain
    }
650
651    async fn handshake(&mut self, offer: String, _: Option<Duration>) -> Result<String> {
652        info!(track_id=%self.track_id, "rtc handshake start");
653        self.create().await?;
654
655        let pc = self.peer_connection.clone().ok_or_else(|| {
656            anyhow::anyhow!("No PeerConnection available for track {}", self.track_id)
657        })?;
658
659        debug!(track_id=%self.track_id, "Before set_remote_description: transceivers count = {}", pc.get_transceivers().len());
660        for (i, t) in pc.get_transceivers().iter().enumerate() {
661            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}",
662                i, t.kind(), t.mid(), t.direction());
663        }
664
665        let sdp = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer)?;
666        pc.set_remote_description(sdp.clone()).await?;
667
668        debug!(track_id=%self.track_id, "After set_remote_description: transceivers count = {}", pc.get_transceivers().len());
669        for (i, t) in pc.get_transceivers().iter().enumerate() {
670            debug!(track_id=%self.track_id, "  Transceiver #{}: kind={:?}, mid={:?}, direction={:?}, has_receiver={}",
671                i, t.kind(), t.mid(), t.direction(), t.receiver().is_some());
672        }
673
674        // For RTP mode: Wait for PeerConnectionEvent::Track after SSRC latching completes
675        // For WebRTC mode: The event loop will handle Track events
676        info!(track_id=%self.track_id, "Waiting for Track events (SSRC latching for RTP mode)");
677
678        self.parse_sdp_payload_types(rustrtc::SdpType::Offer, &offer)?;
679
680        let mut answer = pc.create_answer().await?;
681        crate::media::negotiate::intersect_answer(&sdp, &mut answer);
682
683        pc.set_local_description(answer.clone())?;
684
685        if self.rtc_config.mode != TransportMode::Rtp {
686            pc.wait_for_gathering_complete().await;
687        }
688
689        let final_answer = pc
690            .local_description()
691            .ok_or(anyhow::anyhow!("No local description"))?;
692
693        Ok(final_answer.to_sdp_string())
694    }
695
    /// Applies `answer` as the remote description, skipping the update when it
    /// matches the last applied SDP after normalization.
    async fn update_remote_description(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, false).await
    }
699
    /// Applies `answer` as the remote description without comparing it to the
    /// last applied SDP.
    async fn update_remote_description_force(&mut self, answer: &String) -> Result<()> {
        self.update_remote_description_internal(answer, true).await
    }
703
704    async fn start(
705        &mut self,
706        event_sender: EventSender,
707        packet_sender: TrackPacketSender,
708    ) -> Result<()> {
709        *self.packet_sender.lock().await = Some(packet_sender.clone());
710        let token_clone = self.cancel_token.clone();
711        let event_sender_clone = event_sender.clone();
712        let track_id = self.track_id.clone();
713        let ssrc = self.ssrc;
714
715        if self.rtc_config.mode != TransportMode::Rtp {
716            let start_time = crate::media::get_timestamp();
717            crate::spawn(async move {
718                token_clone.cancelled().await;
719                let _ = event_sender_clone.send(SessionEvent::TrackEnd {
720                    track_id,
721                    timestamp: crate::media::get_timestamp(),
722                    duration: crate::media::get_timestamp() - start_time,
723                    ssrc,
724                    play_id: None,
725                });
726            });
727        }
728
729        Ok(())
730    }
731
732    async fn stop(&self) -> Result<()> {
733        self.cancel_token.cancel();
734        if let Some(pc) = &self.peer_connection {
735            pc.close();
736        }
737        Ok(())
738    }
739
740    async fn send_packet(&mut self, packet: &AudioFrame) -> Result<()> {
741        let packet = packet.clone();
742
743        if let Some(source) = &self.local_source {
744            match &packet.samples {
745                crate::media::Samples::PCM { samples } => {
746                    let payload_type = self.get_payload_type();
747                    let (_, encoded) = self.encoder.encode(payload_type, packet.clone());
748                    let target_codec = CodecType::try_from(payload_type)?;
749                    if !encoded.is_empty() {
750                        let clock_rate = target_codec.clock_rate();
751
752                        let now = Instant::now();
753                        if let Some(last_time) = self.last_packet_time {
754                            let elapsed = now.duration_since(last_time);
755                            if elapsed.as_millis() > 50 {
756                                let gap_increment =
757                                    (elapsed.as_millis() as u32 * clock_rate) / 1000;
758                                self.next_rtp_timestamp += gap_increment;
759                                self.need_marker = true;
760                            }
761                        }
762
763                        self.last_packet_time = Some(now);
764
765                        let timestamp_increment = (samples.len() as u64 * clock_rate as u64
766                            / packet.sample_rate as u64
767                            / self.track_config.channels as u64)
768                            as u32;
769                        let rtp_timestamp = self.next_rtp_timestamp;
770                        self.next_rtp_timestamp += timestamp_increment;
771                        let sequence_number = self.next_rtp_sequence_number;
772                        self.next_rtp_sequence_number += 1;
773
774                        let mut marker = false;
775                        if self.need_marker {
776                            marker = true;
777                            self.need_marker = false;
778                        }
779
780                        let frame = RtcAudioFrame {
781                            data: Bytes::from(encoded),
782                            clock_rate,
783                            payload_type: Some(payload_type),
784                            sequence_number: Some(sequence_number),
785                            rtp_timestamp,
786                            marker,
787                            ..Default::default()
788                        };
789                        source.try_send_audio(frame).ok();
790                    }
791                }
792                crate::media::Samples::RTP {
793                    payload,
794                    payload_type,
795                    sequence_number,
796                } => {
797                    let target_codec = CodecType::try_from(*payload_type)?;
798                    let clock_rate = target_codec.clock_rate();
799
800                    let now = Instant::now();
801                    if let Some(last_time) = self.last_packet_time {
802                        let elapsed = now.duration_since(last_time);
803                        if elapsed.as_millis() > 50 {
804                            let gap_increment = (elapsed.as_millis() as u32 * clock_rate) / 1000;
805                            self.next_rtp_timestamp += gap_increment;
806                            self.need_marker = true;
807                        }
808                    }
809                    self.last_packet_time = Some(now);
810
811                    let increment = match *payload_type {
812                        0 | 8 | 18 => payload.len() as u32,
813                        9 => payload.len() as u32,
814                        111 => (clock_rate / 50) as u32,
815                        _ => (clock_rate / 50) as u32,
816                    };
817
818                    let rtp_timestamp = self.next_rtp_timestamp;
819                    self.next_rtp_timestamp += increment;
820                    let sequence_number = *sequence_number;
821
822                    let mut marker = false;
823                    if self.need_marker {
824                        marker = true;
825                        self.need_marker = false;
826                    }
827
828                    let frame = RtcAudioFrame {
829                        data: Bytes::from(payload.clone()),
830                        clock_rate,
831                        payload_type: Some(*payload_type),
832                        sequence_number: Some(sequence_number),
833                        rtp_timestamp,
834                        marker,
835                        ..Default::default()
836                    };
837                    source.try_send_audio(frame).ok();
838                }
839                _ => {}
840            }
841        }
842        Ok(())
843    }
844}
845
846impl RtcTrack {
847    fn get_payload_type(&self) -> u8 {
848        if let Some(pt) = self.payload_type {
849            return pt;
850        }
851
852        self.rtc_config.payload_type.unwrap_or_else(|| {
853            match self.rtc_config.preferred_codec.unwrap_or(CodecType::G722) {
854                CodecType::PCMU => 0,
855                CodecType::PCMA => 8,
856                #[cfg(feature = "opus")]
857                CodecType::Opus => 111,
858                CodecType::G722 => 9,
859                CodecType::G729 => 18,
860                _ => 111,
861            }
862        })
863    }
864}
865
#[cfg(test)]
mod tests {
    use super::*;
    use crate::media::track::TrackConfig;

    /// Feeds several SDP offers to `parse_sdp_payload_types` and checks the
    /// payload type that `get_payload_type` reports afterwards.
    #[test]
    fn test_parse_sdp_payload_types() {
        // Audio codecs listed first, telephone-event last.
        const OFFER_PCMA_FIRST: &str = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 8 0 101\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
        // telephone-event listed ahead of the audio codecs.
        const OFFER_DTMF_FIRST: &str = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 101 0 8\r\na=rtpmap:101 telephone-event/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\n";
        // Opus on dynamic payload type 111.
        const OFFER_OPUS: &str = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 1234 RTP/AVP 111 101\r\na=rtpmap:111 opus/48000/2\r\na=rtpmap:101 telephone-event/8000\r\n";

        // Case 1: default configuration; the first audio codec (PCMA, 8) is
        // expected to become the primary payload type.
        let mut default_track = RtcTrack::new(
            CancellationToken::new(),
            "test-track".to_string(),
            TrackConfig::default(),
            RtcTrackConfig::default(),
        );
        default_track
            .parse_sdp_payload_types(SdpType::Offer, OFFER_PCMA_FIRST)
            .expect("parse offer");
        assert_eq!(default_track.get_payload_type(), 8);

        // Case 2: PCMU is preferred and telephone-event leads the m= line;
        // the leading telephone-event must be skipped, giving PCMU (0).
        let pcmu_config = RtcTrackConfig {
            preferred_codec: Some(CodecType::PCMU),
            ..Default::default()
        };
        let mut pcmu_track = RtcTrack::new(
            CancellationToken::new(),
            "test-track-2".to_string(),
            TrackConfig::default(),
            pcmu_config,
        );
        pcmu_track
            .parse_sdp_payload_types(SdpType::Offer, OFFER_DTMF_FIRST)
            .expect("parse offer");
        assert_eq!(pcmu_track.get_payload_type(), 0);

        // Case 3: the first track is reused with an Opus offer and must now
        // report the dynamic payload type 111.
        default_track
            .parse_sdp_payload_types(SdpType::Offer, OFFER_OPUS)
            .expect("parse offer");
        assert_eq!(default_track.get_payload_type(), 111);
    }

    /// Runs a handshake in plain-RTP transport mode against a SIP-style
    /// offer and inspects the transceivers of the resulting peer connection.
    #[tokio::test]
    async fn test_rtp_mode_handshake_spawns_handler() {
        use rustrtc::TransportMode;

        let rtp_config = RtcTrackConfig {
            mode: TransportMode::Rtp,
            ..Default::default()
        };
        let mut track = RtcTrack::new(
            CancellationToken::new(),
            "test-track-sip".to_string(),
            TrackConfig::default(),
            rtp_config,
        );

        // Standard SIP/SDP offer with a single audio section. Whitespace
        // after each trailing backslash is dropped by the compiler, so the
        // indentation below is not part of the string.
        let offer = "v=0\r\n\
            o=- 123456 123456 IN IP4 172.0.0.1\r\n\
            s=-\r\n\
            c=IN IP4 172.0.0.1\r\n\
            t=0 0\r\n\
            m=audio 10000 RTP/AVP 0 101\r\n\
            a=rtpmap:0 PCMU/8000\r\n\
            a=rtpmap:101 telephone-event/8000\r\n\
            a=sendrecv\r\n";

        // The handshake must complete without panicking or returning an error.
        let outcome = track.handshake(offer.to_string(), None).await;
        assert!(outcome.is_ok());

        // A peer connection must exist once the handshake has succeeded.
        let Some(pc) = &track.peer_connection else {
            panic!("PeerConnection not initialized");
        };

        // Exactly one transceiver is expected, and it must have a receiver.
        let transceivers = pc.get_transceivers();
        assert_eq!(transceivers.len(), 1);
        assert!(transceivers[0].receiver().is_some());
    }
}