1use crate::audio::SAMPLE_RATE;
4use crate::error::{AudioError, AudioStream};
5use crate::network::VoiceStreamType;
6
7use bytes::Bytes;
8use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
9use cpal::{OutputCallbackInfo, Sample, SampleFormat, SampleRate, StreamConfig};
10use dasp_ring_buffer::Bounded;
11use log::*;
12use mumble_protocol::voice::VoicePacketPayload;
13use std::collections::{HashMap, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::ops::AddAssign;
17use std::sync::{Arc, Mutex};
18use tokio::sync::watch;
19
20type ClientStreamKey = (VoiceStreamType, u32);
21
22#[derive(Debug)]
24pub struct ClientAudioData {
25 buf: Bounded<Vec<f32>>,
26 output_channels: opus::Channels,
27 mono_decoder: opus::Decoder,
31 stereo_decoder: opus::Decoder,
32}
33
34impl ClientAudioData {
35 pub fn new(sample_rate: u32, output_channels: opus::Channels) -> Self {
36 Self {
37 mono_decoder: opus::Decoder::new(sample_rate, opus::Channels::Mono).unwrap(),
38 stereo_decoder: opus::Decoder::new(sample_rate, opus::Channels::Stereo).unwrap(),
39 output_channels,
40 buf: Bounded::from_full(vec![0.0; sample_rate as usize * output_channels as usize]), }
42 }
43
44 pub fn store_packet(&mut self, bytes: Bytes) {
45 let packet_channels = opus::packet::get_nb_channels(&bytes).unwrap();
46 let (decoder, channels) = match packet_channels {
47 opus::Channels::Mono => (&mut self.mono_decoder, 1),
48 opus::Channels::Stereo => (&mut self.stereo_decoder, 2),
49 };
50 let mut out: Vec<f32> = vec![0.0; 720 * channels * 4]; let parsed = decoder
52 .decode_float(&bytes, &mut out, false)
53 .expect("Error decoding");
54 out.truncate(parsed);
55 match (packet_channels, self.output_channels) {
56 (opus::Channels::Mono, opus::Channels::Mono)
57 | (opus::Channels::Stereo, opus::Channels::Stereo) => {
58 for sample in out {
59 self.buf.push(sample);
60 }
61 }
62 (opus::Channels::Mono, opus::Channels::Stereo) => {
63 for sample in out {
64 self.buf.push(sample);
65 self.buf.push(sample);
66 }
67 }
68 (opus::Channels::Stereo, opus::Channels::Mono) => {
69 for sample in out.into_iter().step_by(2) {
70 self.buf.push(sample);
71 }
72 }
73 }
74 }
75}
76
77#[derive(Debug)]
79pub struct ClientStream {
80 buffer_clients: HashMap<ClientStreamKey, ClientAudioData>,
81 buffer_effects: VecDeque<f32>,
82 sample_rate: u32,
83 output_channels: opus::Channels,
84}
85
86impl ClientStream {
87 pub fn new(sample_rate: u32, channels: u16) -> Self {
88 let channels = match channels {
89 1 => opus::Channels::Mono,
90 2 => opus::Channels::Stereo,
91 _ => unimplemented!("Only 1 or 2 channels supported, got {}", channels),
92 };
93 Self {
94 buffer_clients: HashMap::new(),
95 buffer_effects: VecDeque::new(),
96 sample_rate,
97 output_channels: channels,
98 }
99 }
100
101 fn get_client(&mut self, client: ClientStreamKey) -> &mut ClientAudioData {
102 self.buffer_clients
103 .entry(client)
104 .or_insert(ClientAudioData::new(self.sample_rate, self.output_channels))
105 }
106
107 pub fn decode_packet(&mut self, client: ClientStreamKey, payload: VoicePacketPayload) {
109 match payload {
110 VoicePacketPayload::Opus(bytes, _eot) => {
111 self.get_client(client).store_packet(bytes);
112 }
113 _ => {
114 unimplemented!("Payload type not supported");
115 }
116 }
117 }
118
119 pub fn add_sound_effect(&mut self, values: &[f32]) {
121 self.buffer_effects.extend(values.iter().copied());
122 }
123}
124
125pub trait SaturatingAdd {
131 fn saturating_add(self, rhs: Self) -> Self;
133}
134
135impl SaturatingAdd for f32 {
136 fn saturating_add(self, rhs: Self) -> Self {
137 match self + rhs {
138 a if a < -1.0 => -1.0,
139 a if a > 1.0 => 1.0,
140 a => a,
141 }
142 }
143}
144
145impl SaturatingAdd for i16 {
146 fn saturating_add(self, rhs: Self) -> Self {
147 i16::saturating_add(self, rhs)
148 }
149}
150
151impl SaturatingAdd for u16 {
152 fn saturating_add(self, rhs: Self) -> Self {
153 u16::saturating_add(self, rhs)
154 }
155}
156
157pub trait AudioOutputDevice {
158 fn play(&self) -> Result<(), AudioError>;
159 fn pause(&self) -> Result<(), AudioError>;
160 fn set_volume(&self, volume: f32);
161 fn num_channels(&self) -> usize;
162 fn client_streams(&self) -> Arc<Mutex<ClientStream>>;
163}
164
165pub struct DefaultAudioOutputDevice {
167 config: StreamConfig,
168 stream: cpal::Stream,
169 client_streams: Arc<Mutex<ClientStream>>,
173 volume_sender: watch::Sender<f32>,
175}
176
177impl DefaultAudioOutputDevice {
178 pub fn new(
180 output_volume: f32,
181 user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
182 ) -> Result<Self, AudioError> {
183 let sample_rate = SampleRate(SAMPLE_RATE);
184
185 let host = cpal::default_host();
186 let output_device = host
187 .default_output_device()
188 .ok_or(AudioError::NoDevice(AudioStream::Output))?;
189 let output_supported_config = output_device
190 .supported_output_configs()
191 .map_err(|e| AudioError::NoConfigs(AudioStream::Output, e))?
192 .find_map(|c| {
193 if c.min_sample_rate() <= sample_rate
194 && c.max_sample_rate() >= sample_rate
195 && c.channels() == 2
196 {
197 Some(c)
198 } else {
199 None
200 }
201 })
202 .ok_or(AudioError::NoSupportedConfig(AudioStream::Output))?
203 .with_sample_rate(sample_rate);
204 let output_supported_sample_format = output_supported_config.sample_format();
205 let output_config: StreamConfig = output_supported_config.into();
206 let client_streams = Arc::new(std::sync::Mutex::new(ClientStream::new(
207 sample_rate.0,
208 output_config.channels,
209 )));
210
211 let err_fn = |err| error!("An error occurred on the output audio stream: {}", err);
212
213 let (output_volume_sender, output_volume_receiver) = watch::channel::<f32>(output_volume);
214
215 let output_stream = match output_supported_sample_format {
216 SampleFormat::F32 => output_device.build_output_stream(
217 &output_config,
218 callback::<f32>(
219 Arc::clone(&client_streams),
220 output_volume_receiver,
221 user_volumes,
222 ),
223 err_fn,
224 ),
225 SampleFormat::I16 => output_device.build_output_stream(
226 &output_config,
227 callback::<i16>(
228 Arc::clone(&client_streams),
229 output_volume_receiver,
230 user_volumes,
231 ),
232 err_fn,
233 ),
234 SampleFormat::U16 => output_device.build_output_stream(
235 &output_config,
236 callback::<u16>(
237 Arc::clone(&client_streams),
238 output_volume_receiver,
239 user_volumes,
240 ),
241 err_fn,
242 ),
243 }
244 .map_err(|e| AudioError::InvalidStream(AudioStream::Output, e))?;
245
246 Ok(Self {
247 config: output_config,
248 stream: output_stream,
249 volume_sender: output_volume_sender,
250 client_streams,
251 })
252 }
253}
254
255impl AudioOutputDevice for DefaultAudioOutputDevice {
256 fn play(&self) -> Result<(), AudioError> {
257 self.stream.play().map_err(AudioError::OutputPlayError)
258 }
259
260 fn pause(&self) -> Result<(), AudioError> {
261 self.stream.pause().map_err(AudioError::OutputPauseError)
262 }
263
264 fn set_volume(&self, volume: f32) {
265 self.volume_sender.send(volume).unwrap();
266 }
267
268 fn num_channels(&self) -> usize {
269 self.config.channels as usize
270 }
271
272 fn client_streams(&self) -> Arc<Mutex<ClientStream>> {
273 Arc::clone(&self.client_streams)
274 }
275}
276
277pub fn callback<T: Sample + AddAssign + SaturatingAdd + std::fmt::Display>(
280 user_bufs: Arc<Mutex<ClientStream>>,
281 output_volume_receiver: watch::Receiver<f32>,
282 user_volumes: Arc<Mutex<HashMap<u32, (f32, bool)>>>,
283) -> impl FnMut(&mut [T], &OutputCallbackInfo) + Send + 'static {
284 move |data: &mut [T], _info: &OutputCallbackInfo| {
285 for sample in data.iter_mut() {
286 *sample = Sample::from(&0.0);
287 }
288
289 let volume = *output_volume_receiver.borrow();
290
291 let mut user_bufs = user_bufs.lock().unwrap();
292 let user_volumes = user_volumes.lock().unwrap();
293 for (k, v) in user_bufs.buffer_clients.iter_mut() {
294 let (user_volume, muted) = user_volumes.get(&k.1).cloned().unwrap_or((1.0, false));
295 if !muted {
296 for (sample, val) in data.iter_mut().zip(v.buf.drain().chain(iter::repeat(0.0))) {
297 *sample = sample.saturating_add(Sample::from(&(val * volume * user_volume)));
298 }
299 }
300 }
301 for sample in data.iter_mut() {
302 *sample = sample.saturating_add(Sample::from(
303 &(user_bufs.buffer_effects.pop_front().unwrap_or(0.0) * volume),
304 ));
305 }
306 }
307}
308
309impl Debug for DefaultAudioOutputDevice {
310 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311 f.debug_struct("DefaultAudioInputDevice")
312 .field("client_streams", &self.client_streams)
313 .field("config", &self.config)
314 .field("volume_sender", &self.volume_sender)
315 .field("stream", &"cpal::Stream")
316 .finish()
317 }
318}