wayle-audio 0.1.4

PulseAudio service with reactive state
Documentation
pub(crate) mod commands;
pub(crate) mod conversion;
pub(crate) mod dispatcher;
pub(crate) mod events;
pub(crate) mod types;

use std::{
    collections::HashMap,
    future::poll_fn,
    sync::{Arc, RwLock},
};

use commands::Command;
use conversion::volume::to_pulse as convert_volume_to_pulse;
use dispatcher::{handle_external_command, handle_internal_command};
use libpulse_binding::context::{Context, FlagSet as ContextFlags};
use tokio::{
    runtime::Handle,
    sync::mpsc,
    task::{JoinHandle, spawn, spawn_blocking},
};
use tokio_util::sync::CancellationToken;
use tracing::info;
use types::{
    CommandReceiver, DefaultDevice, DeviceStore, EventSender, ExternalCommand, InternalRefresh,
    StreamStore,
};

use crate::{Error, tokio_mainloop::TokioMain};

struct BackendState {
    devices: DeviceStore,
    streams: StreamStore,
    default_input: DefaultDevice,
    default_output: DefaultDevice,
}

impl BackendState {
    fn new() -> Self {
        Self {
            devices: Arc::new(RwLock::new(HashMap::new())),
            streams: Arc::new(RwLock::new(HashMap::new())),
            default_input: Arc::new(RwLock::new(None)),
            default_output: Arc::new(RwLock::new(None)),
        }
    }
}

pub(crate) struct PulseBackend {
    state: BackendState,
    mainloop: TokioMain,
    context: Context,
}

impl PulseBackend {
    pub fn start(
        command_rx: CommandReceiver,
        event_tx: EventSender,
        cancellation_token: CancellationToken,
    ) -> JoinHandle<Result<(), Error>> {
        spawn_blocking(move || {
            let runtime = Handle::current();

            runtime.block_on(async move {
                info!("Starting PulseAudio backend");

                let backend = Self::new().await?;
                backend.run(command_rx, event_tx, cancellation_token).await
            })
        })
    }

    async fn new() -> Result<Self, Error> {
        use libpulse_binding::context::State;

        let mut mainloop = TokioMain::new();
        info!("Creating PulseAudio context");
        let mut context =
            Context::new(&mainloop, "wayle-pulse").ok_or(Error::ContextCreationFailed)?;

        info!("Connecting to PulseAudio server");
        context
            .connect(None, ContextFlags::NOFLAGS, None)
            .map_err(Error::ConnectionFailed)?;

        info!("Waiting for PulseAudio context to become ready");
        let state = mainloop
            .wait_for_ready(&context)
            .await
            .map_err(|_| Error::ContextStateFailed(State::Terminated))?;

        if state != State::Ready {
            return Err(Error::ContextStateFailed(state));
        }

        Ok(Self {
            state: BackendState::new(),
            mainloop,
            context,
        })
    }

    fn setup_event_monitoring(
        &mut self,
        event_tx: EventSender,
        cancellation_token: CancellationToken,
    ) -> Result<
        (
            mpsc::UnboundedSender<InternalRefresh>,
            mpsc::UnboundedReceiver<InternalRefresh>,
        ),
        Error,
    > {
        let (internal_command_tx, internal_command_rx) =
            mpsc::unbounded_channel::<InternalRefresh>();

        info!("Setting up PulseAudio event subscription");
        events::start_event_processor(
            &mut self.context,
            self.state.devices.clone(),
            self.state.streams.clone(),
            event_tx,
            internal_command_tx.clone(),
            cancellation_token,
        )?;

        info!("Triggering initial device and stream discovery");
        let _ = internal_command_tx.send(InternalRefresh::Devices);
        let _ = internal_command_tx.send(InternalRefresh::Streams);
        let _ = internal_command_tx.send(InternalRefresh::ServerInfo);

        Ok((internal_command_tx, internal_command_rx))
    }

    fn spawn_command_processor(
        &self,
        mut command_rx: CommandReceiver,
        external_tx: mpsc::UnboundedSender<ExternalCommand>,
        cancellation_token: CancellationToken,
    ) -> JoinHandle<()> {
        let devices = self.state.devices.clone();
        let streams = self.state.streams.clone();

        spawn(async move {
            loop {
                tokio::select! {
                    _ = cancellation_token.cancelled() => {
                        info!("PulseBackend command handler cancelled");
                        return;
                    }
                    command = command_rx.recv() => {
                        let Some(command) = command else {
                            info!("Command channel closed");
                            return;
                        };

                        Self::handle_command(command, &devices, &streams, &external_tx);
                    }
                }
            }
        })
    }

    #[allow(clippy::too_many_lines)]
    fn handle_command(
        command: Command,
        devices: &DeviceStore,
        streams: &StreamStore,
        external_tx: &mpsc::UnboundedSender<ExternalCommand>,
    ) {
        match command {
            Command::GetDevice {
                device_key,
                responder,
            } => {
                let result = if let Ok(devices_guard) = devices.read() {
                    devices_guard
                        .values()
                        .find(|d| d.key() == device_key)
                        .cloned()
                        .ok_or(Error::DeviceNotFound {
                            index: device_key.index,
                            device_type: device_key.device_type,
                        })
                } else {
                    Err(Error::LockPoisoned)
                };
                let _ = responder.send(result);
            }
            Command::GetStream {
                stream_key,
                responder,
            } => {
                let result = if let Ok(streams_guard) = streams.read() {
                    streams_guard
                        .values()
                        .find(|s| s.key() == stream_key)
                        .cloned()
                        .ok_or(Error::StreamNotFound {
                            index: stream_key.index,
                            stream_type: stream_key.stream_type,
                        })
                } else {
                    Err(Error::LockPoisoned)
                };
                let _ = responder.send(result);
            }
            Command::SetVolume {
                device_key,
                volume,
                responder,
            } => {
                let pulse_volume = convert_volume_to_pulse(&volume);
                let _ = external_tx.send(ExternalCommand::SetDeviceVolume {
                    device_key,
                    volume: pulse_volume,
                });
                let _ = responder.send(Ok(()));
            }
            Command::SetMute {
                device_key,
                muted,
                responder,
            } => {
                let _ = external_tx.send(ExternalCommand::SetDeviceMute { device_key, muted });
                let _ = responder.send(Ok(()));
            }
            Command::SetStreamVolume {
                stream_key,
                volume,
                responder,
            } => {
                let pulse_volume = convert_volume_to_pulse(&volume);
                let _ = external_tx.send(ExternalCommand::SetStreamVolume {
                    stream_key,
                    volume: pulse_volume,
                });
                let _ = responder.send(Ok(()));
            }
            Command::SetStreamMute {
                stream_key,
                muted,
                responder,
            } => {
                let _ = external_tx.send(ExternalCommand::SetStreamMute { stream_key, muted });
                let _ = responder.send(Ok(()));
            }
            Command::SetDefaultInput {
                device_key,
                responder,
            } => {
                let _ = external_tx.send(ExternalCommand::SetDefaultInput { device_key });
                let _ = responder.send(Ok(()));
            }
            Command::SetDefaultOutput {
                device_key,
                responder,
            } => {
                let _ = external_tx.send(ExternalCommand::SetDefaultOutput { device_key });
                let _ = responder.send(Ok(()));
            }
            Command::MoveStream {
                stream_key,
                device_key,
                responder,
            } => {
                let _ = external_tx.send(ExternalCommand::MoveStream {
                    stream_key,
                    device_key,
                });
                let _ = responder.send(Ok(()));
            }
            Command::SetPort {
                device_key,
                port,
                responder,
            } => {
                let _ = external_tx.send(ExternalCommand::SetPort { device_key, port });
                let _ = responder.send(Ok(()));
            }
        }
    }

    #[allow(clippy::cognitive_complexity)]
    async fn run(
        mut self,
        command_rx: CommandReceiver,
        event_tx: EventSender,
        cancellation_token: CancellationToken,
    ) -> Result<(), Error> {
        let event_token = cancellation_token.child_token();
        let command_token = cancellation_token.child_token();

        let (_, mut internal_rx) = self.setup_event_monitoring(event_tx.clone(), event_token)?;

        let (external_tx, mut external_rx) = mpsc::unbounded_channel::<ExternalCommand>();

        info!("PulseAudio backend fully initialized and monitoring");

        let command_handle =
            self.spawn_command_processor(command_rx, external_tx, command_token.clone());

        loop {
            tokio::select! {
                biased;

                _ = cancellation_token.cancelled() => {
                    info!("PulseAudio backend cancelled");
                    break;
                }

                result = poll_fn(|cx| self.mainloop.tick(cx)) => {
                    if result.is_some() {
                        info!("PulseAudio mainloop quit requested");
                        break;
                    }
                }

                Some(cmd) = internal_rx.recv() => {
                    handle_internal_command(
                        &mut self.context,
                        cmd,
                        &self.state.devices,
                        &self.state.streams,
                        &event_tx,
                        &self.state.default_input,
                        &self.state.default_output,
                    );
                }

                Some(cmd) = external_rx.recv() => {
                    handle_external_command(
                        &mut self.context,
                        cmd,
                        &self.state.devices,
                        &self.state.streams,
                    );
                }
            }
        }

        self.context.disconnect();
        command_token.cancel();
        let _ = command_handle.await;

        info!("PulseAudio backend stopped");
        Ok(())
    }
}