active_call/media/
processor.rs

1use super::track::track_codec::TrackCodec;
2use crate::media::{AudioFrame, Samples};
3use anyhow::Result;
4use std::any::Any;
5use std::sync::{Arc, Mutex};
6
7pub trait Processor: Send + Sync + Any {
8    fn process_frame(&self, frame: &mut AudioFrame) -> Result<()>;
9}
10
11impl Default for AudioFrame {
12    fn default() -> Self {
13        Self {
14            track_id: "".to_string(),
15            samples: Samples::Empty,
16            timestamp: 0,
17            sample_rate: 16000,
18        }
19    }
20}
21
22impl Samples {
23    pub fn is_empty(&self) -> bool {
24        match self {
25            Samples::PCM { samples } => samples.is_empty(),
26            Samples::RTP { payload, .. } => payload.is_empty(),
27            Samples::Empty => true,
28        }
29    }
30}
31
32#[derive(Clone)]
33pub struct ProcessorChain {
34    processors: Arc<Mutex<Vec<Box<dyn Processor>>>>,
35    codec: Arc<Mutex<TrackCodec>>,
36    sample_rate: u32,
37    pub force_decode: bool,
38}
39
40impl ProcessorChain {
41    pub fn new(sample_rate: u32) -> Self {
42        Self {
43            processors: Arc::new(Mutex::new(Vec::new())),
44            codec: Arc::new(Mutex::new(TrackCodec::new())),
45            sample_rate,
46            force_decode: true,
47        }
48    }
49    pub fn insert_processor(&mut self, processor: Box<dyn Processor>) {
50        self.processors.lock().unwrap().insert(0, processor);
51    }
52    pub fn append_processor(&mut self, processor: Box<dyn Processor>) {
53        self.processors.lock().unwrap().push(processor);
54    }
55
56    pub fn has_processor<T: 'static>(&self) -> bool {
57        let processors = self.processors.lock().unwrap();
58        processors
59            .iter()
60            .any(|processor| (processor.as_ref() as &dyn Any).is::<T>())
61    }
62
63    pub fn remove_processor<T: 'static>(&self) {
64        let mut processors = self.processors.lock().unwrap();
65        processors.retain(|processor| !(processor.as_ref() as &dyn Any).is::<T>());
66    }
67
68    pub fn process_frame(&self, frame: &mut AudioFrame) -> Result<()> {
69        let processors = self.processors.lock().unwrap();
70        if !self.force_decode && processors.is_empty() {
71            return Ok(());
72        }
73
74        if let Samples::RTP {
75            payload_type,
76            payload,
77            ..
78        } = &frame.samples
79        {
80            if TrackCodec::is_audio(*payload_type) {
81                let samples =
82                    self.codec
83                        .lock()
84                        .unwrap()
85                        .decode(*payload_type, &payload, self.sample_rate);
86                frame.samples = Samples::PCM { samples };
87                frame.sample_rate = self.sample_rate;
88            }
89        }
90        // Process the frame with all processors
91        for processor in processors.iter() {
92            processor.process_frame(frame)?;
93        }
94        Ok(())
95    }
96}