mum_cli/audio/
output.rs

1//! Receives audio packets from the networking and plays them.
2
3use crate::audio::SAMPLE_RATE;
4use crate::error::{AudioError, AudioStream};
5use crate::network::VoiceStreamType;
6
7use bytes::Bytes;
8use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
9use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
10use dasp_ring_buffer::Bounded;
11use log::*;
12use mumble_protocol::voice::VoicePacketPayload;
13use std::collections::{HashMap, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::ops::AddAssign;
17use std::sync::{Arc, Mutex};
18use tokio::sync::watch;
19
20type ClientStreamKey = (VoiceStreamType, u32);
21
22/// State for decoding audio received from another user.
23#[derive(Debug)]
24pub struct ClientAudioData {
25    buf: Bounded<Vec<f32>>,
26    output_channels: opus::Channels,
27    // We need both since a client can hypothetically send both mono
28    // and stereo packets, and we can't switch a decoder on the fly
29    // to reuse it.
30    mono_decoder: opus::Decoder,
31    stereo_decoder: opus::Decoder,
32}
33
34impl ClientAudioData {
35    pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self {
36        Self {
37            mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(),
38            stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(),
39            output_channels,
40            buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]), //buffer 1 s of audio
41        }
42    }
43
44    pub fn store_packet(&mut self, bytes: Bytes) {
45        let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap();
46        let (decoder, channels) = match packet_channels {
47            opus::Channels::Mono => (&mut self.mono_decoder, 1),
48            opus::Channels::Stereo => (&mut self.stereo_decoder, 2),
49        };
50        let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; //720 is because that is the max size of packet we can get that we want to decode
51        let parsed = decoder
52            .decode_float(&bytes, &mut out, false)
53            .expect("Error decoding");
54        out.truncate(parsed);
55        match (packet_channels, self.output_channels) {
56            (opus::Channels::Mono, opus::Channels::Mono)
57            | (opus::Channels::Stereo, opus::Channels::Stereo) => {
58                for sample in out {
59                    self.buf.push(sample);
60                }
61            }
62            (opus::Channels::Mono, opus::Channels::Stereo) => {
63                for sample in out {
64                    self.buf.push(sample);
65                    self.buf.push(sample);
66                }
67            }
68            (opus::Channels::Stereo, opus::Channels::Mono) => {
69                for sample in out.into_iter().step_by(2) {
70                    self.buf.push(sample);
71                }
72            }
73        }
74    }
75}
76
77/// Collected state for client opus decoders and sound effects.
78#[derive(Debug)]
79pub struct ClientStream {
80    buffer_clients: HashMap<ClientStreamKey, ClientAudioData>,
81    buffer_effects: VecDeque<f32>,
82    sample_rate: u32,
83    output_channels: opus::Channels,
84}
85
86impl ClientStream {
87    pub fn new(sample_rate: u32, channels: u16) -> Self {
88        let channels = match channels {
89            1 => opus::Channels::Mono,
90            2 => opus::Channels::Stereo,
91            _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
92        };
93        Self {
94            buffer_clients: HashMap::new(),
95            buffer_effects: VecDeque::new(),
96            sample_rate,
97            output_channels: channels,
98        }
99    }
100
101    fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData {
102        self.buffer_clients
103            .entry(client)
104            .or_insert(ClientAudioData::new(self.sample_rate, self.output_channels))
105    }
106
107    /// Decodes a voice packet.
108    pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) {
109        match payload {
110            VoicePacketPayload::Opus(bytes, _eot) => {
111                self.get_client(client).store_packet(bytes);
112            }
113            _ => {
114                unimplemented!("Payload type not supported");
115            }
116        }
117    }
118
119    /// Extends the sound effect buffer queue with some received values.
120    pub fn add_sound_effect(&mut self, values: &[f32]) {
121        self.buffer_effects.extend(values.iter().copied());
122    }
123}
124
/// Addition that stops at the edge values of a sample type instead of
/// overflowing past them.
///
/// Implemented for the three sample types handled here — [f32], [i16] and
/// [u16] — so code that is generic over the sample type can combine two
/// samples without leaving the type's valid range.
pub trait SaturatingAdd {
    /// Returns `self + rhs`, limited to the edge values of the type.
    fn saturating_add(self, rhs: Self) -> Self;
}

/// Floating-point samples are limited to `[-1.0, 1.0]`; a NaN sum is returned
/// unchanged.
impl SaturatingAdd for f32 {
    fn saturating_add(self, rhs: Self) -> Self {
        let sum = self + rhs;
        if sum > 1.0 {
            1.0
        } else if sum < -1.0 {
            -1.0
        } else {
            sum
        }
    }
}

/// Defers to the inherent integer saturating addition.
impl SaturatingAdd for i16 {
    fn saturating_add(self, rhs: Self) -> Self {
        // Inherent methods win method resolution over trait methods, so this is
        // `i16::saturating_add`, not a recursive call.
        self.saturating_add(rhs)
    }
}

/// Defers to the inherent integer saturating addition.
impl SaturatingAdd for u16 {
    fn saturating_add(self, rhs: Self) -> Self {
        // Resolves to the inherent `u16::saturating_add` (see the i16 impl).
        self.saturating_add(rhs)
    }
}
156
157pub trait AudioOutputDevice {
158    fn play(&self) -> Result<(), AudioError>;
159    fn pause(&self) -> Result<(), AudioError>;
160    fn set_volume(&self, volume: f32);
161    fn num_channels(&self) -> usize;
162    fn client_streams(&self) -> Arc<Mutex<ClientStream>>;
163}
164
165/// The default audio output device, as determined by [cpal].
166pub struct DefaultAudioOutputDevice {
167    config: StreamConfig,
168    stream: cpal::Stream,
169    /// The client stream per user ID. A separate stream is kept for UDP and TCP.
170    ///
171    /// Shared with [super::AudioOutput].
172    client_streams: Arc<Mutex<ClientStream>>,
173    /// Output volume configuration.
174    volume_sender: watch::Sender<f32>,
175}
176
177impl DefaultAudioOutputDevice {
178    /// Initializes the default audio output.
179    pub fn new(
180        output_volume: f32,
181        user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
182    ) -> Result<Self, AudioError> {
183        let sample_rate = SampleRate(SAMPLE_RATE);
184
185        let host = cpal::default_host();
186        let output_device = host
187            .default_output_device()
188            .ok_or(AudioError::NoDevice(AudioStream::Output))?;
189        let output_supported_config = output_device
190            .supported_output_configs()
191            .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))?
192            .find_map(|c| {
193                if c.min_sample_rate() <= sample_rate
194                    && c.max_sample_rate() >= sample_rate
195                    && c.channels() == 2
196                {
197                    Some(c)
198                } else {
199                    None
200                }
201            })
202            .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))?
203            .with_sample_rate(sample_rate);
204        let output_supported_sample_format = output_supported_config.sample_format();
205        let output_config: StreamConfig = output_supported_config.into();
206        let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(
207            sample_rate.0,
208            output_config.channels,
209        )));
210
211        let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
212
213        let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume);
214
215        let output_stream = match output_supported_sample_format {
216            SampleFormat::F32 => output_device.build_output_stream(
217                &output_config,
218                callback::<f32>(
219                    Arc::clone(&client_streams),
220                    output_volume_receiver,
221                    user_volumes,
222                ),
223                err_fn,
224            ),
225            SampleFormat::I16 => output_device.build_output_stream(
226                &output_config,
227                callback::<i16>(
228                    Arc::clone(&client_streams),
229                    output_volume_receiver,
230                    user_volumes,
231                ),
232                err_fn,
233            ),
234            SampleFormat::U16 => output_device.build_output_stream(
235                &output_config,
236                callback::<u16>(
237                    Arc::clone(&client_streams),
238                    output_volume_receiver,
239                    user_volumes,
240                ),
241                err_fn,
242            ),
243        }
244        .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?;
245
246        Ok(Self {
247            config: output_config,
248            stream: output_stream,
249            volume_sender: output_volume_sender,
250            client_streams,
251        })
252    }
253}
254
255impl AudioOutputDevice for DefaultAudioOutputDevice {
256    fn play(&self) -> Result<(), AudioError> {
257        self.stream.play().map_err(AudioError::OutputPlayError)
258    }
259
260    fn pause(&self) -> Result<(), AudioError> {
261        self.stream.pause().map_err(AudioError::OutputPauseError)
262    }
263
264    fn set_volume(&self, volume: f32) {
265        self.volume_sender.send(volume).unwrap();
266    }
267
268    fn num_channels(&self) -> usize {
269        self.config.channels as usize
270    }
271
272    fn client_streams(&self) -> Arc<Mutex<ClientStream>> {
273        Arc::clone(&self.client_streams)
274    }
275}
276
277/// Returns a function that fills a buffer with audio from client streams
278/// modified according to some audio configuration.
279pub fn callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
280    user_bufs: Arc<Mutex<ClientStream>>,
281    output_volume_receiver: watch::Receiver<f32>,
282    user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
283) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
284    move |data: &mut [T], _info: &OutputCallbackInfo| {
285        for sample in data.iter_mut() {
286            *sample = Sample::from(&0.0);
287        }
288
289        let volume = *output_volume_receiver.borrow();
290
291        let mut user_bufs = user_bufs.lock().unwrap();
292        let user_volumes = user_volumes.lock().unwrap();
293        for (k, v) in user_bufs.buffer_clients.iter_mut() {
294            let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false));
295            if !muted {
296                for (sample, val) in data.iter_mut().zip(v.buf.drain().chain(iter::repeat(0.0))) {
297                    *sample = sample.saturating_add(Sample::from(&(val * volume * user_volume)));
298                }
299            }
300        }
301        for sample in data.iter_mut() {
302            *sample = sample.saturating_add(Sample::from(
303                &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume),
304            ));
305        }
306    }
307}
308
309impl Debug for DefaultAudioOutputDevice {
310    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311        f.debug_struct("DefaultAudioInputDevice")
312            .field("client_streams", &self.client_streams)
313            .field("config", &self.config)
314            .field("volume_sender", &self.volume_sender)
315            .field("stream", &"cpal::Stream")
316            .finish()
317    }
318}