use std::sync::Arc;
use derive_more::Debug;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument};
use wayle_core::Property;
use wayle_traits::Reactive;
use zbus::Connection;
use super::core::{
device::{
input::{InputDeviceParams, LiveInputDeviceParams},
output::{LiveOutputDeviceParams, OutputDeviceParams},
},
stream::{AudioStreamParams, LiveAudioStreamParams},
};
use crate::{
backend::types::{CommandSender, EventSender},
builder::AudioServiceBuilder,
core::{
device::{input::InputDevice, output::OutputDevice},
stream::AudioStream,
},
error::Error,
types::{device::DeviceKey, stream::StreamKey},
};
/// Entry point of the audio service.
///
/// Holds the channels and task handle that tie this value to the backend
/// task, together with the publicly readable device and stream properties.
/// Dropping the service cancels `cancellation_token` and, when possible,
/// waits for the backend task to finish (see the `Drop` implementation).
///
/// The internal plumbing fields are excluded from the derived `Debug` output.
#[derive(Debug)]
pub struct AudioService {
/// Command channel handed to every device and stream lookup on this service.
#[debug(skip)]
pub(crate) command_tx: CommandSender,
/// Event channel handed to the `*_monitored` lookups.
#[debug(skip)]
pub(crate) event_tx: EventSender,
/// Token cancelled when the service is dropped; also handed to the
/// `*_monitored` lookups.
#[debug(skip)]
pub(crate) cancellation_token: CancellationToken,
/// Join handle of the backend task, taken and awaited on drop.
/// `None` means there is nothing to join.
#[debug(skip)]
pub(crate) backend_handle: Option<JoinHandle<Result<(), Error>>>,
/// Optional D-Bus connection. It is not read anywhere in this section;
/// presumably stored only to keep the connection alive — TODO confirm.
#[debug(skip)]
pub(crate) _connection: Option<Connection>,
/// Output devices exposed by the service.
/// NOTE(review): presumably kept up to date by the backend — confirm.
pub output_devices: Property<Vec<Arc<OutputDevice>>>,
/// Input devices exposed by the service.
/// NOTE(review): presumably kept up to date by the backend — confirm.
pub input_devices: Property<Vec<Arc<InputDevice>>>,
/// Default output device, or `None` when there is none.
pub default_output: Property<Option<Arc<OutputDevice>>>,
/// Default input device, or `None` when there is none.
pub default_input: Property<Option<Arc<InputDevice>>>,
/// Audio streams classified as playback.
pub playback_streams: Property<Vec<Arc<AudioStream>>>,
/// Audio streams classified as recording.
pub recording_streams: Property<Vec<Arc<AudioStream>>>,
}
impl AudioService {
    /// Creates a service from an unmodified [`AudioServiceBuilder`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the builder's `build` step.
    #[instrument]
    pub async fn new() -> Result<Arc<Self>, Error> {
        let builder = Self::builder();
        builder.build().await
    }

    /// Returns a fresh [`AudioServiceBuilder`] for customised construction.
    pub fn builder() -> AudioServiceBuilder {
        AudioServiceBuilder::new()
    }

    /// Looks up the output device identified by `key`.
    ///
    /// Only the command channel is handed to the lookup, and the device is
    /// returned by value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `OutputDevice::get`.
    #[instrument(skip(self), fields(device_key = ?key), err)]
    pub async fn output_device(&self, key: DeviceKey) -> Result<OutputDevice, Error> {
        let params = OutputDeviceParams {
            command_tx: &self.command_tx,
            device_key: key,
        };
        OutputDevice::get(params).await
    }

    /// Looks up the output device identified by `key` through
    /// `OutputDevice::get_live`.
    ///
    /// The lookup also receives the event channel and this service's
    /// cancellation token, and the device is returned behind an [`Arc`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `OutputDevice::get_live`.
    #[instrument(skip(self), fields(device_key = ?key), err)]
    pub async fn output_device_monitored(
        &self,
        key: DeviceKey,
    ) -> Result<Arc<OutputDevice>, Error> {
        let Self {
            command_tx,
            event_tx,
            cancellation_token,
            ..
        } = self;
        let params = LiveOutputDeviceParams {
            command_tx,
            event_tx,
            device_key: key,
            cancellation_token,
        };
        OutputDevice::get_live(params).await
    }

    /// Looks up the input device identified by `key`.
    ///
    /// Only the command channel is handed to the lookup, and the device is
    /// returned by value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `InputDevice::get`.
    #[instrument(skip(self), fields(device_key = ?key), err)]
    pub async fn input_device(&self, key: DeviceKey) -> Result<InputDevice, Error> {
        let params = InputDeviceParams {
            command_tx: &self.command_tx,
            device_key: key,
        };
        InputDevice::get(params).await
    }

    /// Looks up the input device identified by `key` through
    /// `InputDevice::get_live`.
    ///
    /// The lookup also receives the event channel and this service's
    /// cancellation token, and the device is returned behind an [`Arc`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `InputDevice::get_live`.
    #[instrument(skip(self), fields(device_key = ?key), err)]
    pub async fn input_device_monitored(&self, key: DeviceKey) -> Result<Arc<InputDevice>, Error> {
        let Self {
            command_tx,
            event_tx,
            cancellation_token,
            ..
        } = self;
        let params = LiveInputDeviceParams {
            command_tx,
            event_tx,
            device_key: key,
            cancellation_token,
        };
        InputDevice::get_live(params).await
    }

    /// Looks up the audio stream identified by `key`.
    ///
    /// Only the command channel is handed to the lookup, and the stream is
    /// returned by value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `AudioStream::get`.
    #[instrument(skip(self), fields(stream_key = ?key), err)]
    pub async fn audio_stream(&self, key: StreamKey) -> Result<AudioStream, Error> {
        let params = AudioStreamParams {
            command_tx: &self.command_tx,
            stream_key: key,
        };
        AudioStream::get(params).await
    }

    /// Looks up the audio stream identified by `key` through
    /// `AudioStream::get_live`.
    ///
    /// The lookup also receives the event channel and this service's
    /// cancellation token, and the stream is returned behind an [`Arc`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `AudioStream::get_live`.
    #[instrument(skip(self), fields(stream_key = ?key), err)]
    pub async fn audio_stream_monitored(&self, key: StreamKey) -> Result<Arc<AudioStream>, Error> {
        let Self {
            command_tx,
            event_tx,
            cancellation_token,
            ..
        } = self;
        let params = LiveAudioStreamParams {
            command_tx,
            event_tx,
            stream_key: key,
            cancellation_token,
        };
        AudioStream::get_live(params).await
    }
}
impl Drop for AudioService {
    /// Signals the backend to stop and, where it is safe to do so, waits for
    /// the backend task to finish.
    ///
    /// The cancellation token is always cancelled. The backend task is then
    /// joined synchronously only when all of the following hold:
    ///
    /// * a backend handle is still present,
    /// * the drop happens inside a Tokio runtime, and
    /// * that runtime is not a current-thread runtime.
    ///
    /// In every other case the handle is simply dropped, which detaches the
    /// already-cancelled task instead of blocking.
    fn drop(&mut self) {
        self.cancellation_token.cancel();

        let Some(handle) = self.backend_handle.take() else {
            return;
        };
        let Ok(rt) = tokio::runtime::Handle::try_current() else {
            return;
        };

        // `block_in_place` panics when called from a current-thread runtime,
        // and a panic inside `drop` can abort the process if it happens while
        // unwinding. There is no way to block on the task here without
        // stalling the only executor thread, so let the cancelled task wind
        // down on its own.
        if matches!(
            rt.runtime_flavor(),
            tokio::runtime::RuntimeFlavor::CurrentThread
        ) {
            return;
        }

        let result = tokio::task::block_in_place(|| rt.block_on(handle));
        match result {
            Ok(Ok(())) => {}
            Ok(Err(e)) => error!(error = %e, "PulseAudio backend shutdown error"),
            Err(e) => error!(error = %e, "PulseAudio backend task panicked"),
        }
    }
}