Skip to main content

active_call/media/
processor.rs

1use super::INTERNAL_SAMPLERATE;
2use super::track::track_codec::TrackCodec;
3use crate::event::{EventSender, SessionEvent};
4use crate::media::{AudioFrame, Samples};
5use anyhow::Result;
6use std::any::Any;
7use std::sync::{Arc, Mutex};
8
9pub trait Processor: Send + Sync + Any {
10    fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()>;
11}
12
13pub fn convert_to_mono(samples: &mut Vec<i16>, channels: u16) {
14    if channels != 2 {
15        return;
16    }
17    let mut i = 0;
18    let mut j = 0;
19    while i < samples.len() {
20        let l = samples[i] as i32;
21        let r = samples[i + 1] as i32;
22        samples[j] = ((l + r) / 2) as i16;
23        i += 2;
24        j += 1;
25    }
26    samples.truncate(j);
27}
28
29impl Default for AudioFrame {
30    fn default() -> Self {
31        Self {
32            track_id: "".to_string(),
33            samples: Samples::Empty,
34            timestamp: 0,
35            sample_rate: 16000,
36            channels: 1,
37        }
38    }
39}
40
41impl Samples {
42    pub fn is_empty(&self) -> bool {
43        match self {
44            Samples::PCM { samples } => samples.is_empty(),
45            Samples::RTP { payload, .. } => payload.is_empty(),
46            Samples::Empty => true,
47        }
48    }
49}
50
51#[derive(Clone)]
52pub struct ProcessorChain {
53    processors: Arc<Mutex<Vec<Box<dyn Processor>>>>,
54    pub codec: TrackCodec,
55    sample_rate: u32,
56    pub force_decode: bool,
57}
58
59impl ProcessorChain {
60    pub fn new(_sample_rate: u32) -> Self {
61        Self {
62            processors: Arc::new(Mutex::new(Vec::new())),
63            codec: TrackCodec::new(),
64            sample_rate: INTERNAL_SAMPLERATE,
65            force_decode: true,
66        }
67    }
68    pub fn insert_processor(&mut self, processor: Box<dyn Processor>) {
69        self.processors.lock().unwrap().insert(0, processor);
70    }
71    pub fn append_processor(&mut self, processor: Box<dyn Processor>) {
72        self.processors.lock().unwrap().push(processor);
73    }
74
75    pub fn has_processor<T: 'static>(&self) -> bool {
76        let processors = self.processors.lock().unwrap();
77        processors
78            .iter()
79            .any(|processor| (processor.as_ref() as &dyn Any).is::<T>())
80    }
81
82    pub fn remove_processor<T: 'static>(&self) {
83        let mut processors = self.processors.lock().unwrap();
84        processors.retain(|processor| !(processor.as_ref() as &dyn Any).is::<T>());
85    }
86
87    pub fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
88        let mut processors = self.processors.lock().unwrap();
89        if !self.force_decode && processors.is_empty() {
90            return Ok(());
91        }
92
93        match &mut frame.samples {
94            Samples::RTP {
95                payload_type,
96                payload,
97                ..
98            } => {
99                if TrackCodec::is_audio(*payload_type) {
100                    let (decoded_sample_rate, channels, samples) =
101                        self.codec.decode(*payload_type, &payload, self.sample_rate);
102                    frame.channels = channels;
103                    frame.samples = Samples::PCM { samples };
104                    frame.sample_rate = decoded_sample_rate;
105                }
106            }
107            _ => {}
108        }
109
110        if let Samples::PCM { samples } = &mut frame.samples {
111            if frame.sample_rate != self.sample_rate {
112                let new_samples = self.codec.resample(
113                    std::mem::take(samples),
114                    frame.sample_rate,
115                    self.sample_rate,
116                );
117                *samples = new_samples;
118                frame.sample_rate = self.sample_rate;
119            }
120            if frame.channels == 2 {
121                convert_to_mono(samples, 2);
122                frame.channels = 1;
123            }
124        }
125        // Process the frame with all processors
126        for processor in processors.iter_mut() {
127            processor.process_frame(frame)?;
128        }
129        Ok(())
130    }
131}
132
133pub struct SubscribeProcessor {
134    event_sender: EventSender,
135    track_id: String,
136    track_index: u8, // 0 for caller, 1 for callee
137}
138
139impl SubscribeProcessor {
140    pub fn new(event_sender: EventSender, track_id: String, track_index: u8) -> Self {
141        Self {
142            event_sender,
143            track_id,
144            track_index,
145        }
146    }
147}
148
149impl Processor for SubscribeProcessor {
150    fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
151        if let Samples::PCM { samples } = &frame.samples {
152            if !samples.is_empty() {
153                let pcm_data = audio_codec::samples_to_bytes(samples);
154                let mut data = Vec::with_capacity(pcm_data.len() + 1);
155                data.push(self.track_index);
156                data.extend_from_slice(&pcm_data);
157
158                let event = SessionEvent::Binary {
159                    track_id: self.track_id.clone(),
160                    timestamp: frame.timestamp,
161                    data,
162                };
163                self.event_sender.send(event).ok();
164            }
165        }
166        Ok(())
167    }
168}