mum_cli/audio/
input.rs

1//! Listens to the microphone and sends it to the networking.
2use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
3use cpal::{InputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
4use log::*;
5use std::fmt::Debug;
6use tokio::sync::watch;
7
8use crate::audio::transformers::{NoiseGate, Transformer};
9use crate::audio::SAMPLE_RATE;
10use crate::error::{AudioError, AudioStream};
11use crate::state::StatePhase;
12
13/// Generates a callback that receives [Sample]s and sends them as floats to a [futures_channel::mpsc::Sender].
14pub fn callback<T: Sample>(
15    mut input_sender: futures_channel::mpsc::Sender<Vec<u8>>,
16    mut transformers: Vec<Box<dyn Transformer + Send + 'static>>,
17    mut opus_encoder: opus::Encoder,
18    buffer_size: usize,
19    input_volume_receiver: watch::Receiver<f32>,
20    phase_watcher: watch::Receiver<StatePhase>,
21) -> impl FnMut(&[T], &InputCallbackInfo) + Send + 'static {
22    let mut buffer = Vec::with_capacity(buffer_size);
23
24    move |data: &[T], _info: &InputCallbackInfo| {
25        if !matches!(&*phase_watcher.borrow(), StatePhase::Connected(_)) {
26            return;
27        }
28        let input_volume = *input_volume_receiver.borrow();
29        let mut data = data.iter().map(|e| e.to_f32()).map(|e| e * input_volume);
30
31        while buffer.len() + data.len() > buffer_size {
32            buffer.extend(data.by_ref().take(buffer_size - buffer.len()));
33            let encoded = transformers
34                .iter_mut()
35                .try_fold((opus::Channels::Mono, &mut buffer[..]), |acc, e| {
36                    e.transform(acc)
37                })
38                .map(|buf| opus_encoder.encode_vec_float(&*buf.1, buffer_size).unwrap());
39
40            if let Some(encoded) = encoded {
41                if let Err(e) = input_sender.try_send(encoded) {
42                    warn!("Error sending audio: {}", e);
43                }
44            }
45            buffer.clear();
46        }
47        buffer.extend(data);
48    }
49}
50
51/// Something that can listen to audio and send it somewhere.
52///
53/// One sample is assumed to be an encoded opus frame. See [opus::Encoder].
54pub trait AudioInputDevice {
55    /// Starts the device.
56    fn play(&self) -> Result<(), AudioError>;
57    /// Stops the device.
58    fn pause(&self) -> Result<(), AudioError>;
59    /// Sets the input volume of the device.
60    fn set_volume(&self, volume: f32);
61    /// Returns a receiver to this device's values.
62    fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>>;
63    /// The amount of channels this device has.
64    fn num_channels(&self) -> usize;
65}
66
67pub struct DefaultAudioInputDevice {
68    stream: cpal::Stream,
69    sample_receiver: Option<futures_channel::mpsc::Receiver<Vec<u8>>>,
70    volume_sender: watch::Sender<f32>,
71    channels: u16,
72}
73
74impl DefaultAudioInputDevice {
75    /// Initializes the default audio input.
76    pub fn new(
77        input_volume: f32,
78        disable_noise_gate: bool,
79        phase_watcher: watch::Receiver<StatePhase>,
80        frame_size: u32, // blocks of 2.5 ms
81    ) -> Result<Self, AudioError> {
82        let sample_rate = SampleRate(SAMPLE_RATE);
83
84        let host = cpal::default_host();
85
86        let input_device = host
87            .default_input_device()
88            .ok_or(AudioError::NoDevice(AudioStream::Input))?;
89        let input_supported_config = input_device
90            .supported_input_configs()
91            .map_err(|e| AudioError::NoConfigs(AudioStream::Input, e))?
92            .find_map(|c| {
93                if c.min_sample_rate() <= sample_rate && c.max_sample_rate() >= sample_rate {
94                    Some(c)
95                } else {
96                    None
97                }
98            })
99            .ok_or(AudioError::NoSupportedConfig(AudioStream::Input))?
100            .with_sample_rate(sample_rate);
101        let input_supported_sample_format = input_supported_config.sample_format();
102        let input_config: StreamConfig = input_supported_config.into();
103
104        let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
105
106        let (sample_sender, sample_receiver) = futures_channel::mpsc::channel(1_000_000);
107
108        let (volume_sender, input_volume_receiver) = watch::channel::<f32>(input_volume);
109
110        let opus_encoder = opus::Encoder::new(
111            sample_rate.0,
112            match input_config.channels {
113                1 => opus::Channels::Mono,
114                2 => opus::Channels::Stereo,
115                _ => unimplemented!(
116                    "Only 1 or 2 channels supported, got {}",
117                    input_config.channels
118                ),
119            },
120            opus::Application::Voip,
121        )
122        .unwrap();
123        let buffer_size = (sample_rate.0 * frame_size / 400) as usize;
124
125        let transformers = if disable_noise_gate {
126            vec![]
127        } else {
128            vec![Box::new(NoiseGate::new(50)) as Box<dyn Transformer + Send + 'static>]
129        };
130
131        let input_stream = match input_supported_sample_format {
132            SampleFormat::F32 => input_device.build_input_stream(
133                &input_config,
134                callback::<f32>(
135                    sample_sender,
136                    transformers,
137                    opus_encoder,
138                    buffer_size,
139                    input_volume_receiver,
140                    phase_watcher,
141                ),
142                err_fn,
143            ),
144            SampleFormat::I16 => input_device.build_input_stream(
145                &input_config,
146                callback::<i16>(
147                    sample_sender,
148                    transformers,
149                    opus_encoder,
150                    buffer_size,
151                    input_volume_receiver,
152                    phase_watcher,
153                ),
154                err_fn,
155            ),
156            SampleFormat::U16 => input_device.build_input_stream(
157                &input_config,
158                callback::<u16>(
159                    sample_sender,
160                    transformers,
161                    opus_encoder,
162                    buffer_size,
163                    input_volume_receiver,
164                    phase_watcher,
165                ),
166                err_fn,
167            ),
168        }
169        .map_err(|e| AudioError::InvalidStream(AudioStream::Input, e))?;
170
171        let res = Self {
172            stream: input_stream,
173            sample_receiver: Some(sample_receiver),
174            volume_sender,
175            channels: input_config.channels,
176        };
177        Ok(res)
178    }
179}
180
181impl AudioInputDevice for DefaultAudioInputDevice {
182    fn play(&self) -> Result<(), AudioError> {
183        self.stream.play().map_err(AudioError::InputPlayError)
184    }
185
186    fn pause(&self) -> Result<(), AudioError> {
187        self.stream.pause().map_err(AudioError::InputPauseError)
188    }
189
190    fn set_volume(&self, volume: f32) {
191        self.volume_sender.send(volume).unwrap();
192    }
193
194    fn sample_receiver(&mut self) -> Option<futures_channel::mpsc::Receiver<Vec<u8>>> {
195        self.sample_receiver.take()
196    }
197
198    fn num_channels(&self) -> usize {
199        self.channels as usize
200    }
201}
202
203impl Debug for DefaultAudioInputDevice {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        f.debug_struct("DefaultAudioInputDevice")
206            .field("sample_receiver", &self.sample_receiver)
207            .field("channels", &self.channels)
208            .field("volume_sender", &self.volume_sender)
209            .field("stream", &"cpal::Stream")
210            .finish()
211    }
212}