pub(crate) mod controls;
pub(crate) mod monitoring;
pub(crate) mod types;
use std::{collections::HashMap, sync::Arc};
use controls::AudioStreamController;
use derive_more::Debug;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
pub(crate) use types::{AudioStreamParams, LiveAudioStreamParams};
use wayle_core::Property;
use wayle_traits::{ModelMonitoring, Reactive};
use crate::{
backend::{
commands::Command,
types::{CommandSender, EventSender},
},
error::Error,
types::{
device::DeviceKey,
format::{ChannelMap, SampleSpec},
stream::{MediaInfo, StreamInfo, StreamKey, StreamState},
},
volume::types::Volume,
};
#[derive(Clone, Debug)]
pub struct AudioStream {
#[debug(skip)]
command_tx: CommandSender,
#[debug(skip)]
event_tx: Option<EventSender>,
#[debug(skip)]
pub(crate) cancellation_token: Option<CancellationToken>,
pub key: StreamKey,
pub name: Property<String>,
pub application_name: Property<Option<String>>,
pub binary: Property<Option<String>>,
pub pid: Property<Option<u32>>,
pub owner_module: Property<Option<u32>>,
pub client: Property<Option<u32>>,
pub state: Property<StreamState>,
pub volume: Property<Volume>,
pub muted: Property<bool>,
pub corked: Property<bool>,
pub has_volume: Property<bool>,
pub volume_writable: Property<bool>,
pub device_index: Property<u32>,
pub sample_spec: Property<SampleSpec>,
pub channel_map: Property<ChannelMap>,
pub properties: Property<HashMap<String, String>>,
pub media: Property<MediaInfo>,
pub buffer_latency: Property<u64>,
pub device_latency: Property<u64>,
pub resample_method: Property<Option<String>>,
pub driver: Property<String>,
pub format: Property<Option<String>>,
}
impl PartialEq for AudioStream {
fn eq(&self, other: &Self) -> bool {
self.key == other.key
}
}
impl Reactive for AudioStream {
type Context<'a> = AudioStreamParams<'a>;
type LiveContext<'a> = LiveAudioStreamParams<'a>;
type Error = Error;
async fn get(params: Self::Context<'_>) -> Result<Self, Self::Error> {
let (tx, rx) = oneshot::channel();
params
.command_tx
.send(Command::GetStream {
stream_key: params.stream_key,
responder: tx,
})
.map_err(|_| Error::CommandChannelDisconnected)?;
let stream_info = rx.await.map_err(|_| Error::CommandChannelDisconnected)??;
Ok(Self::from_info(
stream_info,
params.command_tx.clone(),
None,
None,
))
}
async fn get_live(params: Self::LiveContext<'_>) -> Result<Arc<Self>, Self::Error> {
let (tx, rx) = oneshot::channel();
params
.command_tx
.send(Command::GetStream {
stream_key: params.stream_key,
responder: tx,
})
.map_err(|_| Error::CommandChannelDisconnected)?;
let stream_info = rx.await.map_err(|_| Error::CommandChannelDisconnected)??;
let stream = Arc::new(Self::from_info(
stream_info,
params.command_tx.clone(),
Some(params.event_tx.clone()),
Some(params.cancellation_token.child_token()),
));
stream.clone().start_monitoring().await?;
Ok(stream)
}
}
impl AudioStream {
pub(crate) fn from_info(
info: StreamInfo,
command_tx: CommandSender,
event_tx: Option<EventSender>,
cancellation_token: Option<CancellationToken>,
) -> Self {
Self {
command_tx,
event_tx,
cancellation_token,
key: info.key(),
name: Property::new(info.name),
application_name: Property::new(info.application_name),
binary: Property::new(info.binary),
pid: Property::new(info.pid),
owner_module: Property::new(info.owner_module),
client: Property::new(info.client),
state: Property::new(info.state),
volume: Property::new(info.volume),
muted: Property::new(info.muted),
corked: Property::new(info.corked),
has_volume: Property::new(info.has_volume),
volume_writable: Property::new(info.volume_writable),
device_index: Property::new(info.device_index),
sample_spec: Property::new(info.sample_spec),
channel_map: Property::new(info.channel_map),
properties: Property::new(info.properties),
media: Property::new(info.media),
buffer_latency: Property::new(info.buffer_latency),
device_latency: Property::new(info.device_latency),
resample_method: Property::new(info.resample_method),
driver: Property::new(info.driver),
format: Property::new(info.format),
}
}
pub(crate) fn update_from_info(&self, info: &StreamInfo) {
self.name.set(info.name.clone());
self.application_name.set(info.application_name.clone());
self.binary.set(info.binary.clone());
self.pid.set(info.pid);
self.owner_module.set(info.owner_module);
self.client.set(info.client);
self.state.set(info.state);
self.volume.set(info.volume.clone());
self.muted.set(info.muted);
self.corked.set(info.corked);
self.has_volume.set(info.has_volume);
self.volume_writable.set(info.volume_writable);
self.device_index.set(info.device_index);
self.sample_spec.set(info.sample_spec.clone());
self.channel_map.set(info.channel_map.clone());
self.properties.set(info.properties.clone());
self.media.set(info.media.clone());
self.buffer_latency.set(info.buffer_latency);
self.device_latency.set(info.device_latency);
self.resample_method.set(info.resample_method.clone());
self.driver.set(info.driver.clone());
self.format.set(info.format.clone());
}
pub async fn set_volume(&self, volume: Volume) -> Result<(), Error> {
AudioStreamController::set_volume(&self.command_tx, self.key, volume).await
}
pub async fn set_mute(&self, muted: bool) -> Result<(), Error> {
AudioStreamController::set_mute(&self.command_tx, self.key, muted).await
}
pub async fn move_to_device(&self, device_key: DeviceKey) -> Result<(), Error> {
AudioStreamController::move_to_device(&self.command_tx, self.key, device_key).await
}
}