Skip to main content

active_call/media/
processor.rs

1use super::INTERNAL_SAMPLERATE;
2use super::track::track_codec::TrackCodec;
3use crate::event::{EventSender, SessionEvent};
4use crate::media::{AudioFrame, Samples, SourcePacket};
5use anyhow::Result;
6use std::any::Any;
7use std::sync::{Arc, Mutex};
8
9pub trait Processor: Send + Sync + Any {
10    fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()>;
11}
12
13pub fn convert_to_mono(samples: &mut Vec<i16>, channels: u16) {
14    if channels != 2 {
15        return;
16    }
17    let mut i = 0;
18    let mut j = 0;
19    while i < samples.len() {
20        let l = samples[i] as i32;
21        let r = samples[i + 1] as i32;
22        samples[j] = ((l + r) / 2) as i16;
23        i += 2;
24        j += 1;
25    }
26    samples.truncate(j);
27}
28
29impl Default for AudioFrame {
30    fn default() -> Self {
31        Self {
32            track_id: "".to_string(),
33            samples: Samples::Empty,
34            timestamp: 0,
35            sample_rate: 16000,
36            channels: 1,
37            src_packet: None,
38        }
39    }
40}
41
42impl Samples {
43    pub fn is_empty(&self) -> bool {
44        match self {
45            Samples::PCM { samples } => samples.is_empty(),
46            Samples::RTP { payload, .. } => payload.is_empty(),
47            Samples::Empty => true,
48        }
49    }
50}
51
52#[derive(Clone)]
53pub struct ProcessorChain {
54    processors: Arc<Mutex<Vec<Box<dyn Processor>>>>,
55    pub codec: TrackCodec,
56    sample_rate: u32,
57    pub force_decode: bool,
58}
59
60impl ProcessorChain {
61    pub fn new(_sample_rate: u32) -> Self {
62        Self {
63            processors: Arc::new(Mutex::new(Vec::new())),
64            codec: TrackCodec::new(),
65            sample_rate: INTERNAL_SAMPLERATE,
66            force_decode: true,
67        }
68    }
69    pub fn insert_processor(&mut self, processor: Box<dyn Processor>) {
70        self.processors.lock().unwrap().insert(0, processor);
71    }
72    pub fn append_processor(&mut self, processor: Box<dyn Processor>) {
73        self.processors.lock().unwrap().push(processor);
74    }
75
76    pub fn has_processor<T: 'static>(&self) -> bool {
77        let processors = self.processors.lock().unwrap();
78        processors
79            .iter()
80            .any(|processor| (processor.as_ref() as &dyn Any).is::<T>())
81    }
82
83    pub fn remove_processor<T: 'static>(&self) {
84        let mut processors = self.processors.lock().unwrap();
85        processors.retain(|processor| !(processor.as_ref() as &dyn Any).is::<T>());
86    }
87
88    pub fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
89        let mut processors = self.processors.lock().unwrap();
90        if !self.force_decode && processors.is_empty() {
91            return Ok(());
92        }
93        match &mut frame.samples {
94            Samples::RTP {
95                payload_type,
96                payload,
97                sequence_number,
98            } => {
99                if TrackCodec::is_audio(*payload_type) {
100                    let (decoded_sample_rate, channels, samples) =
101                        self.codec.decode(*payload_type, &payload, self.sample_rate);
102                    let src_packet = SourcePacket {
103                        sequence_number: *sequence_number,
104                        payload_type: *payload_type,
105                        payload: std::mem::take(payload),
106                    };
107                    frame.src_packet = Some(src_packet);
108                    frame.channels = channels;
109                    frame.samples = Samples::PCM { samples };
110                    frame.sample_rate = decoded_sample_rate;
111                }
112            }
113            _ => {}
114        }
115
116        if let Samples::PCM { samples } = &mut frame.samples {
117            if frame.sample_rate != self.sample_rate {
118                let new_samples = self.codec.resample(
119                    std::mem::take(samples),
120                    frame.sample_rate,
121                    self.sample_rate,
122                );
123                *samples = new_samples;
124                frame.sample_rate = self.sample_rate;
125            }
126            if frame.channels == 2 {
127                convert_to_mono(samples, 2);
128                frame.channels = 1;
129            }
130        }
131        // Process the frame with all processors
132        for processor in processors.iter_mut() {
133            processor.process_frame(frame)?;
134        }
135        Ok(())
136    }
137}
138
139pub struct SubscribeProcessor {
140    event_sender: EventSender,
141    track_id: String,
142    track_index: u8, // 0 for caller, 1 for callee
143}
144
145impl SubscribeProcessor {
146    pub fn new(event_sender: EventSender, track_id: String, track_index: u8) -> Self {
147        Self {
148            event_sender,
149            track_id,
150            track_index,
151        }
152    }
153}
154
155impl Processor for SubscribeProcessor {
156    fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
157        if let Samples::PCM { samples } = &frame.samples {
158            if !samples.is_empty() {
159                let pcm_data = audio_codec::samples_to_bytes(samples);
160                let mut data = Vec::with_capacity(pcm_data.len() + 1);
161                data.push(self.track_index);
162                data.extend_from_slice(&pcm_data);
163
164                let event = SessionEvent::Binary {
165                    track_id: self.track_id.clone(),
166                    timestamp: frame.timestamp,
167                    data,
168                };
169                self.event_sender.send(event).ok();
170            }
171        }
172        Ok(())
173    }
174}