active_call/media/
processor.rs1use super::INTERNAL_SAMPLERATE;
2use super::track::track_codec::TrackCodec;
3use crate::event::{EventSender, SessionEvent};
4use crate::media::{AudioFrame, Samples, SourcePacket};
5use anyhow::Result;
6use std::any::Any;
7use std::sync::{Arc, Mutex};
8
9pub trait Processor: Send + Sync + Any {
10 fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()>;
11}
12
13pub fn convert_to_mono(samples: &mut Vec<i16>, channels: u16) {
14 if channels != 2 {
15 return;
16 }
17 let mut i = 0;
18 let mut j = 0;
19 while i < samples.len() {
20 let l = samples[i] as i32;
21 let r = samples[i + 1] as i32;
22 samples[j] = ((l + r) / 2) as i16;
23 i += 2;
24 j += 1;
25 }
26 samples.truncate(j);
27}
28
29impl Default for AudioFrame {
30 fn default() -> Self {
31 Self {
32 track_id: "".to_string(),
33 samples: Samples::Empty,
34 timestamp: 0,
35 sample_rate: 16000,
36 channels: 1,
37 src_packet: None,
38 }
39 }
40}
41
42impl Samples {
43 pub fn is_empty(&self) -> bool {
44 match self {
45 Samples::PCM { samples } => samples.is_empty(),
46 Samples::RTP { payload, .. } => payload.is_empty(),
47 Samples::Empty => true,
48 }
49 }
50}
51
52#[derive(Clone)]
53pub struct ProcessorChain {
54 processors: Arc<Mutex<Vec<Box<dyn Processor>>>>,
55 pub codec: TrackCodec,
56 sample_rate: u32,
57 pub force_decode: bool,
58}
59
60impl ProcessorChain {
61 pub fn new(_sample_rate: u32) -> Self {
62 Self {
63 processors: Arc::new(Mutex::new(Vec::new())),
64 codec: TrackCodec::new(),
65 sample_rate: INTERNAL_SAMPLERATE,
66 force_decode: true,
67 }
68 }
69 pub fn insert_processor(&mut self, processor: Box<dyn Processor>) {
70 self.processors.lock().unwrap().insert(0, processor);
71 }
72 pub fn append_processor(&mut self, processor: Box<dyn Processor>) {
73 self.processors.lock().unwrap().push(processor);
74 }
75
76 pub fn has_processor<T: 'static>(&self) -> bool {
77 let processors = self.processors.lock().unwrap();
78 processors
79 .iter()
80 .any(|processor| (processor.as_ref() as &dyn Any).is::<T>())
81 }
82
83 pub fn remove_processor<T: 'static>(&self) {
84 let mut processors = self.processors.lock().unwrap();
85 processors.retain(|processor| !(processor.as_ref() as &dyn Any).is::<T>());
86 }
87
88 pub fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
89 let mut processors = self.processors.lock().unwrap();
90 if !self.force_decode && processors.is_empty() {
91 return Ok(());
92 }
93 match &mut frame.samples {
94 Samples::RTP {
95 payload_type,
96 payload,
97 sequence_number,
98 } => {
99 if TrackCodec::is_audio(*payload_type) {
100 let (decoded_sample_rate, channels, samples) =
101 self.codec.decode(*payload_type, &payload, self.sample_rate);
102 let src_packet = SourcePacket {
103 sequence_number: *sequence_number,
104 payload_type: *payload_type,
105 payload: std::mem::take(payload),
106 };
107 frame.src_packet = Some(src_packet);
108 frame.channels = channels;
109 frame.samples = Samples::PCM { samples };
110 frame.sample_rate = decoded_sample_rate;
111 }
112 }
113 _ => {}
114 }
115
116 if let Samples::PCM { samples } = &mut frame.samples {
117 if frame.sample_rate != self.sample_rate {
118 let new_samples = self.codec.resample(
119 std::mem::take(samples),
120 frame.sample_rate,
121 self.sample_rate,
122 );
123 *samples = new_samples;
124 frame.sample_rate = self.sample_rate;
125 }
126 if frame.channels == 2 {
127 convert_to_mono(samples, 2);
128 frame.channels = 1;
129 }
130 }
131 for processor in processors.iter_mut() {
133 processor.process_frame(frame)?;
134 }
135 Ok(())
136 }
137}
138
139pub struct SubscribeProcessor {
140 event_sender: EventSender,
141 track_id: String,
142 track_index: u8, }
144
145impl SubscribeProcessor {
146 pub fn new(event_sender: EventSender, track_id: String, track_index: u8) -> Self {
147 Self {
148 event_sender,
149 track_id,
150 track_index,
151 }
152 }
153}
154
155impl Processor for SubscribeProcessor {
156 fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
157 if let Samples::PCM { samples } = &frame.samples {
158 if !samples.is_empty() {
159 let pcm_data = audio_codec::samples_to_bytes(samples);
160 let mut data = Vec::with_capacity(pcm_data.len() + 1);
161 data.push(self.track_index);
162 data.extend_from_slice(&pcm_data);
163
164 let event = SessionEvent::Binary {
165 track_id: self.track_id.clone(),
166 timestamp: frame.timestamp,
167 data,
168 };
169 self.event_sender.send(event).ok();
170 }
171 }
172 Ok(())
173 }
174}