voice_engine/media/
processor.rs1use super::track::track_codec::TrackCodec;
2use crate::media::{AudioFrame, Samples};
3use anyhow::Result;
4use std::any::Any;
5use std::sync::{Arc, Mutex};
6
7pub trait Processor: Send + Sync + Any {
8 fn process_frame(&self, frame: &mut AudioFrame) -> Result<()>;
9}
10
11impl Default for AudioFrame {
12 fn default() -> Self {
13 Self {
14 track_id: "".to_string(),
15 samples: Samples::Empty,
16 timestamp: 0,
17 sample_rate: 16000,
18 }
19 }
20}
21
22impl Samples {
23 pub fn is_empty(&self) -> bool {
24 match self {
25 Samples::PCM { samples } => samples.is_empty(),
26 Samples::RTP { payload, .. } => payload.is_empty(),
27 Samples::Empty => true,
28 }
29 }
30}
31
32#[derive(Clone)]
33pub struct ProcessorChain {
34 processors: Arc<Mutex<Vec<Box<dyn Processor>>>>,
35 codec: Arc<Mutex<TrackCodec>>,
36 sample_rate: u32,
37 pub force_decode: bool,
38}
39
40impl ProcessorChain {
41 pub fn new(sample_rate: u32) -> Self {
42 Self {
43 processors: Arc::new(Mutex::new(Vec::new())),
44 codec: Arc::new(Mutex::new(TrackCodec::new())),
45 sample_rate,
46 force_decode: true,
47 }
48 }
49 pub fn insert_processor(&mut self, processor: Box<dyn Processor>) {
50 self.processors.lock().unwrap().insert(0, processor);
51 }
52 pub fn append_processor(&mut self, processor: Box<dyn Processor>) {
53 self.processors.lock().unwrap().push(processor);
54 }
55
56 pub fn has_processor<T: 'static>(&self) -> bool {
57 let processors = self.processors.lock().unwrap();
58 processors
59 .iter()
60 .any(|processor| (processor.as_ref() as &dyn Any).is::<T>())
61 }
62
63 pub fn remove_processor<T: 'static>(&self) {
64 let mut processors = self.processors.lock().unwrap();
65 processors.retain(|processor| !(processor.as_ref() as &dyn Any).is::<T>());
66 }
67
68 pub fn process_frame(&self, frame: &mut AudioFrame) -> Result<()> {
69 let processors = self.processors.lock().unwrap();
70 if !self.force_decode && processors.is_empty() {
71 return Ok(());
72 }
73
74 if let Samples::RTP {
75 payload_type,
76 payload,
77 ..
78 } = &frame.samples
79 {
80 if TrackCodec::is_audio(*payload_type) {
81 let samples =
82 self.codec
83 .lock()
84 .unwrap()
85 .decode(*payload_type, &payload, self.sample_rate);
86 frame.samples = Samples::PCM { samples };
87 frame.sample_rate = self.sample_rate;
88 }
89 }
90 for processor in processors.iter() {
92 processor.process_frame(frame)?;
93 }
94 Ok(())
95 }
96}