active_call/media/
processor.rs1use super::INTERNAL_SAMPLERATE;
2use super::track::track_codec::TrackCodec;
3use crate::media::{AudioFrame, Samples};
4use anyhow::Result;
5use std::any::Any;
6use std::sync::{Arc, Mutex};
7
8pub trait Processor: Send + Sync + Any {
9 fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()>;
10}
11
12pub fn convert_to_mono(samples: &mut Vec<i16>, channels: u16) {
13 if channels != 2 {
14 return;
15 }
16 let mut i = 0;
17 let mut j = 0;
18 while i < samples.len() {
19 let l = samples[i] as i32;
20 let r = samples[i + 1] as i32;
21 samples[j] = ((l + r) / 2) as i16;
22 i += 2;
23 j += 1;
24 }
25 samples.truncate(j);
26}
27
28impl Default for AudioFrame {
29 fn default() -> Self {
30 Self {
31 track_id: "".to_string(),
32 samples: Samples::Empty,
33 timestamp: 0,
34 sample_rate: 16000,
35 channels: 1,
36 }
37 }
38}
39
40impl Samples {
41 pub fn is_empty(&self) -> bool {
42 match self {
43 Samples::PCM { samples } => samples.is_empty(),
44 Samples::RTP { payload, .. } => payload.is_empty(),
45 Samples::Empty => true,
46 }
47 }
48}
49
50#[derive(Clone)]
51pub struct ProcessorChain {
52 processors: Arc<Mutex<Vec<Box<dyn Processor>>>>,
53 pub codec: TrackCodec,
54 sample_rate: u32,
55 pub force_decode: bool,
56}
57
58impl ProcessorChain {
59 pub fn new(_sample_rate: u32) -> Self {
60 Self {
61 processors: Arc::new(Mutex::new(Vec::new())),
62 codec: TrackCodec::new(),
63 sample_rate: INTERNAL_SAMPLERATE,
64 force_decode: true,
65 }
66 }
67 pub fn insert_processor(&mut self, processor: Box<dyn Processor>) {
68 self.processors.lock().unwrap().insert(0, processor);
69 }
70 pub fn append_processor(&mut self, processor: Box<dyn Processor>) {
71 self.processors.lock().unwrap().push(processor);
72 }
73
74 pub fn has_processor<T: 'static>(&self) -> bool {
75 let processors = self.processors.lock().unwrap();
76 processors
77 .iter()
78 .any(|processor| (processor.as_ref() as &dyn Any).is::<T>())
79 }
80
81 pub fn remove_processor<T: 'static>(&self) {
82 let mut processors = self.processors.lock().unwrap();
83 processors.retain(|processor| !(processor.as_ref() as &dyn Any).is::<T>());
84 }
85
86 pub fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
87 let mut processors = self.processors.lock().unwrap();
88 if !self.force_decode && processors.is_empty() {
89 return Ok(());
90 }
91
92 match &mut frame.samples {
93 Samples::RTP {
94 payload_type,
95 payload,
96 ..
97 } => {
98 if TrackCodec::is_audio(*payload_type) {
99 let (decoded_sample_rate, channels, samples) =
100 self.codec.decode(*payload_type, &payload, self.sample_rate);
101 frame.channels = channels;
102 frame.samples = Samples::PCM { samples };
103 frame.sample_rate = decoded_sample_rate;
104 }
105 }
106 _ => {}
107 }
108
109 if let Samples::PCM { samples } = &mut frame.samples {
110 if frame.sample_rate != self.sample_rate {
111 let new_samples = self.codec.resample(
112 std::mem::take(samples),
113 frame.sample_rate,
114 self.sample_rate,
115 );
116 *samples = new_samples;
117 frame.sample_rate = self.sample_rate;
118 }
119 if frame.channels == 2 {
120 convert_to_mono(samples, 2);
121 frame.channels = 1;
122 }
123 }
124 for processor in processors.iter_mut() {
126 processor.process_frame(frame)?;
127 }
128 Ok(())
129 }
130}