1use std::net::SocketAddr;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use tokio::net::UdpSocket;
10
11use crate::codec::{AudioDecoder, AudioEncoder, CodecType, create_decoder, create_encoder};
12use crate::error::{Error, Result};
13use crate::resample::{f32_to_i16, i16_to_f32, resample_linear};
14use crate::rtp::{
15 RtpCounters, RtpHeader, RtpStats, build_rtcp_rr, build_rtcp_sr, parse_rtp, parse_sequence,
16 parse_timestamp,
17};
18
19#[cfg(feature = "srtp")]
20use crate::srtp::SrtpContext;
21
22#[cfg(feature = "device")]
23use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
24
25type SharedSrtp = Arc<std::sync::Mutex<SrtpContext>>;
26
27pub struct MediaSession {
37 muted: Arc<AtomicBool>,
38 running: Arc<AtomicBool>,
39 counters: RtpCounters,
40 codec: CodecType,
41 learned_remote: Arc<std::sync::Mutex<Option<SocketAddr>>>,
42 rtp_socket: Arc<UdpSocket>,
43 ssrc: u32,
44 remote_addr: SocketAddr,
45}
46
47impl MediaSession {
48 pub async fn start(
55 local_rtp_port: u16,
56 remote_addr: SocketAddr,
57 codec_type: CodecType,
58 ) -> Result<Self> {
59 Self::start_internal(local_rtp_port, remote_addr, codec_type, None).await
60 }
61
62 #[cfg(feature = "srtp")]
64 pub async fn start_with_srtp(
65 local_rtp_port: u16,
66 remote_addr: SocketAddr,
67 codec_type: CodecType,
68 srtp_ctx: SrtpContext,
69 ) -> Result<Self> {
70 Self::start_internal(local_rtp_port, remote_addr, codec_type, Some(srtp_ctx)).await
71 }
72
73 async fn start_internal(
74 local_rtp_port: u16,
75 remote_addr: SocketAddr,
76 codec_type: CodecType,
77 #[allow(unused_variables)] srtp_ctx: Option<SrtpContext>,
78 ) -> Result<Self> {
79 let rtp_socket = UdpSocket::bind(format!("0.0.0.0:{}", local_rtp_port))
80 .await
81 .map_err(|e| Error::Network(e))?;
82
83 let rtp_socket = Arc::new(rtp_socket);
84 let muted = Arc::new(AtomicBool::new(false));
85 let running = Arc::new(AtomicBool::new(true));
86 let ssrc: u32 = rand::random();
87 let counters = RtpCounters::new(codec_type.name());
88 let learned_remote: Arc<std::sync::Mutex<Option<SocketAddr>>> =
89 Arc::new(std::sync::Mutex::new(None));
90
91 let encoder = create_encoder(codec_type)?;
92 let decoder = create_decoder(codec_type)?;
93
94 #[cfg(feature = "srtp")]
95 let shared_srtp: Option<SharedSrtp> =
96 srtp_ctx.map(|ctx| Arc::new(std::sync::Mutex::new(ctx)));
97 #[cfg(not(feature = "srtp"))]
98 let shared_srtp: Option<SharedSrtp> = None;
99
100 let rtcp_port = local_rtp_port + 1;
102 let rtcp_socket = UdpSocket::bind(format!("0.0.0.0:{}", rtcp_port))
103 .await
104 .map_err(|e| Error::Network(e))?;
105 let rtcp_socket = Arc::new(rtcp_socket);
106 let remote_rtcp_addr: SocketAddr =
107 format!("{}:{}", remote_addr.ip(), remote_addr.port() + 1)
108 .parse()
109 .unwrap_or(remote_addr);
110
111 #[cfg(feature = "device")]
113 {
114 let tx_socket = rtp_socket.clone();
115 let tx_muted = muted.clone();
116 let tx_running = running.clone();
117 let tx_counters = counters.clone();
118 let tx_learned = learned_remote.clone();
119 let tx_srtp = shared_srtp.clone();
120
121 std::thread::spawn(move || {
122 if let Err(e) = run_audio_tx(
123 tx_socket,
124 remote_addr,
125 ssrc,
126 tx_muted,
127 tx_running,
128 encoder,
129 tx_counters,
130 tx_learned,
131 tx_srtp,
132 ) {
133 log::error!("Audio TX error: {}", e);
134 }
135 });
136 }
137
138 #[cfg(feature = "device")]
140 {
141 let rx_socket = rtp_socket.clone();
142 let rx_running = running.clone();
143 let rx_counters = counters.clone();
144 let rx_learned = learned_remote.clone();
145 let rx_srtp = shared_srtp.clone();
146
147 std::thread::spawn(move || {
148 if let Err(e) = run_audio_rx(
149 rx_socket,
150 rx_running,
151 decoder,
152 rx_counters,
153 rx_learned,
154 rx_srtp,
155 ) {
156 log::error!("Audio RX error: {}", e);
157 }
158 });
159 }
160
161 {
163 let rtcp_running = running.clone();
164 let rtcp_counters = counters.clone();
165 let rtcp_srtp = shared_srtp;
166 tokio::spawn(async move {
167 run_rtcp(
168 rtcp_socket,
169 remote_rtcp_addr,
170 ssrc,
171 rtcp_running,
172 rtcp_counters,
173 rtcp_srtp,
174 )
175 .await;
176 });
177 }
178
179 log::info!(
180 "Media session started: local RTP :{}, remote {}, codec {:?}",
181 local_rtp_port,
182 remote_addr,
183 codec_type,
184 );
185
186 Ok(Self {
187 muted,
188 running,
189 counters,
190 codec: codec_type,
191 learned_remote,
192 rtp_socket,
193 ssrc,
194 remote_addr,
195 })
196 }
197
198 pub fn send_dtmf(&self, digit: &str) {
200 let event_code: u8 = match digit {
201 "0" => 0,
202 "1" => 1,
203 "2" => 2,
204 "3" => 3,
205 "4" => 4,
206 "5" => 5,
207 "6" => 6,
208 "7" => 7,
209 "8" => 8,
210 "9" => 9,
211 "*" => 10,
212 "#" => 11,
213 _ => {
214 log::warn!("Unknown DTMF digit: {}", digit);
215 return;
216 }
217 };
218
219 let socket = self.rtp_socket.clone();
220 let ssrc = self.ssrc;
221 let dest = self
222 .learned_remote
223 .lock()
224 .ok()
225 .and_then(|g| *g)
226 .unwrap_or(self.remote_addr);
227 let counters = self.counters.clone();
228
229 tokio::spawn(async move {
230 let base_ts: u32 = rand::random();
231 let base_seq: u16 = rand::random();
232 let volume: u8 = 10;
233 let pt: u8 = 101;
234 let durations = [160u16, 320, 480];
235
236 for (i, &duration) in durations.iter().enumerate() {
237 let is_end = i == durations.len() - 1;
238 let seq = base_seq.wrapping_add(i as u16);
239
240 let mut packet = Vec::with_capacity(16);
241 packet.push(0x80);
242 let marker = if i == 0 { 0x80 } else { 0x00 };
243 packet.push(pt | marker);
244 packet.extend_from_slice(&seq.to_be_bytes());
245 packet.extend_from_slice(&base_ts.to_be_bytes());
246 packet.extend_from_slice(&ssrc.to_be_bytes());
247
248 let end_flag: u8 = if is_end { 0x80 } else { 0x00 };
249 packet.push(event_code);
250 packet.push(end_flag | (volume & 0x3F));
251 packet.extend_from_slice(&duration.to_be_bytes());
252
253 let _ = socket.send_to(&packet, dest).await;
254 counters.record_sent(packet.len() as u64);
255
256 if is_end {
257 for _ in 0..2 {
258 let repeat_seq = seq.wrapping_add(1);
259 packet[2..4].copy_from_slice(&repeat_seq.to_be_bytes());
260 let _ = socket.send_to(&packet, dest).await;
261 }
262 }
263
264 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
265 }
266 });
267 }
268
269 pub fn set_mute(&self, mute: bool) {
271 self.muted.store(mute, Ordering::Relaxed);
272 }
273
274 pub fn is_muted(&self) -> bool {
276 self.muted.load(Ordering::Relaxed)
277 }
278
279 pub fn stats(&self) -> RtpStats {
281 self.counters.snapshot()
282 }
283
284 pub fn codec(&self) -> CodecType {
286 self.codec
287 }
288
289 pub fn ssrc(&self) -> u32 {
291 self.ssrc
292 }
293
294 pub fn remote_addr(&self) -> SocketAddr {
296 self.remote_addr
297 }
298
299 pub fn learned_remote(&self) -> Option<SocketAddr> {
301 self.learned_remote.lock().ok().and_then(|g| *g)
302 }
303
304 pub fn stop(&self) {
306 self.running.store(false, Ordering::Relaxed);
307 log::info!("Media session stopped");
308 }
309}
310
311impl Drop for MediaSession {
312 fn drop(&mut self) {
313 self.stop();
314 }
315}
316
317impl std::fmt::Debug for MediaSession {
318 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 f.debug_struct("MediaSession")
320 .field("codec", &self.codec)
321 .field("ssrc", &self.ssrc)
322 .field("remote_addr", &self.remote_addr)
323 .field("muted", &self.muted.load(Ordering::Relaxed))
324 .field("running", &self.running.load(Ordering::Relaxed))
325 .finish()
326 }
327}
328
329#[cfg(feature = "device")]
332fn run_audio_tx(
333 socket: Arc<UdpSocket>,
334 remote: SocketAddr,
335 ssrc: u32,
336 muted: Arc<AtomicBool>,
337 running: Arc<AtomicBool>,
338 encoder: Box<dyn AudioEncoder>,
339 counters: RtpCounters,
340 learned_remote: Arc<std::sync::Mutex<Option<SocketAddr>>>,
341 _srtp: Option<SharedSrtp>,
342) -> Result<()> {
343 use std::sync::atomic::AtomicU16;
344
345 let host = cpal::default_host();
346 let device = host
347 .default_input_device()
348 .ok_or_else(|| Error::device("No input device"))?;
349
350 let default_config = device
351 .default_input_config()
352 .map_err(|e| Error::device(format!("No input config: {}", e)))?;
353
354 let native_rate = default_config.sample_rate();
355 log::info!("Audio TX: native rate = {} Hz", native_rate);
356
357 let config = cpal::StreamConfig {
358 channels: 1,
359 sample_rate: default_config.sample_rate(),
360 buffer_size: cpal::BufferSize::Default,
361 };
362
363 let codec_rate = 8000u32;
364 let resample_ratio = codec_rate as f64 / native_rate as f64;
365
366 let rt = tokio::runtime::Handle::current();
367 let seq = Arc::new(AtomicU16::new(0));
368 let ts = Arc::new(std::sync::atomic::AtomicU32::new(0));
369 let pt = encoder.payload_type();
370 let encoder = Arc::new(std::sync::Mutex::new(encoder));
371 let sample_buffer = Arc::new(std::sync::Mutex::new(Vec::<f32>::with_capacity(1024)));
372 let samples_per_frame = 160usize;
373
374 let cb_running = running.clone();
375 let stream = device
376 .build_input_stream(
377 &config,
378 move |data: &[f32], _: &cpal::InputCallbackInfo| {
379 if !cb_running.load(Ordering::Relaxed) || muted.load(Ordering::Relaxed) {
380 return;
381 }
382
383 let mut buffer = match sample_buffer.lock() {
384 Ok(b) => b,
385 Err(_) => return,
386 };
387 buffer.extend_from_slice(data);
388
389 let native_samples_per_frame =
390 ((samples_per_frame as f64) / resample_ratio).ceil() as usize;
391
392 while buffer.len() >= native_samples_per_frame {
393 let chunk: Vec<f32> = buffer.drain(..native_samples_per_frame).collect();
394 let resampled = resample_linear(&chunk, native_rate, codec_rate);
395 let pcm = f32_to_i16(&resampled);
396
397 let current_seq = seq.fetch_add(1, Ordering::Relaxed);
398 let current_ts = ts.fetch_add(samples_per_frame as u32, Ordering::Relaxed);
399
400 let header = RtpHeader::new(pt, current_seq, current_ts, ssrc);
401 let mut packet = header.to_bytes();
402
403 if let Ok(mut enc) = encoder.lock() {
404 enc.encode(&pcm, &mut packet);
405 }
406
407 #[cfg(feature = "srtp")]
408 let send_packet = if let Some(ref srtp_ctx) = _srtp {
409 match srtp_ctx.lock() {
410 Ok(mut ctx) => match ctx.protect_rtp(&packet) {
411 Ok(encrypted) => encrypted,
412 Err(e) => {
413 log::error!("SRTP protect failed: {}", e);
414 continue;
415 }
416 },
417 Err(_) => packet,
418 }
419 } else {
420 packet
421 };
422
423 #[cfg(not(feature = "srtp"))]
424 let send_packet = packet;
425
426 counters.record_sent(send_packet.len() as u64);
427
428 let dest = learned_remote
429 .lock()
430 .ok()
431 .and_then(|g| *g)
432 .unwrap_or(remote);
433 let socket = socket.clone();
434 rt.spawn(async move {
435 let _ = socket.send_to(&send_packet, dest).await;
436 });
437 }
438 },
439 |err| log::error!("Audio input error: {}", err),
440 None,
441 )
442 .map_err(|e| Error::device(format!("Failed to build input stream: {}", e)))?;
443
444 stream
445 .play()
446 .map_err(|e| Error::device(format!("Failed to start input: {}", e)))?;
447
448 while running.load(Ordering::Relaxed) {
449 std::thread::sleep(std::time::Duration::from_millis(50));
450 }
451
452 drop(stream);
453 Ok(())
454}
455
456#[cfg(feature = "device")]
457fn run_audio_rx(
458 socket: Arc<UdpSocket>,
459 running: Arc<AtomicBool>,
460 mut decoder: Box<dyn AudioDecoder>,
461 counters: RtpCounters,
462 learned_remote: Arc<std::sync::Mutex<Option<SocketAddr>>>,
463 _srtp: Option<SharedSrtp>,
464) -> Result<()> {
465 use std::collections::VecDeque;
466
467 let host = cpal::default_host();
468 let device = host
469 .default_output_device()
470 .ok_or_else(|| Error::device("No output device"))?;
471
472 let default_config = device
473 .default_output_config()
474 .map_err(|e| Error::device(format!("No output config: {}", e)))?;
475
476 let native_rate = default_config.sample_rate();
477 log::info!("Audio RX: native rate = {} Hz", native_rate);
478
479 let config = cpal::StreamConfig {
480 channels: 1,
481 sample_rate: default_config.sample_rate(),
482 buffer_size: cpal::BufferSize::Default,
483 };
484
485 let codec_rate = 8000u32;
486
487 let sample_buffer: Arc<std::sync::Mutex<VecDeque<f32>>> = Arc::new(std::sync::Mutex::new(
488 VecDeque::with_capacity(native_rate as usize),
489 ));
490 let rx_buffer = sample_buffer.clone();
491
492 let stream = device
493 .build_output_stream(
494 &config,
495 move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
496 if let Ok(mut buffer) = rx_buffer.lock() {
497 for out in data.iter_mut() {
498 *out = buffer.pop_front().unwrap_or(0.0);
499 }
500 } else {
501 for out in data.iter_mut() {
502 *out = 0.0;
503 }
504 }
505 },
506 |err| log::error!("Audio output error: {}", err),
507 None,
508 )
509 .map_err(|e| Error::device(format!("Failed to build output stream: {}", e)))?;
510
511 stream
512 .play()
513 .map_err(|e| Error::device(format!("Failed to start output: {}", e)))?;
514
515 let rt = tokio::runtime::Builder::new_current_thread()
516 .enable_all()
517 .build()
518 .map_err(|e| Error::device(format!("Failed to create runtime: {}", e)))?;
519
520 rt.block_on(async {
521 let mut buf = [0u8; 2048];
522 let mut last_transit: Option<i64> = None;
523 let mut first_seq: Option<u16> = None;
524
525 while running.load(Ordering::Relaxed) {
526 let recv = tokio::time::timeout(
527 std::time::Duration::from_millis(100),
528 socket.recv_from(&mut buf),
529 )
530 .await;
531
532 match recv {
533 Ok(Ok((len, from_addr))) => {
534 if let Ok(mut lr) = learned_remote.lock()
536 && lr.is_none()
537 {
538 log::info!("Comedia: learned remote RTP address {}", from_addr);
539 *lr = Some(from_addr);
540 }
541
542 #[cfg(feature = "srtp")]
543 let rtp_data: Vec<u8> = if let Some(ref srtp_ctx) = _srtp {
544 match srtp_ctx.lock() {
545 Ok(mut ctx) => match ctx.unprotect_rtp(&buf[..len]) {
546 Ok(decrypted) => decrypted,
547 Err(e) => {
548 log::warn!("SRTP unprotect failed: {}", e);
549 continue;
550 }
551 },
552 Err(_) => buf[..len].to_vec(),
553 }
554 } else {
555 buf[..len].to_vec()
556 };
557
558 #[cfg(not(feature = "srtp"))]
559 let rtp_data: Vec<u8> = buf[..len].to_vec();
560
561 if let Some(seq) = parse_sequence(&rtp_data) {
563 if first_seq.is_none() {
564 first_seq = Some(seq);
565 }
566 counters.record_received(len as u64, seq);
567 }
568
569 if let Some(rtp_ts) = parse_timestamp(&rtp_data) {
571 let arrival = std::time::SystemTime::now()
572 .duration_since(std::time::UNIX_EPOCH)
573 .unwrap_or_default()
574 .as_micros() as i64;
575 let transit = arrival - (rtp_ts as i64 * 125);
576 if let Some(prev) = last_transit {
577 let d = (transit - prev).unsigned_abs();
578 counters.update_jitter(d);
579 }
580 last_transit = Some(transit);
581 }
582
583 if let Some((_, payload)) = parse_rtp(&rtp_data) {
585 let mut pcm = Vec::with_capacity(payload.len());
586 decoder.decode(payload, &mut pcm);
587
588 let f32_samples = i16_to_f32(&pcm);
589 let resampled = resample_linear(&f32_samples, codec_rate, native_rate);
590
591 if let Ok(mut buffer) = sample_buffer.lock() {
592 for s in resampled {
593 buffer.push_back(s);
594 }
595 while buffer.len() > native_rate as usize {
596 buffer.pop_front();
597 }
598 }
599 }
600 }
601 Ok(Err(e)) => {
602 log::error!("RTP recv error: {}", e);
603 }
604 Err(_) => {} }
606 }
607 });
608
609 drop(stream);
610 Ok(())
611}
612
613async fn run_rtcp(
614 socket: Arc<UdpSocket>,
615 remote_addr: SocketAddr,
616 ssrc: u32,
617 running: Arc<AtomicBool>,
618 counters: RtpCounters,
619 _srtp: Option<SharedSrtp>,
620) {
621 let mut remote_ssrc: u32 = 0;
622 let mut buf = [0u8; 512];
623
624 while running.load(Ordering::Relaxed) {
625 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
626 if !running.load(Ordering::Relaxed) {
627 break;
628 }
629
630 let stats = counters.snapshot();
632 let sr = build_rtcp_sr(ssrc, stats.packets_sent as u32, stats.bytes_sent as u32);
633
634 #[cfg(feature = "srtp")]
635 let sr_to_send = if let Some(ref srtp_ctx) = _srtp {
636 match srtp_ctx.lock() {
637 Ok(mut ctx) => ctx.protect_rtcp(&sr).unwrap_or(sr),
638 Err(_) => sr,
639 }
640 } else {
641 sr
642 };
643
644 #[cfg(not(feature = "srtp"))]
645 let sr_to_send = sr;
646
647 let _ = socket.send_to(&sr_to_send, remote_addr).await;
648
649 if remote_ssrc != 0 {
651 let received = stats.packets_received;
652 let expected = counters.expected_packets.load(Ordering::Relaxed);
653 let lost = expected.saturating_sub(received);
654 let loss_fraction = if expected > 0 {
655 ((lost * 256) / expected) as u8
656 } else {
657 0
658 };
659 let rr = build_rtcp_rr(
660 ssrc,
661 remote_ssrc,
662 loss_fraction,
663 lost as u32,
664 counters.highest_seq.load(Ordering::Relaxed),
665 (counters.jitter_us.load(Ordering::Relaxed) / 125) as u32,
666 );
667
668 #[cfg(feature = "srtp")]
669 let rr_to_send = if let Some(ref srtp_ctx) = _srtp {
670 match srtp_ctx.lock() {
671 Ok(mut ctx) => ctx.protect_rtcp(&rr).unwrap_or(rr),
672 Err(_) => rr,
673 }
674 } else {
675 rr
676 };
677
678 #[cfg(not(feature = "srtp"))]
679 let rr_to_send = rr;
680
681 let _ = socket.send_to(&rr_to_send, remote_addr).await;
682 }
683
684 if let Ok(Ok((len, _))) = tokio::time::timeout(
686 std::time::Duration::from_millis(50),
687 socket.recv_from(&mut buf),
688 )
689 .await
690 {
691 #[cfg(feature = "srtp")]
692 let rtcp_data: Vec<u8> = if let Some(ref srtp_ctx) = _srtp {
693 match srtp_ctx.lock() {
694 Ok(mut ctx) => ctx
695 .unprotect_rtcp(&buf[..len])
696 .unwrap_or_else(|_| buf[..len].to_vec()),
697 Err(_) => buf[..len].to_vec(),
698 }
699 } else {
700 buf[..len].to_vec()
701 };
702
703 #[cfg(not(feature = "srtp"))]
704 let rtcp_data: Vec<u8> = buf[..len].to_vec();
705
706 if rtcp_data.len() >= 8 && (rtcp_data[1] == 200 || rtcp_data[1] == 201) {
707 remote_ssrc =
708 u32::from_be_bytes([rtcp_data[4], rtcp_data[5], rtcp_data[6], rtcp_data[7]]);
709 }
710 }
711 }
712}
713
714#[cfg(test)]
715mod tests {
716 use super::*;
717 use std::net::{IpAddr, Ipv4Addr};
718
719 #[test]
720 fn test_codec_type_properties() {
721 assert_eq!(CodecType::Pcmu.payload_type(), 0);
723 assert_eq!(CodecType::Pcma.payload_type(), 8);
724 assert_eq!(CodecType::Pcmu.clock_rate(), 8000);
725 assert_eq!(CodecType::Pcmu.samples_per_frame(), 160);
726 }
727
728 #[tokio::test]
729 async fn test_media_session_start_invalid_port() {
730 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
732 let result = MediaSession::start(80, remote, CodecType::Pcmu).await;
733
734 if result.is_err() {
737 assert!(matches!(result, Err(Error::Network(_))));
738 }
739 }
740
741 #[tokio::test]
742 async fn test_media_session_basic_creation() {
743 let port = 50000 + (rand::random::<u16>() % 10000);
745 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
746
747 let result = MediaSession::start(port, remote, CodecType::Pcmu).await;
749
750 match result {
754 Ok(session) => {
755 assert!(!session.is_muted());
757 session.stop();
758 }
759 Err(e) => {
760 assert!(
762 matches!(e, Error::Device(_)) || matches!(e, Error::Network(_)),
763 "Unexpected error type: {:?}",
764 e
765 );
766 }
767 }
768 }
769
770 #[test]
771 fn test_rtp_counters_initialization() {
772 let counters = RtpCounters::new("PCMU");
773 let stats = counters.snapshot();
774
775 assert_eq!(stats.packets_sent, 0);
776 assert_eq!(stats.bytes_sent, 0);
777 assert_eq!(stats.packets_received, 0);
778 assert_eq!(stats.packets_lost, 0);
779 }
780
781 #[test]
782 fn test_socket_addr_creation() {
783 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5060);
784 assert_eq!(addr.port(), 5060);
785 assert_eq!(addr.ip(), IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)));
786 }
787
788 #[test]
789 fn test_create_encoder_decoder() {
790 let encoder = create_encoder(CodecType::Pcmu);
792 assert!(encoder.is_ok());
793
794 let encoder = create_encoder(CodecType::Pcma);
795 assert!(encoder.is_ok());
796
797 let decoder = create_decoder(CodecType::Pcmu);
799 assert!(decoder.is_ok());
800
801 let decoder = create_decoder(CodecType::Pcma);
802 assert!(decoder.is_ok());
803 }
804
805 #[cfg(feature = "srtp")]
806 #[test]
807 fn test_srtp_context_for_session() {
808 use crate::srtp::SrtpContext;
809
810 let (_ctx, key) = SrtpContext::generate().unwrap();
811 assert!(!key.is_empty());
812
813 let mut test_rtp = vec![
815 0x80, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0xA0, 0x12, 0x34, 0x56, 0x78,
816 ];
817 test_rtp.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
818
819 let mut ctx_clone = SrtpContext::from_base64(&key).unwrap();
820 let protected = ctx_clone.protect_rtp(&test_rtp);
821 assert!(protected.is_ok());
822 }
823}