active_call/media/
processor.rs1use super::INTERNAL_SAMPLERATE;
2use super::track::track_codec::TrackCodec;
3use crate::event::{EventSender, SessionEvent};
4use crate::media::{AudioFrame, Samples};
5use anyhow::Result;
6use std::any::Any;
7use std::sync::{Arc, Mutex};
8
9pub trait Processor: Send + Sync + Any {
10 fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()>;
11}
12
13pub fn convert_to_mono(samples: &mut Vec<i16>, channels: u16) {
14 if channels != 2 {
15 return;
16 }
17 let mut i = 0;
18 let mut j = 0;
19 while i < samples.len() {
20 let l = samples[i] as i32;
21 let r = samples[i + 1] as i32;
22 samples[j] = ((l + r) / 2) as i16;
23 i += 2;
24 j += 1;
25 }
26 samples.truncate(j);
27}
28
29impl Default for AudioFrame {
30 fn default() -> Self {
31 Self {
32 track_id: "".to_string(),
33 samples: Samples::Empty,
34 timestamp: 0,
35 sample_rate: 16000,
36 channels: 1,
37 }
38 }
39}
40
41impl Samples {
42 pub fn is_empty(&self) -> bool {
43 match self {
44 Samples::PCM { samples } => samples.is_empty(),
45 Samples::RTP { payload, .. } => payload.is_empty(),
46 Samples::Empty => true,
47 }
48 }
49}
50
51#[derive(Clone)]
52pub struct ProcessorChain {
53 processors: Arc<Mutex<Vec<Box<dyn Processor>>>>,
54 pub codec: TrackCodec,
55 sample_rate: u32,
56 pub force_decode: bool,
57}
58
59impl ProcessorChain {
60 pub fn new(_sample_rate: u32) -> Self {
61 Self {
62 processors: Arc::new(Mutex::new(Vec::new())),
63 codec: TrackCodec::new(),
64 sample_rate: INTERNAL_SAMPLERATE,
65 force_decode: true,
66 }
67 }
68 pub fn insert_processor(&mut self, processor: Box<dyn Processor>) {
69 self.processors.lock().unwrap().insert(0, processor);
70 }
71 pub fn append_processor(&mut self, processor: Box<dyn Processor>) {
72 self.processors.lock().unwrap().push(processor);
73 }
74
75 pub fn has_processor<T: 'static>(&self) -> bool {
76 let processors = self.processors.lock().unwrap();
77 processors
78 .iter()
79 .any(|processor| (processor.as_ref() as &dyn Any).is::<T>())
80 }
81
82 pub fn remove_processor<T: 'static>(&self) {
83 let mut processors = self.processors.lock().unwrap();
84 processors.retain(|processor| !(processor.as_ref() as &dyn Any).is::<T>());
85 }
86
87 pub fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
88 let mut processors = self.processors.lock().unwrap();
89 if !self.force_decode && processors.is_empty() {
90 return Ok(());
91 }
92
93 match &mut frame.samples {
94 Samples::RTP {
95 payload_type,
96 payload,
97 ..
98 } => {
99 if TrackCodec::is_audio(*payload_type) {
100 let (decoded_sample_rate, channels, samples) =
101 self.codec.decode(*payload_type, &payload, self.sample_rate);
102 frame.channels = channels;
103 frame.samples = Samples::PCM { samples };
104 frame.sample_rate = decoded_sample_rate;
105 }
106 }
107 _ => {}
108 }
109
110 if let Samples::PCM { samples } = &mut frame.samples {
111 if frame.sample_rate != self.sample_rate {
112 let new_samples = self.codec.resample(
113 std::mem::take(samples),
114 frame.sample_rate,
115 self.sample_rate,
116 );
117 *samples = new_samples;
118 frame.sample_rate = self.sample_rate;
119 }
120 if frame.channels == 2 {
121 convert_to_mono(samples, 2);
122 frame.channels = 1;
123 }
124 }
125 for processor in processors.iter_mut() {
127 processor.process_frame(frame)?;
128 }
129 Ok(())
130 }
131}
132
133pub struct SubscribeProcessor {
134 event_sender: EventSender,
135 track_id: String,
136 track_index: u8, }
138
139impl SubscribeProcessor {
140 pub fn new(event_sender: EventSender, track_id: String, track_index: u8) -> Self {
141 Self {
142 event_sender,
143 track_id,
144 track_index,
145 }
146 }
147}
148
149impl Processor for SubscribeProcessor {
150 fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
151 if let Samples::PCM { samples } = &frame.samples {
152 if !samples.is_empty() {
153 let pcm_data = audio_codec::samples_to_bytes(samples);
154 let mut data = Vec::with_capacity(pcm_data.len() + 1);
155 data.push(self.track_index);
156 data.extend_from_slice(&pcm_data);
157
158 let event = SessionEvent::Binary {
159 track_id: self.track_id.clone(),
160 timestamp: frame.timestamp,
161 data,
162 };
163 self.event_sender.send(event).ok();
164 }
165 }
166 Ok(())
167 }
168}