Skip to main content

rtp_engine/
session.rs

1//! Media session management.
2//!
3//! The `MediaSession` struct orchestrates the complete audio pipeline:
4//! microphone capture → encoding → RTP transmission → reception → decoding → speaker playback.
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use tokio::net::UdpSocket;
10
11use crate::codec::{AudioDecoder, AudioEncoder, CodecType, create_decoder, create_encoder};
12use crate::error::{Error, Result};
13use crate::resample::{f32_to_i16, i16_to_f32, resample_linear};
14use crate::rtp::{
15    RtpCounters, RtpHeader, RtpStats, build_rtcp_rr, build_rtcp_sr, parse_rtp, parse_sequence,
16    parse_timestamp,
17};
18
19#[cfg(feature = "srtp")]
20use crate::srtp::SrtpContext;
21
22#[cfg(feature = "device")]
23use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
24
/// SRTP context wrapped in `Arc<Mutex<..>>` so the same context can be handed to the
/// TX, RX and RTCP workers.
// NOTE(review): this alias is not gated on the "srtp" feature although the `SrtpContext`
// import above is — confirm the crate still builds with the feature disabled.
type SharedSrtp = Arc<std::sync::Mutex<SrtpContext>>;
26
/// A complete RTP media session with bidirectional audio.
///
/// Manages:
/// - Audio capture from microphone
/// - Encoding with the configured codec
/// - RTP packet transmission
/// - RTP packet reception
/// - Decoding and playback to speaker
/// - RTCP statistics reporting
///
/// The worker threads/tasks are started by the constructor and keep running until the
/// shared `running` flag is cleared by [`MediaSession::stop`] (also invoked on drop).
pub struct MediaSession {
    /// Mute flag shared with the capture callback; while set, captured audio is discarded.
    muted: Arc<AtomicBool>,
    /// Run flag polled by the TX, RX and RTCP workers; cleared by `stop()`.
    running: Arc<AtomicBool>,
    /// RTP statistics counters shared with the workers.
    counters: RtpCounters,
    /// Codec selected when the session was started.
    codec: CodecType,
    /// Source address of the first received RTP packet, if any. When present it is
    /// preferred over `remote_addr` as the send destination.
    learned_remote: Arc<std::sync::Mutex<Option<SocketAddr>>>,
    /// Local RTP socket shared by the TX/RX workers and DTMF sending.
    rtp_socket: Arc<UdpSocket>,
    /// Randomly generated synchronization source identifier for outgoing packets.
    ssrc: u32,
    /// Remote RTP endpoint supplied by the caller.
    remote_addr: SocketAddr,
}
46
47impl MediaSession {
    /// Start a media session with the specified codec (no SRTP).
    ///
    /// Delegates to `start_internal` with no SRTP context.
    ///
    /// # Arguments
    /// * `local_rtp_port` - Local UDP port for RTP
    /// * `remote_addr` - Remote RTP endpoint address
    /// * `codec_type` - Audio codec to use
    ///
    /// # Errors
    /// Returns whatever error `start_internal` produces (socket bind or codec creation
    /// failure).
    pub async fn start(
        local_rtp_port: u16,
        remote_addr: SocketAddr,
        codec_type: CodecType,
    ) -> Result<Self> {
        Self::start_internal(local_rtp_port, remote_addr, codec_type, None).await
    }
61
    /// Start a media session with SRTP encryption.
    ///
    /// Same as `start`, but hands `srtp_ctx` to the workers so RTP/RTCP traffic is passed
    /// through the SRTP context. Only available with the `srtp` feature.
    ///
    /// # Arguments
    /// * `local_rtp_port` - Local UDP port for RTP
    /// * `remote_addr` - Remote RTP endpoint address
    /// * `codec_type` - Audio codec to use
    /// * `srtp_ctx` - SRTP context used to protect/unprotect packets
    ///
    /// # Errors
    /// Returns whatever error `start_internal` produces (socket bind or codec creation
    /// failure).
    #[cfg(feature = "srtp")]
    pub async fn start_with_srtp(
        local_rtp_port: u16,
        remote_addr: SocketAddr,
        codec_type: CodecType,
        srtp_ctx: SrtpContext,
    ) -> Result<Self> {
        Self::start_internal(local_rtp_port, remote_addr, codec_type, Some(srtp_ctx)).await
    }
72
73    async fn start_internal(
74        local_rtp_port: u16,
75        remote_addr: SocketAddr,
76        codec_type: CodecType,
77        #[allow(unused_variables)] srtp_ctx: Option<SrtpContext>,
78    ) -> Result<Self> {
79        let rtp_socket = UdpSocket::bind(format!("0.0.0.0:{}", local_rtp_port))
80            .await
81            .map_err(|e| Error::Network(e))?;
82
83        let rtp_socket = Arc::new(rtp_socket);
84        let muted = Arc::new(AtomicBool::new(false));
85        let running = Arc::new(AtomicBool::new(true));
86        let ssrc: u32 = rand::random();
87        let counters = RtpCounters::new(codec_type.name());
88        let learned_remote: Arc<std::sync::Mutex<Option<SocketAddr>>> =
89            Arc::new(std::sync::Mutex::new(None));
90
91        let encoder = create_encoder(codec_type)?;
92        let decoder = create_decoder(codec_type)?;
93
94        #[cfg(feature = "srtp")]
95        let shared_srtp: Option<SharedSrtp> =
96            srtp_ctx.map(|ctx| Arc::new(std::sync::Mutex::new(ctx)));
97        #[cfg(not(feature = "srtp"))]
98        let shared_srtp: Option<SharedSrtp> = None;
99
100        // RTCP socket (RTP port + 1)
101        let rtcp_port = local_rtp_port + 1;
102        let rtcp_socket = UdpSocket::bind(format!("0.0.0.0:{}", rtcp_port))
103            .await
104            .map_err(|e| Error::Network(e))?;
105        let rtcp_socket = Arc::new(rtcp_socket);
106        let remote_rtcp_addr: SocketAddr =
107            format!("{}:{}", remote_addr.ip(), remote_addr.port() + 1)
108                .parse()
109                .unwrap_or(remote_addr);
110
111        // Start TX thread (microphone → RTP)
112        #[cfg(feature = "device")]
113        {
114            let tx_socket = rtp_socket.clone();
115            let tx_muted = muted.clone();
116            let tx_running = running.clone();
117            let tx_counters = counters.clone();
118            let tx_learned = learned_remote.clone();
119            let tx_srtp = shared_srtp.clone();
120
121            std::thread::spawn(move || {
122                if let Err(e) = run_audio_tx(
123                    tx_socket,
124                    remote_addr,
125                    ssrc,
126                    tx_muted,
127                    tx_running,
128                    encoder,
129                    tx_counters,
130                    tx_learned,
131                    tx_srtp,
132                ) {
133                    log::error!("Audio TX error: {}", e);
134                }
135            });
136        }
137
138        // Start RX thread (RTP → speaker)
139        #[cfg(feature = "device")]
140        {
141            let rx_socket = rtp_socket.clone();
142            let rx_running = running.clone();
143            let rx_counters = counters.clone();
144            let rx_learned = learned_remote.clone();
145            let rx_srtp = shared_srtp.clone();
146
147            std::thread::spawn(move || {
148                if let Err(e) = run_audio_rx(
149                    rx_socket,
150                    rx_running,
151                    decoder,
152                    rx_counters,
153                    rx_learned,
154                    rx_srtp,
155                ) {
156                    log::error!("Audio RX error: {}", e);
157                }
158            });
159        }
160
161        // Start RTCP task
162        {
163            let rtcp_running = running.clone();
164            let rtcp_counters = counters.clone();
165            let rtcp_srtp = shared_srtp;
166            tokio::spawn(async move {
167                run_rtcp(
168                    rtcp_socket,
169                    remote_rtcp_addr,
170                    ssrc,
171                    rtcp_running,
172                    rtcp_counters,
173                    rtcp_srtp,
174                )
175                .await;
176            });
177        }
178
179        log::info!(
180            "Media session started: local RTP :{}, remote {}, codec {:?}",
181            local_rtp_port,
182            remote_addr,
183            codec_type,
184        );
185
186        Ok(Self {
187            muted,
188            running,
189            counters,
190            codec: codec_type,
191            learned_remote,
192            rtp_socket,
193            ssrc,
194            remote_addr,
195        })
196    }
197
198    /// Send an RFC 2833 DTMF digit.
199    pub fn send_dtmf(&self, digit: &str) {
200        let event_code: u8 = match digit {
201            "0" => 0,
202            "1" => 1,
203            "2" => 2,
204            "3" => 3,
205            "4" => 4,
206            "5" => 5,
207            "6" => 6,
208            "7" => 7,
209            "8" => 8,
210            "9" => 9,
211            "*" => 10,
212            "#" => 11,
213            _ => {
214                log::warn!("Unknown DTMF digit: {}", digit);
215                return;
216            }
217        };
218
219        let socket = self.rtp_socket.clone();
220        let ssrc = self.ssrc;
221        let dest = self
222            .learned_remote
223            .lock()
224            .ok()
225            .and_then(|g| *g)
226            .unwrap_or(self.remote_addr);
227        let counters = self.counters.clone();
228
229        tokio::spawn(async move {
230            let base_ts: u32 = rand::random();
231            let base_seq: u16 = rand::random();
232            let volume: u8 = 10;
233            let pt: u8 = 101;
234            let durations = [160u16, 320, 480];
235
236            for (i, &duration) in durations.iter().enumerate() {
237                let is_end = i == durations.len() - 1;
238                let seq = base_seq.wrapping_add(i as u16);
239
240                let mut packet = Vec::with_capacity(16);
241                packet.push(0x80);
242                let marker = if i == 0 { 0x80 } else { 0x00 };
243                packet.push(pt | marker);
244                packet.extend_from_slice(&seq.to_be_bytes());
245                packet.extend_from_slice(&base_ts.to_be_bytes());
246                packet.extend_from_slice(&ssrc.to_be_bytes());
247
248                let end_flag: u8 = if is_end { 0x80 } else { 0x00 };
249                packet.push(event_code);
250                packet.push(end_flag | (volume & 0x3F));
251                packet.extend_from_slice(&duration.to_be_bytes());
252
253                let _ = socket.send_to(&packet, dest).await;
254                counters.record_sent(packet.len() as u64);
255
256                if is_end {
257                    for _ in 0..2 {
258                        let repeat_seq = seq.wrapping_add(1);
259                        packet[2..4].copy_from_slice(&repeat_seq.to_be_bytes());
260                        let _ = socket.send_to(&packet, dest).await;
261                    }
262                }
263
264                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
265            }
266        });
267    }
268
    /// Set mute state.
    ///
    /// Stores the flag that the capture callback checks; while it is `true`, captured
    /// microphone audio is dropped instead of being sent.
    pub fn set_mute(&self, mute: bool) {
        self.muted.store(mute, Ordering::Relaxed);
    }
273
    /// Check if muted.
    ///
    /// Returns the value last stored by `set_mute` (initially `false`).
    pub fn is_muted(&self) -> bool {
        self.muted.load(Ordering::Relaxed)
    }
278
    /// Get current statistics.
    ///
    /// Returns a snapshot taken from the counters shared with the workers.
    pub fn stats(&self) -> RtpStats {
        self.counters.snapshot()
    }
283
    /// Get the codec in use, as passed when the session was started.
    pub fn codec(&self) -> CodecType {
        self.codec
    }
288
    /// Get the SSRC (randomly chosen at session start) used for outgoing packets.
    pub fn ssrc(&self) -> u32 {
        self.ssrc
    }
293
    /// Get the remote address that was configured when the session was started.
    ///
    /// This is not updated when a different source address is learned; see
    /// `learned_remote` for that.
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }
298
299    /// Get the learned remote address (for symmetric RTP/comedia).
300    pub fn learned_remote(&self) -> Option<SocketAddr> {
301        self.learned_remote.lock().ok().and_then(|g| *g)
302    }
303
304    /// Stop the media session.
305    pub fn stop(&self) {
306        self.running.store(false, Ordering::Relaxed);
307        log::info!("Media session stopped");
308    }
309}
310
/// Dropping the session stops it, so the workers do not outlive their owner.
impl Drop for MediaSession {
    fn drop(&mut self) {
        // Signals the workers through the shared run flag.
        self.stop();
    }
}
316
317impl std::fmt::Debug for MediaSession {
318    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319        f.debug_struct("MediaSession")
320            .field("codec", &self.codec)
321            .field("ssrc", &self.ssrc)
322            .field("remote_addr", &self.remote_addr)
323            .field("muted", &self.muted.load(Ordering::Relaxed))
324            .field("running", &self.running.load(Ordering::Relaxed))
325            .finish()
326    }
327}
328
329// --- Audio TX/RX implementation ---
330
/// TX worker: captures microphone audio and sends it as RTP.
///
/// Opens the default input device at its native sample rate (mono), and in the capture
/// callback accumulates samples, resamples each frame to 8 kHz, encodes 160-sample
/// frames and dispatches the resulting packets to the Tokio runtime for sending.
/// Blocks the calling thread until `running` is cleared.
///
/// # Arguments
/// * `socket` - RTP socket used for sending
/// * `remote` - Configured remote RTP address, used until an address has been learned
/// * `ssrc` - SSRC written into every RTP header
/// * `muted` - While set, captured audio is discarded
/// * `running` - Run flag; the worker exits once it is cleared
/// * `encoder` - Codec encoder; its payload type is used in the RTP header
/// * `counters` - Statistics counters updated for every packet handed off for sending
/// * `learned_remote` - Learned remote address, preferred over `remote` when present
/// * `_srtp` - Optional SRTP context used to protect outgoing packets
///
/// # Errors
/// Returns a device error if there is no Tokio runtime context on the calling thread,
/// no input device/config is available, or the input stream cannot be built or started.
#[cfg(feature = "device")]
fn run_audio_tx(
    socket: Arc<UdpSocket>,
    remote: SocketAddr,
    ssrc: u32,
    muted: Arc<AtomicBool>,
    running: Arc<AtomicBool>,
    encoder: Box<dyn AudioEncoder>,
    counters: RtpCounters,
    learned_remote: Arc<std::sync::Mutex<Option<SocketAddr>>>,
    _srtp: Option<SharedSrtp>,
) -> Result<()> {
    use std::sync::atomic::AtomicU16;

    // `Handle::current()` panics when the thread has no runtime context; report that as
    // an ordinary error instead.
    let rt = tokio::runtime::Handle::try_current()
        .map_err(|e| Error::device(format!("No Tokio runtime for audio TX: {}", e)))?;

    let host = cpal::default_host();
    let device = host
        .default_input_device()
        .ok_or_else(|| Error::device("No input device"))?;

    let default_config = device
        .default_input_config()
        .map_err(|e| Error::device(format!("No input config: {}", e)))?;

    let native_rate = default_config.sample_rate();
    log::info!("Audio TX: native rate = {} Hz", native_rate);

    let config = cpal::StreamConfig {
        channels: 1,
        sample_rate: default_config.sample_rate(),
        buffer_size: cpal::BufferSize::Default,
    };

    // NOTE(review): rate and frame size are fixed at 8 kHz / 160 samples regardless of
    // the encoder passed in — confirm this matches every supported codec.
    let codec_rate = 8000u32;
    let samples_per_frame = 160usize;
    let resample_ratio = codec_rate as f64 / native_rate as f64;

    // Number of native-rate samples that make up one codec frame. Computed once; kept
    // at >= 1 so the drain loop below always makes progress.
    let native_samples_per_frame =
        (((samples_per_frame as f64) / resample_ratio).ceil() as usize).max(1);

    let seq = Arc::new(AtomicU16::new(0));
    let ts = Arc::new(std::sync::atomic::AtomicU32::new(0));
    let pt = encoder.payload_type();
    let encoder = Arc::new(std::sync::Mutex::new(encoder));
    let sample_buffer = Arc::new(std::sync::Mutex::new(Vec::<f32>::with_capacity(1024)));

    let cb_running = running.clone();
    let stream = device
        .build_input_stream(
            &config,
            move |data: &[f32], _: &cpal::InputCallbackInfo| {
                if !cb_running.load(Ordering::Relaxed) || muted.load(Ordering::Relaxed) {
                    return;
                }

                let mut buffer = match sample_buffer.lock() {
                    Ok(b) => b,
                    Err(_) => return,
                };
                buffer.extend_from_slice(data);

                while buffer.len() >= native_samples_per_frame {
                    let chunk: Vec<f32> = buffer.drain(..native_samples_per_frame).collect();
                    let resampled = resample_linear(&chunk, native_rate, codec_rate);
                    let pcm = f32_to_i16(&resampled);

                    let current_seq = seq.fetch_add(1, Ordering::Relaxed);
                    let current_ts = ts.fetch_add(samples_per_frame as u32, Ordering::Relaxed);

                    let header = RtpHeader::new(pt, current_seq, current_ts, ssrc);
                    let mut packet = header.to_bytes();

                    // Without the encoder there is no payload; skip the frame rather
                    // than send a header-only packet.
                    match encoder.lock() {
                        Ok(mut enc) => {
                            enc.encode(&pcm, &mut packet);
                        }
                        Err(_) => {
                            log::error!("Audio encoder lock poisoned; dropping frame");
                            continue;
                        }
                    }

                    // Fail closed: when SRTP is configured, a packet that cannot be
                    // protected is dropped, never sent in the clear.
                    #[cfg(feature = "srtp")]
                    let send_packet = if let Some(ref srtp_ctx) = _srtp {
                        let Ok(mut ctx) = srtp_ctx.lock() else {
                            log::error!("SRTP context lock poisoned; dropping packet");
                            continue;
                        };
                        let protected = ctx.protect_rtp(&packet);
                        match protected {
                            Ok(encrypted) => encrypted,
                            Err(e) => {
                                log::error!("SRTP protect failed: {}", e);
                                continue;
                            }
                        }
                    } else {
                        packet
                    };

                    #[cfg(not(feature = "srtp"))]
                    let send_packet = packet;

                    counters.record_sent(send_packet.len() as u64);

                    // Prefer the address learned from incoming traffic (symmetric RTP).
                    let dest = learned_remote
                        .lock()
                        .ok()
                        .and_then(|g| *g)
                        .unwrap_or(remote);
                    let socket = socket.clone();
                    // The audio callback must not block, so the send runs on the runtime.
                    rt.spawn(async move {
                        let _ = socket.send_to(&send_packet, dest).await;
                    });
                }
            },
            |err| log::error!("Audio input error: {}", err),
            None,
        )
        .map_err(|e| Error::device(format!("Failed to build input stream: {}", e)))?;

    stream
        .play()
        .map_err(|e| Error::device(format!("Failed to start input: {}", e)))?;

    // Keep the stream alive until the session is stopped.
    while running.load(Ordering::Relaxed) {
        std::thread::sleep(std::time::Duration::from_millis(50));
    }

    drop(stream);
    Ok(())
}
455
456#[cfg(feature = "device")]
457fn run_audio_rx(
458    socket: Arc<UdpSocket>,
459    running: Arc<AtomicBool>,
460    mut decoder: Box<dyn AudioDecoder>,
461    counters: RtpCounters,
462    learned_remote: Arc<std::sync::Mutex<Option<SocketAddr>>>,
463    _srtp: Option<SharedSrtp>,
464) -> Result<()> {
465    use std::collections::VecDeque;
466
467    let host = cpal::default_host();
468    let device = host
469        .default_output_device()
470        .ok_or_else(|| Error::device("No output device"))?;
471
472    let default_config = device
473        .default_output_config()
474        .map_err(|e| Error::device(format!("No output config: {}", e)))?;
475
476    let native_rate = default_config.sample_rate();
477    log::info!("Audio RX: native rate = {} Hz", native_rate);
478
479    let config = cpal::StreamConfig {
480        channels: 1,
481        sample_rate: default_config.sample_rate(),
482        buffer_size: cpal::BufferSize::Default,
483    };
484
485    let codec_rate = 8000u32;
486
487    let sample_buffer: Arc<std::sync::Mutex<VecDeque<f32>>> = Arc::new(std::sync::Mutex::new(
488        VecDeque::with_capacity(native_rate as usize),
489    ));
490    let rx_buffer = sample_buffer.clone();
491
492    let stream = device
493        .build_output_stream(
494            &config,
495            move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
496                if let Ok(mut buffer) = rx_buffer.lock() {
497                    for out in data.iter_mut() {
498                        *out = buffer.pop_front().unwrap_or(0.0);
499                    }
500                } else {
501                    for out in data.iter_mut() {
502                        *out = 0.0;
503                    }
504                }
505            },
506            |err| log::error!("Audio output error: {}", err),
507            None,
508        )
509        .map_err(|e| Error::device(format!("Failed to build output stream: {}", e)))?;
510
511    stream
512        .play()
513        .map_err(|e| Error::device(format!("Failed to start output: {}", e)))?;
514
515    let rt = tokio::runtime::Builder::new_current_thread()
516        .enable_all()
517        .build()
518        .map_err(|e| Error::device(format!("Failed to create runtime: {}", e)))?;
519
520    rt.block_on(async {
521        let mut buf = [0u8; 2048];
522        let mut last_transit: Option<i64> = None;
523        let mut first_seq: Option<u16> = None;
524
525        while running.load(Ordering::Relaxed) {
526            let recv = tokio::time::timeout(
527                std::time::Duration::from_millis(100),
528                socket.recv_from(&mut buf),
529            )
530            .await;
531
532            match recv {
533                Ok(Ok((len, from_addr))) => {
534                    // Learn remote address for symmetric RTP
535                    if let Ok(mut lr) = learned_remote.lock()
536                        && lr.is_none()
537                    {
538                        log::info!("Comedia: learned remote RTP address {}", from_addr);
539                        *lr = Some(from_addr);
540                    }
541
542                    #[cfg(feature = "srtp")]
543                    let rtp_data: Vec<u8> = if let Some(ref srtp_ctx) = _srtp {
544                        match srtp_ctx.lock() {
545                            Ok(mut ctx) => match ctx.unprotect_rtp(&buf[..len]) {
546                                Ok(decrypted) => decrypted,
547                                Err(e) => {
548                                    log::warn!("SRTP unprotect failed: {}", e);
549                                    continue;
550                                }
551                            },
552                            Err(_) => buf[..len].to_vec(),
553                        }
554                    } else {
555                        buf[..len].to_vec()
556                    };
557
558                    #[cfg(not(feature = "srtp"))]
559                    let rtp_data: Vec<u8> = buf[..len].to_vec();
560
561                    // Track stats
562                    if let Some(seq) = parse_sequence(&rtp_data) {
563                        if first_seq.is_none() {
564                            first_seq = Some(seq);
565                        }
566                        counters.record_received(len as u64, seq);
567                    }
568
569                    // Jitter calculation
570                    if let Some(rtp_ts) = parse_timestamp(&rtp_data) {
571                        let arrival = std::time::SystemTime::now()
572                            .duration_since(std::time::UNIX_EPOCH)
573                            .unwrap_or_default()
574                            .as_micros() as i64;
575                        let transit = arrival - (rtp_ts as i64 * 125);
576                        if let Some(prev) = last_transit {
577                            let d = (transit - prev).unsigned_abs();
578                            counters.update_jitter(d);
579                        }
580                        last_transit = Some(transit);
581                    }
582
583                    // Decode and play
584                    if let Some((_, payload)) = parse_rtp(&rtp_data) {
585                        let mut pcm = Vec::with_capacity(payload.len());
586                        decoder.decode(payload, &mut pcm);
587
588                        let f32_samples = i16_to_f32(&pcm);
589                        let resampled = resample_linear(&f32_samples, codec_rate, native_rate);
590
591                        if let Ok(mut buffer) = sample_buffer.lock() {
592                            for s in resampled {
593                                buffer.push_back(s);
594                            }
595                            while buffer.len() > native_rate as usize {
596                                buffer.pop_front();
597                            }
598                        }
599                    }
600                }
601                Ok(Err(e)) => {
602                    log::error!("RTP recv error: {}", e);
603                }
604                Err(_) => {} // Timeout
605            }
606        }
607    });
608
609    drop(stream);
610    Ok(())
611}
612
613async fn run_rtcp(
614    socket: Arc<UdpSocket>,
615    remote_addr: SocketAddr,
616    ssrc: u32,
617    running: Arc<AtomicBool>,
618    counters: RtpCounters,
619    _srtp: Option<SharedSrtp>,
620) {
621    let mut remote_ssrc: u32 = 0;
622    let mut buf = [0u8; 512];
623
624    while running.load(Ordering::Relaxed) {
625        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
626        if !running.load(Ordering::Relaxed) {
627            break;
628        }
629
630        // Send Sender Report
631        let stats = counters.snapshot();
632        let sr = build_rtcp_sr(ssrc, stats.packets_sent as u32, stats.bytes_sent as u32);
633
634        #[cfg(feature = "srtp")]
635        let sr_to_send = if let Some(ref srtp_ctx) = _srtp {
636            match srtp_ctx.lock() {
637                Ok(mut ctx) => ctx.protect_rtcp(&sr).unwrap_or(sr),
638                Err(_) => sr,
639            }
640        } else {
641            sr
642        };
643
644        #[cfg(not(feature = "srtp"))]
645        let sr_to_send = sr;
646
647        let _ = socket.send_to(&sr_to_send, remote_addr).await;
648
649        // Send Receiver Report if we know remote SSRC
650        if remote_ssrc != 0 {
651            let received = stats.packets_received;
652            let expected = counters.expected_packets.load(Ordering::Relaxed);
653            let lost = expected.saturating_sub(received);
654            let loss_fraction = if expected > 0 {
655                ((lost * 256) / expected) as u8
656            } else {
657                0
658            };
659            let rr = build_rtcp_rr(
660                ssrc,
661                remote_ssrc,
662                loss_fraction,
663                lost as u32,
664                counters.highest_seq.load(Ordering::Relaxed),
665                (counters.jitter_us.load(Ordering::Relaxed) / 125) as u32,
666            );
667
668            #[cfg(feature = "srtp")]
669            let rr_to_send = if let Some(ref srtp_ctx) = _srtp {
670                match srtp_ctx.lock() {
671                    Ok(mut ctx) => ctx.protect_rtcp(&rr).unwrap_or(rr),
672                    Err(_) => rr,
673                }
674            } else {
675                rr
676            };
677
678            #[cfg(not(feature = "srtp"))]
679            let rr_to_send = rr;
680
681            let _ = socket.send_to(&rr_to_send, remote_addr).await;
682        }
683
684        // Receive RTCP
685        if let Ok(Ok((len, _))) = tokio::time::timeout(
686            std::time::Duration::from_millis(50),
687            socket.recv_from(&mut buf),
688        )
689        .await
690        {
691            #[cfg(feature = "srtp")]
692            let rtcp_data: Vec<u8> = if let Some(ref srtp_ctx) = _srtp {
693                match srtp_ctx.lock() {
694                    Ok(mut ctx) => ctx
695                        .unprotect_rtcp(&buf[..len])
696                        .unwrap_or_else(|_| buf[..len].to_vec()),
697                    Err(_) => buf[..len].to_vec(),
698                }
699            } else {
700                buf[..len].to_vec()
701            };
702
703            #[cfg(not(feature = "srtp"))]
704            let rtcp_data: Vec<u8> = buf[..len].to_vec();
705
706            if rtcp_data.len() >= 8 && (rtcp_data[1] == 200 || rtcp_data[1] == 201) {
707                remote_ssrc =
708                    u32::from_be_bytes([rtcp_data[4], rtcp_data[5], rtcp_data[6], rtcp_data[7]]);
709            }
710        }
711    }
712}
713
/// Unit tests for session start-up and the helpers this module relies on.
///
/// The session tests tolerate both outcomes (success or a network/device error) because
/// they depend on the privileges and hardware of the machine running them.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Checks the payload type, clock rate and frame size reported by `CodecType`.
    #[test]
    fn test_codec_type_properties() {
        // Test that codec type constants are correct
        assert_eq!(CodecType::Pcmu.payload_type(), 0);
        assert_eq!(CodecType::Pcma.payload_type(), 8);
        assert_eq!(CodecType::Pcmu.clock_rate(), 8000);
        assert_eq!(CodecType::Pcmu.samples_per_frame(), 160);
    }

    /// Binding a privileged port should surface as `Error::Network` when it fails.
    #[tokio::test]
    async fn test_media_session_start_invalid_port() {
        // Try to bind to a privileged port (requires root)
        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
        let result = MediaSession::start(80, remote, CodecType::Pcmu).await;

        // Should fail on non-root systems
        // This tests error handling path
        if result.is_err() {
            assert!(matches!(result, Err(Error::Network(_))));
        }
    }

    /// Starts a session on a random high port and checks its initial state, or that the
    /// failure is one of the expected error kinds.
    #[tokio::test]
    async fn test_media_session_basic_creation() {
        // Use a random high port to avoid conflicts
        let port = 50000 + (rand::random::<u16>() % 10000);
        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);

        // This will fail in CI without audio devices, but tests the creation path
        let result = MediaSession::start(port, remote, CodecType::Pcmu).await;

        // In environments without audio, this fails at device setup
        // In environments with audio, it succeeds
        // Either way, we're testing the code path
        match result {
            Ok(session) => {
                // Session created successfully
                assert!(!session.is_muted());
                session.stop();
            }
            Err(e) => {
                // Expected on CI without audio devices
                assert!(
                    matches!(e, Error::Device(_)) || matches!(e, Error::Network(_)),
                    "Unexpected error type: {:?}",
                    e
                );
            }
        }
    }

    /// Freshly created counters must report zero for every statistic checked here.
    #[test]
    fn test_rtp_counters_initialization() {
        let counters = RtpCounters::new("PCMU");
        let stats = counters.snapshot();

        assert_eq!(stats.packets_sent, 0);
        assert_eq!(stats.bytes_sent, 0);
        assert_eq!(stats.packets_received, 0);
        assert_eq!(stats.packets_lost, 0);
    }

    /// Sanity check of `SocketAddr` construction used throughout the session code.
    #[test]
    fn test_socket_addr_creation() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5060);
        assert_eq!(addr.port(), 5060);
        assert_eq!(addr.ip(), IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)));
    }

    /// Encoder and decoder creation must succeed for both PCMU and PCMA.
    #[test]
    fn test_create_encoder_decoder() {
        // Test encoder creation
        let encoder = create_encoder(CodecType::Pcmu);
        assert!(encoder.is_ok());

        let encoder = create_encoder(CodecType::Pcma);
        assert!(encoder.is_ok());

        // Test decoder creation
        let decoder = create_decoder(CodecType::Pcmu);
        assert!(decoder.is_ok());

        let decoder = create_decoder(CodecType::Pcma);
        assert!(decoder.is_ok());
    }

    /// A context rebuilt from a generated key must be able to protect an RTP packet.
    #[cfg(feature = "srtp")]
    #[test]
    fn test_srtp_context_for_session() {
        use crate::srtp::SrtpContext;

        let (_ctx, key) = SrtpContext::generate().unwrap();
        assert!(!key.is_empty());

        // Context should be able to protect/unprotect
        // 12-byte RTP header followed by a 4-byte payload.
        let mut test_rtp = vec![
            0x80, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0xA0, 0x12, 0x34, 0x56, 0x78,
        ];
        test_rtp.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);

        let mut ctx_clone = SrtpContext::from_base64(&key).unwrap();
        let protected = ctx_clone.protect_rtp(&test_rtp);
        assert!(protected.is_ok());
    }
}
823}