1use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
3use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
4use log::*;
5use std::fmt::Debug;
6use tokio::sync::watch;
7
8use crate::audio::transformers::{NoiseGate, Transformer};
9use crate::audio::SAMPLE_RATE;
10use crate::error::{AudioError, AudioStream};
11use crate::state::StatePhase;
12
13pub fn callback<T: Sample>(
15 mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>,
16 mut transformers: Vec<Box<dyn Transformer + Send + 'static>>,
17 mut opus_encoder: opus::Encoder,
18 buffer_size: usize,
19 input_volume_receiver: watch::Receiver<f32>,
20 phase_watcher: watch::Receiver<StatePhase>,
21) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
22 let mut buffer = Vec::with_capacity(buffer_size);
23
24 move |data: &[T], _info: &InputCallbackInfo| {
25 if !matches!(&*phase_watcher.borrow(), StatePhase::Connected(_)) {
26 return;
27 }
28 let input_volume = *input_volume_receiver.borrow();
29 let mut data = data.iter().map(|e| e.to_f32()).map(|e| e * input_volume);
30
31 while buffer.len() + data.len() > buffer_size {
32 buffer.extend(data.by_ref().take(buffer_size - buffer.len()));
33 let encoded = transformers
34 .iter_mut()
35 .try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| {
36 e.transform(acc)
37 })
38 .map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap());
39
40 if let Some(encoded) = encoded {
41 if let Err(e) = input_sender.try_send(encoded) {
42 warn!("Error sending audio: {}", e);
43 }
44 }
45 buffer.clear();
46 }
47 buffer.extend(data);
48 }
49}
50
51pub trait AudioInputDevice {
55 fn play(&self) -> Result<(), AudioError>;
57 fn pause(&self) -> Result<(), AudioError>;
59 fn set_volume(&self, volume: f32);
61 fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>;
63 fn num_channels(&self) -> usize;
65}
66
67pub struct DefaultAudioInputDevice {
68 stream: cpal::Stream,
69 sample_receiver: Option<futures_channel::mpsc::Receiver<Vec<u8>>>,
70 volume_sender: watch::Sender<f32>,
71 channels: u16,
72}
73
74impl DefaultAudioInputDevice {
75 pub fn new(
77 input_volume: f32,
78 disable_noise_gate: bool,
79 phase_watcher: watch::Receiver<StatePhase>,
80 frame_size: u32, ) -> Result<Self, AudioError> {
82 let sample_rate = SampleRate(SAMPLE_RATE);
83
84 let host = cpal::default_host();
85
86 let input_device = host
87 .default_input_device()
88 .ok_or(AudioError::NoDevice(AudioStream::Input))?;
89 let input_supported_config = input_device
90 .supported_input_configs()
91 .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))?
92 .find_map(|c| {
93 if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
94 Some(c)
95 } else {
96 None
97 }
98 })
99 .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))?
100 .with_sample_rate(sample_rate);
101 let input_supported_sample_format = input_supported_config.sample_format();
102 let input_config: StreamConfig = input_supported_config.into();
103
104 let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
105
106 let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000);
107
108 let (volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
109
110 let opus_encoder = opus::Encoder::new(
111 sample_rate.0,
112 match input_config.channels {
113 1 => opus::Channels::Mono,
114 2 => opus::Channels::Stereo,
115 _ => unimplemented!(
116 "Only 1 or 2 channels supported, got {}",
117 input_config.channels
118 ),
119 },
120 opus::Application::Voip,
121 )
122 .unwrap();
123 let buffer_size = (sample_rate.0 * frame_size / 400) as usize;
124
125 let transformers = if disable_noise_gate {
126 vec![]
127 } else {
128 vec![Box::new(NoiseGate::new(50)) as Box<dyn Transformer + Send + 'static>]
129 };
130
131 let input_stream = match input_supported_sample_format {
132 SampleFormat::F32 => input_device.build_input_stream(
133 &input_config,
134 callback::<f32>(
135 sample_sender,
136 transformers,
137 opus_encoder,
138 buffer_size,
139 input_volume_receiver,
140 phase_watcher,
141 ),
142 err_fn,
143 ),
144 SampleFormat::I16 => input_device.build_input_stream(
145 &input_config,
146 callback::<i16>(
147 sample_sender,
148 transformers,
149 opus_encoder,
150 buffer_size,
151 input_volume_receiver,
152 phase_watcher,
153 ),
154 err_fn,
155 ),
156 SampleFormat::U16 => input_device.build_input_stream(
157 &input_config,
158 callback::<u16>(
159 sample_sender,
160 transformers,
161 opus_encoder,
162 buffer_size,
163 input_volume_receiver,
164 phase_watcher,
165 ),
166 err_fn,
167 ),
168 }
169 .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?;
170
171 let res = Self {
172 stream: input_stream,
173 sample_receiver: Some(sample_receiver),
174 volume_sender,
175 channels: input_config.channels,
176 };
177 Ok(res)
178 }
179}
180
181impl AudioInputDevice for DefaultAudioInputDevice {
182 fn play(&self) -> Result<(), AudioError> {
183 self.stream.play().map_err(AudioError::InputPlayError)
184 }
185
186 fn pause(&self) -> Result<(), AudioError> {
187 self.stream.pause().map_err(AudioError::InputPauseError)
188 }
189
190 fn set_volume(&self, volume: f32) {
191 self.volume_sender.send(volume).unwrap();
192 }
193
194 fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> {
195 self.sample_receiver.take()
196 }
197
198 fn num_channels(&self) -> usize {
199 self.channels as usize
200 }
201}
202
203impl Debug for DefaultAudioInputDevice {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 f.debug_struct("DefaultAudioInputDevice")
206 .field("sample_receiver", &self.sample_receiver)
207 .field("channels", &self.channels)
208 .field("volume_sender", &self.volume_sender)
209 .field("stream", &"cpal::Stream")
210 .finish()
211 }
212}