use std::sync::Arc;
use crate::resource_daemon::{DaemonState, QuitSignal, ResourceDaemon};
use ::cpal::traits::StreamTrait;
/// Commands a `StreamHandle` can send to the CPAL stream daemon's command loop.
#[derive(Debug, Clone)]
pub enum StreamCommand {
/// Pause the stream (the daemon calls `pause()` on the cpal stream).
Pause,
/// Resume the stream (the daemon calls `play()` on the cpal stream).
Resume,
/// Reset the stream. The daemon in this file handles this by calling
/// `pause()` on the cpal stream, i.e. the same cpal call as `Pause`.
Reset,
/// Store the given value into the shared volume atomic.
// NOTE(review): the valid range of the volume is not enforced in this file — confirm with the consumer.
SetVolume(f64),
}
/// Reply the daemon sends back for each processed `StreamCommand`.
#[derive(Debug, Clone)]
pub enum StreamResponse {
/// The command was carried out.
Success,
/// The command failed; the payload is a human-readable description
/// (built from the `Debug` output of the underlying cpal error).
Error(String),
}
/// Errors reported by the CPAL stream daemon and by the handle used to command it.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// printed with `{}` and propagated with `?` into `Box<dyn Error>`-style
/// error types.
#[derive(Debug, Clone)]
pub enum StreamDaemonError {
    /// The stream factory failed, or the freshly created stream could not be
    /// started. The payload is the underlying error description.
    StreamCreationFailed(String),
    /// The daemon answered a command with an error. The payload is the
    /// message carried by the error response.
    StreamOperationFailed(String),
    /// The command or response channel to the daemon was disconnected.
    DaemonStopped,
}

impl std::fmt::Display for StreamDaemonError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::StreamCreationFailed(reason) => write!(f, "stream creation failed: {reason}"),
            Self::StreamOperationFailed(reason) => write!(f, "stream operation failed: {reason}"),
            Self::DaemonStopped => f.write_str("stream daemon stopped"),
        }
    }
}

impl std::error::Error for StreamDaemonError {}
/// Owner-side object for the daemon that creates and drives a cpal stream.
///
/// Built by `CpalStreamDaemon::new`, which also returns the `StreamHandle`
/// used to send commands to the daemon.
#[derive(Debug)]
pub struct CpalStreamDaemon {
// Underlying resource daemon; queried by `state()` and stopped by `quit()`.
daemon: ResourceDaemon<(), StreamDaemonError>,
// Sender for the quit channel watched by the command loop. It is taken
// (leaving `None`) the first time `quit()` or `Drop` uses it.
shutdown_sender: Option<flume::Sender<()>>,
}
/// Cloneable handle for sending `StreamCommand`s to the stream daemon.
#[derive(Debug, Clone)]
pub struct StreamHandle {
// Each message pairs a command with the sender on which the daemon
// delivers that command's `StreamResponse`.
command_sender: flume::Sender<(StreamCommand, flume::Sender<StreamResponse>)>,
}
impl StreamHandle {
pub async fn pause(&self) -> Result<(), StreamDaemonError> {
log::debug!("StreamHandle: sending pause command");
self.send_command(StreamCommand::Pause).await
}
pub async fn resume(&self) -> Result<(), StreamDaemonError> {
log::debug!("StreamHandle: sending resume command");
self.send_command(StreamCommand::Resume).await
}
pub async fn reset(&self) -> Result<(), StreamDaemonError> {
log::debug!("StreamHandle: sending reset command");
self.send_command(StreamCommand::Reset).await
}
pub async fn set_volume(&self, volume: f64) -> Result<(), StreamDaemonError> {
self.send_command(StreamCommand::SetVolume(volume)).await
}
async fn send_command(&self, command: StreamCommand) -> Result<(), StreamDaemonError> {
let (response_tx, response_rx) = flume::unbounded();
self.command_sender
.send_async((command, response_tx))
.await
.map_err(|_| StreamDaemonError::DaemonStopped)?;
match response_rx.recv_async().await {
Ok(StreamResponse::Success) => Ok(()),
Ok(StreamResponse::Error(err)) => Err(StreamDaemonError::StreamOperationFailed(err)),
Err(_) => Err(StreamDaemonError::DaemonStopped),
}
}
}
impl CpalStreamDaemon {
pub fn new<F>(
stream_factory: F,
volume_atomic: Arc<std::sync::RwLock<Arc<atomic_float::AtomicF64>>>,
) -> Result<(Self, StreamHandle), StreamDaemonError>
where
F: FnOnce() -> Result<::cpal::Stream, String> + Send + 'static,
{
let (command_tx, command_rx) = flume::unbounded();
let (quit_tx, quit_rx) = flume::unbounded::<()>();
let daemon = ResourceDaemon::new(move |quit_signal: QuitSignal<StreamDaemonError>| {
log::debug!("CPAL stream daemon: starting daemon thread");
let stream = stream_factory().map_err(|e| {
log::error!("CPAL stream daemon: stream creation failed: {e}");
StreamDaemonError::StreamCreationFailed(e)
})?;
log::debug!("CPAL stream daemon: stream created successfully, starting playback");
if let Err(e) = stream.play() {
log::error!("CPAL stream daemon: failed to start stream playback: {e:?}");
return Err(StreamDaemonError::StreamCreationFailed(format!(
"Failed to start stream: {e:?}"
)));
}
log::debug!("CPAL stream daemon: stream playback started");
Self::run_command_loop(&stream, &command_rx, &quit_rx, &volume_atomic, &quit_signal);
Ok(())
});
let handle = StreamHandle {
command_sender: command_tx,
};
let stream_daemon = Self {
daemon,
shutdown_sender: Some(quit_tx),
};
Ok((stream_daemon, handle))
}
#[must_use]
pub fn state(&self) -> DaemonState<StreamDaemonError> {
self.daemon.state()
}
pub fn quit(&mut self, reason: StreamDaemonError) {
log::debug!("CpalStreamDaemon: quit called, sending quit signal");
if let Some(quit_sender) = self.shutdown_sender.take()
&& let Err(e) = quit_sender.send(())
{
log::debug!("CpalStreamDaemon: failed to send quit signal: {e}");
}
self.daemon.quit(reason);
}
fn run_command_loop(
stream: &::cpal::Stream,
command_rx: &flume::Receiver<(StreamCommand, flume::Sender<StreamResponse>)>,
quit_rx: &flume::Receiver<()>,
volume_atomic: &Arc<std::sync::RwLock<Arc<atomic_float::AtomicF64>>>,
quit_signal: &QuitSignal<StreamDaemonError>,
) {
log::debug!("CPAL stream daemon: starting command loop");
loop {
let should_exit = flume::Selector::new()
.recv(command_rx, |result| {
if let Ok((command, response_tx)) = result {
log::trace!("CPAL stream daemon: processing command: {command:?}");
let response = match command {
StreamCommand::Pause => match stream.pause() {
Ok(()) => {
log::debug!("CPAL stream daemon: stream paused");
StreamResponse::Success
}
Err(e) => {
log::error!(
"CPAL stream daemon: failed to pause stream: {e:?}"
);
StreamResponse::Error(format!("Failed to pause stream: {e:?}"))
}
},
StreamCommand::Resume => match stream.play() {
Ok(()) => {
log::debug!("CPAL stream daemon: stream resumed");
StreamResponse::Success
}
Err(e) => {
log::error!(
"CPAL stream daemon: failed to resume stream: {e:?}"
);
StreamResponse::Error(format!("Failed to resume stream: {e:?}"))
}
},
StreamCommand::Reset => match stream.pause() {
Ok(()) => {
log::debug!("CPAL stream daemon: stream reset (paused)");
StreamResponse::Success
}
Err(e) => {
log::error!(
"CPAL stream daemon: failed to reset stream: {e:?}"
);
StreamResponse::Error(format!("Failed to reset stream: {e:?}"))
}
},
StreamCommand::SetVolume(volume) => {
volume_atomic
.read()
.unwrap()
.store(volume, std::sync::atomic::Ordering::SeqCst);
log::debug!("CPAL stream daemon: volume set to {volume}");
StreamResponse::Success
}
};
if let Err(e) = response_tx.send(response) {
log::warn!("CPAL stream daemon: failed to send response: {e}");
quit_signal.dispatch(StreamDaemonError::DaemonStopped);
return true; }
false } else {
log::debug!(
"CPAL stream daemon: command channel closed, exiting command loop"
);
true }
})
.recv(quit_rx, |_result| true)
.wait();
if should_exit {
break;
}
}
log::debug!("CPAL stream daemon: command loop ended - daemon thread shutting down");
}
}
impl Drop for CpalStreamDaemon {
    /// Sends the quit message to the command loop if that has not already
    /// been done (the sender is `None` once it has been taken).
    fn drop(&mut self) {
        log::debug!("CpalStreamDaemon: Drop called, sending quit signal for immediate shutdown");

        // Nothing to do when the sender was already consumed.
        let Some(quit_sender) = self.shutdown_sender.take() else {
            return;
        };

        match quit_sender.send(()) {
            Ok(()) => log::debug!("CpalStreamDaemon: quit signal sent successfully"),
            Err(e) => log::debug!(
                "CpalStreamDaemon: failed to send quit signal (daemon may already be stopped): {e}"
            ),
        }
    }
}
// SAFETY: NOTE(review) — justification reconstructed from this file only; confirm.
// The `cpal::Stream` is created inside the closure passed to
// `ResourceDaemon::new` and is only borrowed by `run_command_loop`; it is never
// stored in `CpalStreamDaemon`, whose fields are a `ResourceDaemon` and an
// `Option<flume::Sender<()>>`. Whether `ResourceDaemon` itself may be moved to
// another thread is not visible here and must hold for this impl to be sound.
unsafe impl Send for CpalStreamDaemon {}
// SAFETY: NOTE(review) — same reasoning as the `Send` impl above; additionally
// relies on `ResourceDaemon` tolerating shared `&self` access (`state()`) from
// several threads, which cannot be verified from this file — confirm.
unsafe impl Sync for CpalStreamDaemon {}
#[cfg(test)]
mod tests {
    use super::*;

    /// A command together with the sender for its response, as carried by the
    /// command channel.
    type Envelope = (StreamCommand, flume::Sender<StreamResponse>);

    /// Builds a handle plus the receiving end of its command channel.
    fn connected_handle() -> (StreamHandle, flume::Receiver<Envelope>) {
        let (command_sender, command_rx) = flume::unbounded::<Envelope>();
        (StreamHandle { command_sender }, command_rx)
    }

    /// Builds a handle whose command receiver has already been dropped.
    fn disconnected_handle() -> StreamHandle {
        let (handle, command_rx) = connected_handle();
        drop(command_rx);
        handle
    }

    #[test_log::test(switchy_async::test)]
    async fn test_stream_handle_pause_channel_disconnected() {
        let handle = disconnected_handle();
        let outcome = handle.pause().await;
        assert!(matches!(outcome, Err(StreamDaemonError::DaemonStopped)));
    }

    #[test_log::test(switchy_async::test)]
    async fn test_stream_handle_resume_channel_disconnected() {
        let handle = disconnected_handle();
        let outcome = handle.resume().await;
        assert!(matches!(outcome, Err(StreamDaemonError::DaemonStopped)));
    }

    #[test_log::test(switchy_async::test)]
    async fn test_stream_handle_reset_channel_disconnected() {
        let handle = disconnected_handle();
        let outcome = handle.reset().await;
        assert!(matches!(outcome, Err(StreamDaemonError::DaemonStopped)));
    }

    #[test_log::test(switchy_async::test)]
    async fn test_stream_handle_set_volume_channel_disconnected() {
        let handle = disconnected_handle();
        let outcome = handle.set_volume(0.5).await;
        assert!(matches!(outcome, Err(StreamDaemonError::DaemonStopped)));
    }

    #[test_log::test(switchy_async::test)]
    async fn test_stream_handle_success_response() {
        let (handle, command_rx) = connected_handle();

        // Stand-in daemon: expects a pause command and acknowledges it.
        switchy_async::task::spawn(async move {
            if let Ok((cmd, response_tx)) = command_rx.recv_async().await {
                assert!(matches!(cmd, StreamCommand::Pause));
                let _ = response_tx.send_async(StreamResponse::Success).await;
            }
        });

        let outcome = handle.pause().await;
        assert!(outcome.is_ok());
    }

    #[test_log::test(switchy_async::test)]
    async fn test_stream_handle_error_response() {
        let (handle, command_rx) = connected_handle();

        // Stand-in daemon: answers whatever arrives with an error response.
        switchy_async::task::spawn(async move {
            if let Ok((_cmd, response_tx)) = command_rx.recv_async().await {
                let reply = StreamResponse::Error("mock error".to_string());
                let _ = response_tx.send_async(reply).await;
            }
        });

        let outcome = handle.resume().await;
        assert!(matches!(
            outcome,
            Err(StreamDaemonError::StreamOperationFailed(msg)) if msg == "mock error"
        ));
    }

    #[test_log::test(switchy_async::test)]
    async fn test_stream_handle_response_channel_dropped() {
        let (handle, command_rx) = connected_handle();

        // Stand-in daemon: takes the command but drops the response sender
        // without replying.
        switchy_async::task::spawn(async move {
            if let Ok((_cmd, response_tx)) = command_rx.recv_async().await {
                drop(response_tx);
            }
        });

        let outcome = handle.reset().await;
        assert!(matches!(outcome, Err(StreamDaemonError::DaemonStopped)));
    }
}