Skip to main content

selene_daemon/player/
playback.rs

1use std::{
2    collections::VecDeque,
3    sync::{
4        Arc,
5        mpsc::{Receiver, SendError, Sender, channel},
6    },
7};
8
9use cpal::{
10    Device, DeviceDescription, Stream, SupportedStreamConfig,
11    traits::{DeviceTrait, HostTrait, StreamTrait},
12};
13use lunar_lib::debug;
14use ringbuf::{
15    CachingCons, CachingProd, HeapRb, SharedRb,
16    storage::Heap,
17    traits::{Consumer, Producer, Split},
18};
19
20use crate::{config::daemon_config, player::PlayerError};
21
22pub struct DeviceConfig {
23    pub device: Device,
24    pub config: SupportedStreamConfig,
25}
26
27impl DeviceConfig {
28    pub fn default_config() -> Result<Self, CpalError> {
29        let host = cpal::default_host();
30        let device = host
31            .default_output_device()
32            .ok_or(CpalError::NoDefaultDevice)?;
33        let config = device.default_output_config()?;
34
35        Ok(Self { device, config })
36    }
37}
38
39pub struct CpalHandle {
40    pub(crate) _stream: Stream,
41    pub tx: Sender<()>,
42    pub audio_buf: CachingProd<Arc<SharedRb<Heap<f32>>>>,
43    pub(crate) pending_packet: VecDeque<f32>,
44}
45
46impl Drop for CpalHandle {
47    fn drop(&mut self) {
48        debug!("Cpal handle was dropped");
49    }
50}
51
52impl CpalHandle {
53    pub fn open(device_config: &DeviceConfig) -> Result<Self, PlayerError> {
54        let (tx, rx) = channel();
55
56        let rb = HeapRb::new(daemon_config().playback.audio_buffer_size * 1024);
57        let (prod, cons) = rb.split();
58
59        let stream = open_cpal_stream(cons, rx, device_config)?;
60
61        Ok(Self {
62            _stream: stream,
63            tx,
64            audio_buf: prod,
65            pending_packet: VecDeque::new(),
66        })
67    }
68
69    pub fn clear_buf(&self) -> Result<(), SendError<()>> {
70        self.tx.send(())
71    }
72
73    /// Attempts to consume the rest of the pending packet.
74    ///
75    /// Returns [`None`] if there is no packet
76    /// Returns [`true`] if there is a packet and it finished consuming
77    /// Returns [`false`] if there is a packet and it has not finished consuming
78    pub fn consume_packet(&mut self, volume: f32) -> Option<bool> {
79        if !self.pending_packet.is_empty() {
80            let pushed = self
81                .audio_buf
82                .push_iter(self.pending_packet.iter().map(|a| a * (volume.powf(3.0))));
83
84            self.pending_packet.drain(..pushed);
85            return Some(self.pending_packet.is_empty());
86        }
87        None
88    }
89}
90
91#[derive(Debug, thiserror::Error)]
92pub enum CpalError {
93    #[error("{0}")]
94    PlayStreamError(#[from] cpal::PlayStreamError),
95
96    #[error("{0}")]
97    BuildStreamError(#[from] cpal::BuildStreamError),
98
99    #[error("{0}")]
100    DefaultStreamConfigError(#[from] cpal::DefaultStreamConfigError),
101
102    #[error("{0}")]
103    DeviceIdError(#[from] cpal::DeviceIdError),
104
105    #[error("Failed to find the default device")]
106    NoDefaultDevice,
107}
108
109fn open_cpal_stream(
110    mut audio_buf: CachingCons<Arc<SharedRb<Heap<f32>>>>,
111    rx: Receiver<()>,
112    device_config: &DeviceConfig,
113) -> Result<Stream, CpalError> {
114    let data_callback = move |output: &mut [f32], _: &cpal::OutputCallbackInfo| {
115        if let Ok(()) = rx.try_recv() {
116            audio_buf.clear();
117        }
118
119        for sample in output.iter_mut() {
120            *sample = audio_buf.try_pop().unwrap_or(0.0);
121        }
122    };
123
124    let error_callback = |err| eprintln!("Audio stream error: {err:?}");
125
126    let stream = device_config.device.build_output_stream(
127        &device_config.config.clone().into(),
128        data_callback,
129        error_callback,
130        None,
131    )?;
132
133    debug!(
134        "CPAL stream opened for {}",
135        device_config
136            .device
137            .description()
138            .as_ref()
139            .map_or("Unknown Device", DeviceDescription::name)
140    );
141    stream.play()?;
142
143    Ok(stream)
144}