rill-io 0.4.0

Audio I/O backends for Rill - CPAL, ALSA, PipeWire, JACK
Documentation
//! CPAL backend (cross-platform)

use crossbeam_channel::{unbounded, Receiver, Sender};
use parking_lot::RwLock;
use std::fmt;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};

use crate::buffer::IoRingBuffer;

use crate::backend::{AudioBackend, BackendType};
use crate::config::AudioConfig;
use crate::error::{IoError, IoResult};

/// Commands sent from `CpalBackend` to its worker thread over `command_tx`.
#[derive(Debug)]
enum Command {
    /// Resolve the devices to use and reset the ring buffers.
    Init {
        /// Input device name to look for (matched as a substring of the
        /// device name); `None` selects the host's default input device.
        input_device: Option<String>,
        /// Output device name to look for (matched as a substring of the
        /// device name); `None` selects the host's default output device.
        output_device: Option<String>,
    },
    /// Build an output stream on the resolved output device and start playback.
    Start,
    /// Pause and release the active stream, if there is one.
    Stop,
}

/// Status replies sent by the worker thread back to `CpalBackend`.
///
/// `PartialEq` is derived so a received status can be compared against the
/// one a caller is waiting for.
#[derive(Debug, PartialEq)]
enum Status {
    /// `Command::Init` has been processed.
    Initialized,
    /// The output stream was built and playback was started.
    Started,
    /// `Command::Stop` has been processed.
    Stopped,
    /// A command failed; carries the error text reported by the worker.
    Error(String),
}

/// CPAL backend.
///
/// The CPAL devices and stream are owned by a dedicated worker thread; this
/// handle talks to that thread through a command channel and a status channel
/// and exchanges samples with it through shared ring buffers.
pub struct CpalBackend {
    /// Audio configuration supplied to `new` (exposed via `config`/`config_mut`).
    config: AudioConfig,
    /// CPAL host, shared with the worker thread; also used to list devices.
    host: Arc<cpal::Host>,
    /// Sends `Command`s to the worker thread.
    command_tx: Sender<Command>,
    /// Receives `Status` replies from the worker thread.
    status_rx: Receiver<Status>,
    /// Counter incremented by the stream error callback.
    xruns: Arc<RwLock<u32>>,
    /// Ring buffer drained by `AudioBackend::read`.
    input_buffer: Arc<RwLock<IoRingBuffer>>,
    /// Ring buffer filled by `AudioBackend::write` and drained by the output
    /// stream callback.
    output_buffer: Arc<RwLock<IoRingBuffer>>,
    /// Join handle of the worker thread; taken (set to `None`) on drop.
    thread_handle: Option<thread::JoinHandle<()>>,
}

impl fmt::Debug for CpalBackend {
    /// Formats the backend for diagnostics.
    ///
    /// Only `config`, `xruns` and whether a worker thread handle is present
    /// are printed; the host, channels and ring buffers are left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The join handle itself is not informative, so print only whether
        // one is currently held.
        let has_worker = self.thread_handle.is_some();

        let mut repr = f.debug_struct("CpalBackend");
        repr.field("config", &self.config);
        repr.field("xruns", &self.xruns);
        repr.field("thread_handle", &has_worker);
        repr.finish()
    }
}

impl CpalBackend {
    /// Создать новый CPAL бэкенд
    pub fn new(config: AudioConfig) -> IoResult<Self> {
        let host = Arc::new(cpal::default_host());
        let buffer_size = (config.buffer_size * config.output_channels * 4) as usize;

        let (command_tx, command_rx) = unbounded();
        let (status_tx, status_rx) = unbounded();

        let xruns = Arc::new(RwLock::new(0));
        let input_buffer = Arc::new(RwLock::new(IoRingBuffer::new(buffer_size)));
        let output_buffer = Arc::new(RwLock::new(IoRingBuffer::new(buffer_size)));

        let thread_host = host.clone();
        let thread_input = input_buffer.clone();
        let thread_output = output_buffer.clone();
        let thread_xruns = xruns.clone();
        let thread_config = config.clone();

        // Запускаем поток для работы с CPAL
        let handle = thread::spawn(move || {
            run_cpal_thread(
                command_rx,
                status_tx,
                thread_host,
                thread_config,
                thread_input,
                thread_output,
                thread_xruns,
            );
        });

        Ok(Self {
            config,
            host,
            command_tx,
            status_rx,
            xruns,
            input_buffer,
            output_buffer,
            thread_handle: Some(handle),
        })
    }

    fn wait_for_status(&self, expected: Status) -> IoResult<()> {
        while let Ok(status) = self.status_rx.recv_timeout(Duration::from_millis(1000)) {
            match status {
                Status::Error(e) => return Err(IoError::Backend(e)),
                s if s == expected => return Ok(()),
                _ => continue,
            }
        }
        Err(IoError::Timeout)
    }
}

// Функция, выполняющаяся в отдельном потоке
fn run_cpal_thread(
    command_rx: Receiver<Command>,
    status_tx: Sender<Status>,
    host: Arc<cpal::Host>,
    config: AudioConfig,
    input_buffer: Arc<RwLock<IoRingBuffer>>,
    output_buffer: Arc<RwLock<IoRingBuffer>>,
    xruns: Arc<RwLock<u32>>,
) {
    let mut _input_device: Option<cpal::Device> = None;
    let mut output_device: Option<cpal::Device> = None;
    let mut stream: Option<cpal::Stream> = None;

    while let Ok(cmd) = command_rx.recv() {
        match cmd {
            Command::Init {
                input_device: in_name,
                output_device: out_name,
            } => {
                _input_device = find_device(&host, in_name.as_deref(), true).ok().flatten();
                output_device = find_device(&host, out_name.as_deref(), false)
                    .ok()
                    .flatten();

                // Очищаем буферы
                let cap = input_buffer.read().capacity();
                let zeros = vec![0.0f32; cap];
                input_buffer.write().write(&zeros);
                output_buffer.write().write(&zeros);

                let _ = status_tx.send(Status::Initialized);
            }

            Command::Start => {
                if let Some(dev) = &output_device {
                    let out_buf = output_buffer.clone();
                    let xruns_clone = xruns.clone();

                    let stream_config = cpal::StreamConfig {
                        channels: config.output_channels as u16,
                        sample_rate: cpal::SampleRate(config.sample_rate),
                        buffer_size: cpal::BufferSize::Fixed(config.buffer_size),
                    };

                    match dev.build_output_stream(
                        &stream_config,
                        move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
                            let mut out_buf_lock = out_buf.write();
                            let mut temp = vec![0.0f32; data.len()];
                            out_buf_lock.read(&mut temp);
                            data.copy_from_slice(&temp[..data.len()]);
                        },
                        move |err| {
                            eprintln!("Stream error: {}", err);
                            *xruns_clone.write() += 1;
                        },
                        None,
                    ) {
                        Ok(s) => {
                            if s.play().is_ok() {
                                stream = Some(s);
                                let _ = status_tx.send(Status::Started);
                            }
                        }
                        Err(e) => {
                            let _ = status_tx.send(Status::Error(e.to_string()));
                        }
                    }
                }
            }

            Command::Stop => {
                if let Some(s) = stream.take() {
                    let _ = s.pause();
                }
                let _ = status_tx.send(Status::Stopped);
            }
        }
    }
}

/// Looks up an input or output device on `host`.
///
/// * `name = Some(n)`: returns the first enumerated device whose name
///   contains `n` as a substring, or `None` if there is no such device.
///   Devices whose name cannot be queried are skipped.
/// * `name = None`: returns the host's default input/output device.
///
/// `is_input` selects between input and output devices.
///
/// # Errors
///
/// Returns `IoError::DeviceNotFound` when the host fails to enumerate its
/// devices. Enumeration happens before `name` is inspected, so this can also
/// occur when the default device is requested.
fn find_device(
    host: &cpal::Host,
    name: Option<&str>,
    is_input: bool,
) -> IoResult<Option<cpal::Device>> {
    let enumerated = if is_input {
        host.input_devices()
    } else {
        host.output_devices()
    };
    let mut candidates = enumerated.map_err(|e| IoError::DeviceNotFound(e.to_string()))?;

    let selected = match name {
        Some(wanted) => candidates.find(|candidate| {
            candidate
                .name()
                .map_or(false, |label| label.contains(wanted))
        }),
        None if is_input => host.default_input_device(),
        None => host.default_output_device(),
    };

    Ok(selected)
}

impl AudioBackend for CpalBackend {
    /// Identifies this backend as the CPAL implementation.
    fn backend_type(&self) -> BackendType {
        BackendType::Cpal
    }

    /// Shared access to the audio configuration.
    fn config(&self) -> &AudioConfig {
        &self.config
    }

    /// Mutable access to the audio configuration.
    fn config_mut(&mut self) -> &mut AudioConfig {
        &mut self.config
    }

    /// Asks the worker thread to resolve the configured devices and waits for
    /// `Status::Initialized`.
    ///
    /// Fails with `IoError::Backend` if the command cannot be sent; otherwise
    /// returns whatever `wait_for_status` reports.
    fn init(&mut self) -> IoResult<()> {
        let request = Command::Init {
            input_device: self.config.input_device.clone(),
            output_device: self.config.output_device.clone(),
        };
        if let Err(send_err) = self.command_tx.send(request) {
            return Err(IoError::Backend(send_err.to_string()));
        }

        self.wait_for_status(Status::Initialized)
    }

    /// Asks the worker thread to start playback and waits for
    /// `Status::Started`.
    fn start(&mut self) -> IoResult<()> {
        if let Err(send_err) = self.command_tx.send(Command::Start) {
            return Err(IoError::Backend(send_err.to_string()));
        }

        self.wait_for_status(Status::Started)
    }

    /// Asks the worker thread to stop playback and waits for
    /// `Status::Stopped`.
    fn stop(&mut self) -> IoResult<()> {
        if let Err(send_err) = self.command_tx.send(Command::Stop) {
            return Err(IoError::Backend(send_err.to_string()));
        }

        self.wait_for_status(Status::Stopped)
    }

    /// Reads from the input ring buffer into `buffer`.
    ///
    /// Always reports `buffer.len()` as the number of samples read.
    fn read(&mut self, buffer: &mut [f32]) -> IoResult<usize> {
        let requested = buffer.len();
        self.input_buffer.write().read(buffer);
        Ok(requested)
    }

    /// Writes `buffer` into the output ring buffer.
    ///
    /// Always reports `buffer.len()` as the number of samples written.
    fn write(&mut self, buffer: &[f32]) -> IoResult<usize> {
        let submitted = buffer.len();
        self.output_buffer.write().write(buffer);
        Ok(submitted)
    }

    /// Current value of the stream-error counter.
    fn xruns(&self) -> u32 {
        let counter = self.xruns.read();
        *counter
    }

    /// Duration of one buffer of `buffer_size` frames at `sample_rate`,
    /// truncated to whole microseconds.
    fn latency(&self) -> Duration {
        let frames = self.config.buffer_size as f64;
        let rate = self.config.sample_rate as f64;
        let micros = 1_000_000.0 * frames / rate;
        Duration::from_micros(micros as u64)
    }

    /// Names of the host's input devices; empty if enumeration fails.
    /// Devices whose name cannot be queried are omitted.
    fn list_input_devices(&self) -> Vec<String> {
        match self.host.input_devices() {
            Ok(devices) => devices.filter_map(|dev| dev.name().ok()).collect(),
            Err(_) => Vec::new(),
        }
    }

    /// Names of the host's output devices; empty if enumeration fails.
    /// Devices whose name cannot be queried are omitted.
    fn list_output_devices(&self) -> Vec<String> {
        match self.host.output_devices() {
            Ok(devices) => devices.filter_map(|dev| dev.name().ok()).collect(),
            Err(_) => Vec::new(),
        }
    }
}

impl Drop for CpalBackend {
    fn drop(&mut self) {
        if let Some(handle) = self.thread_handle.take() {
            let _ = self.command_tx.send(Command::Stop);
            let _ = handle.join();
        }
    }
}