// car-voice 0.15.1
//
// Voice I/O capability for CAR — mic capture, VAD, listener/speaker traits.
//! macOS system-audio capture via Core Audio process taps.
//!
//! Ported from Meetily (`frontend/src-tauri/src/audio/capture/core_audio.rs`,
//! MIT, © 2024 Zackriya Solutions). See `NOTICE.md` at the workspace
//! root for license attribution.
//!
//! Architecture:
//!   - Apple Core Audio HAL process taps (macOS 14.2+) capture the
//!     default output device's mix without going through the speakers.
//!   - The tap is wrapped in an aggregate device so we get a normal
//!     `AudioDeviceIOProc` callback delivering `f32` PCM frames.
//!   - The IO proc runs on a real-time audio thread and cannot block —
//!     samples are pushed into a lock-free SPSC ring buffer.
//!   - The consumer side exposes a `futures::Stream<Item = f32>` that
//!     wakes the polling task when new samples are available.
//!
//! Why process taps and not ScreenCaptureKit:
//!   - SCK is video-first. For audio-only capture the tap path is
//!     lower overhead and avoids ScreenCaptureKit's permission model
//!     (which is screen-recording-themed and confusing for audio apps).
//!   - Process taps respect the user's audio routing (speakers,
//!     headphones, AirPods) without us having to follow device changes.
//!   - The tap delivers the *post-mix* signal — exactly what a meeting
//!     consumer wants ("everything I'm hearing").
//!
//! Permission: on macOS 14.4+ the system displays an Audio Capture
//! prompt the first time a tap is created. If the user denies, the
//! tap returns silence — we don't fail loudly, we just stream zeros.

#![cfg(all(target_os = "macos", feature = "system-audio-macos"))]
#![allow(clippy::missing_safety_doc)]

use anyhow::Result;
use cidre::{arc, av, cat, cf, core_audio as ca, os};
use futures::Stream;
use log::{error, info, warn};
use ringbuf::{
    traits::{Consumer, Producer, Split},
    HeapCons, HeapProd, HeapRb,
};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

/// Wake state shared between the IO proc callback and the polling task.
struct WakerState {
    waker: Option<Waker>,
    has_data: bool,
}

/// Handle to a configured Core Audio process tap. Call [`stream`] to
/// actually start the device and produce samples.
///
/// [`stream`]: CoreAudioCapture::stream
pub struct CoreAudioCapture {
    // Guard keeping the process tap alive; moved into the stream on start.
    tap: ca::TapGuard,
    // Aggregate-device description built in `new()`; wraps the tap so a
    // standard IO proc callback can deliver its samples.
    agg_desc: arc::Retained<cf::DictionaryOf<cf::String, cf::Type>>,
}

/// Async stream of `f32` PCM samples from the system audio mix.
///
/// The format negotiated by Core Audio is reported by
/// [`sample_rate`](Self::sample_rate). Channel layout is mono (the tap
/// is created with `with_mono_global_tap_excluding_processes`).
pub struct CoreAudioStream {
    // Consumer half of the SPSC ring buffer fed by the IO proc.
    consumer: HeapCons<f32>,
    // Held so the aggregate device keeps running for the stream's
    // lifetime (presumably stops on drop — RAII guard from cidre).
    _device: ca::hardware::StartedDevice<ca::AggregateDevice>,
    // Keeps the IO proc's context alive while Core Audio may call into it.
    _ctx: Box<AudioContext>,
    // Keeps the process tap alive for the lifetime of the stream.
    _tap: ca::TapGuard,
    // Shared waker slot; `poll_next` parks here when the buffer is empty.
    waker_state: Arc<Mutex<WakerState>>,
    // Mirrors the device's current sample rate (may change at runtime
    // when the user switches output device).
    current_sample_rate: Arc<AtomicU32>,
}

/// Live state owned by the IO proc callback.
///
/// Boxed and handed to Core Audio as the IO proc's client-data pointer;
/// the `Arc`ed fields are shared with [`CoreAudioStream`] on the
/// consumer side.
struct AudioContext {
    // Negotiated tap format; used to interpret the raw buffer lists.
    format: arc::R<av::AudioFormat>,
    // Producer half of the SPSC ring buffer (consumer lives in the stream).
    producer: HeapProd<f32>,
    // Shared waker slot used to unpark the polling task when data arrives.
    waker_state: Arc<Mutex<WakerState>>,
    // Last observed device sample rate; updated in the IO proc when the
    // output device changes.
    current_sample_rate: Arc<AtomicU32>,
    // Count of consecutive callbacks that couldn't push a full buffer.
    consecutive_drops: Arc<AtomicU32>,
    // Set to request stream shutdown (buffer pressure or stream drop).
    should_terminate: Arc<AtomicBool>,
}

impl CoreAudioCapture {
    /// Create a Core Audio process tap on the default output device.
    ///
    /// Triggers the Audio Capture permission prompt on macOS 14.4+ if
    /// the user hasn't approved it for this app yet.
    pub fn new() -> Result<Self> {
        info!("CoreAudio: starting system-audio capture initialization");

        let output_device = ca::System::default_output_device().map_err(|e| {
            error!("CoreAudio: failed to get default output device: {:?}", e);
            anyhow::anyhow!("Failed to get default output device: {:?}", e)
        })?;

        let output_uid = output_device.uid().map_err(|e| {
            error!("CoreAudio: failed to get device UID: {:?}", e);
            anyhow::anyhow!("Failed to get device UID: {:?}", e)
        })?;

        let device_name = output_device
            .name()
            .unwrap_or_else(|_| cf::String::from_str("Unknown"));
        info!(
            "CoreAudio: default output device: '{}' (UID: {:?})",
            device_name, output_uid
        );

        // Mono global tap is the most reliable shape for system-audio
        // capture on macOS — stereo taps have surprising failure modes
        // when the output device's channel layout changes.
        let tap_desc =
            ca::TapDesc::with_mono_global_tap_excluding_processes(&cidre::ns::Array::new());
        let tap = tap_desc.create_process_tap().map_err(|e| {
            error!("CoreAudio: failed to create process tap: {:?}", e);
            anyhow::anyhow!("Failed to create process tap: {:?}", e)
        })?;

        let tap_uid = tap.uid().unwrap_or_else(|_| cf::Uuid::new().to_cf_string());
        match tap.asbd() {
            Ok(asbd) => info!(
                "CoreAudio: process tap created (UID: {:?}, sample_rate: {} Hz, channels: {})",
                tap_uid, asbd.sample_rate, asbd.channels_per_frame
            ),
            Err(e) => warn!("CoreAudio: tap created but couldn't read format: {:?}", e),
        }

        let sub_tap = cf::DictionaryOf::with_keys_values(
            &[ca::sub_device_keys::uid()],
            &[tap.uid().unwrap().as_type_ref()],
        );

        // Aggregate device wraps the tap so the standard IO proc
        // callback machinery delivers samples to us.
        //
        // CRITICAL: we include ONLY the tap, not the underlying output
        // device. Including both causes duplicate audio (every frame
        // gets captured twice, producing audible echo). The tap alone
        // is sufficient — it observes the post-mix output stream.
        let agg_desc = cf::DictionaryOf::with_keys_values(
            &[
                ca::aggregate_device_keys::is_private(),
                ca::aggregate_device_keys::is_stacked(),
                ca::aggregate_device_keys::tap_auto_start(),
                ca::aggregate_device_keys::name(),
                ca::aggregate_device_keys::main_sub_device(),
                ca::aggregate_device_keys::uid(),
                ca::aggregate_device_keys::tap_list(),
            ],
            &[
                cf::Boolean::value_true().as_type_ref(),
                cf::Boolean::value_false(),
                cf::Boolean::value_true(),
                cf::str!(c"car-voice-system-audio-tap").as_type_ref(),
                &output_uid,
                &cf::Uuid::new().to_cf_string(),
                &cf::ArrayOf::from_slice(&[sub_tap.as_ref()]),
            ],
        );

        Ok(Self { tap, agg_desc })
    }

    /// Start the audio device and create a stream.
    fn start_device(
        &self,
        ctx: &mut Box<AudioContext>,
    ) -> Result<ca::hardware::StartedDevice<ca::AggregateDevice>> {
        extern "C" fn audio_proc(
            device: ca::Device,
            _now: &cat::AudioTimeStamp,
            input_data: &cat::AudioBufList<1>,
            _input_time: &cat::AudioTimeStamp,
            _output_data: &mut cat::AudioBufList<1>,
            _output_time: &cat::AudioTimeStamp,
            ctx: Option<&mut AudioContext>,
        ) -> os::Status {
            let ctx = ctx.unwrap();

            // Track sample rate changes (output device switched, e.g.
            // user plugged in headphones).
            let after = device
                .nominal_sample_rate()
                .unwrap_or(ctx.format.absd().sample_rate) as u32;
            let before = ctx.current_sample_rate.load(Ordering::Acquire);
            if before != after {
                ctx.current_sample_rate.store(after, Ordering::Release);
            }

            if let Some(view) =
                av::AudioPcmBuf::with_buf_list_no_copy(&ctx.format, input_data, None)
            {
                if let Some(data) = view.data_f32_at(0) {
                    process_audio_data(ctx, data);
                }
            } else if ctx.format.common_format() == av::audio::CommonFormat::PcmF32 {
                // Fallback: AudioPcmBuf can fail on some channel layouts.
                let first_buffer = &input_data.buffers[0];
                let byte_count = first_buffer.data_bytes_size as usize;
                let float_count = byte_count / std::mem::size_of::<f32>();
                if float_count > 0 && !first_buffer.data.is_null() {
                    let data = unsafe {
                        std::slice::from_raw_parts(first_buffer.data as *const f32, float_count)
                    };
                    process_audio_data(ctx, data);
                }
            }
            os::Status::NO_ERR
        }

        let agg_device = ca::AggregateDevice::with_desc(&self.agg_desc).map_err(|e| {
            error!("CoreAudio: failed to create aggregate device: {:?}", e);
            anyhow::anyhow!("Failed to create aggregate device: {:?}", e)
        })?;

        let proc_id = agg_device
            .create_io_proc_id(audio_proc, Some(ctx))
            .map_err(|e| {
                error!("CoreAudio: failed to create IO proc: {:?}", e);
                anyhow::anyhow!("Failed to create IO proc: {:?}", e)
            })?;

        let started_device = ca::device_start(agg_device, Some(proc_id)).map_err(|e| {
            error!("CoreAudio: failed to start device: {:?}", e);
            anyhow::anyhow!("Failed to start device: {:?}", e)
        })?;

        info!(
            "CoreAudio: device started, aggregate sample_rate={} Hz",
            started_device.as_ref().nominal_sample_rate().unwrap_or(0.0)
        );
        Ok(started_device)
    }

    /// Start the device and return a stream of `f32` samples.
    pub fn stream(self) -> Result<CoreAudioStream> {
        let asbd = self
            .tap
            .asbd()
            .map_err(|e| anyhow::anyhow!("Failed to get tap ASBD: {:?}", e))?;
        let format = av::AudioFormat::with_asbd(&asbd)
            .ok_or_else(|| anyhow::anyhow!("Failed to create audio format"))?;

        // 128 KB ring buffer ≈ 32 K float samples ≈ 0.67 s at 48 kHz —
        // enough headroom that a momentarily-stalled consumer doesn't
        // immediately drop frames.
        let buffer_size = 1024 * 128;
        let rb = HeapRb::<f32>::new(buffer_size);
        let (producer, consumer) = rb.split();

        let waker_state = Arc::new(Mutex::new(WakerState {
            waker: None,
            has_data: false,
        }));
        let current_sample_rate = Arc::new(AtomicU32::new(asbd.sample_rate as u32));

        let mut ctx = Box::new(AudioContext {
            format,
            producer,
            waker_state: waker_state.clone(),
            current_sample_rate: current_sample_rate.clone(),
            consecutive_drops: Arc::new(AtomicU32::new(0)),
            should_terminate: Arc::new(AtomicBool::new(false)),
        });

        let device = self.start_device(&mut ctx)?;

        Ok(CoreAudioStream {
            consumer,
            _device: device,
            _ctx: ctx,
            _tap: self.tap,
            waker_state,
            current_sample_rate,
        })
    }
}

/// Push samples into the ring buffer and wake the consumer if it's
/// parked. Called from the real-time audio thread — no allocation, no
/// blocking, no logging in the hot path beyond the drop-counter
/// fast-path.
///
/// Fix over the original: when buffer pressure triggers termination we
/// now still wake a parked consumer. Previously the early return
/// skipped the wake entirely, so a task parked in `poll_next` could
/// never observe `should_terminate` and see end-of-stream.
fn process_audio_data(ctx: &mut AudioContext, data: &[f32]) {
    let buffer_size = data.len();
    let pushed = ctx.producer.push_slice(data);

    let mut terminating = false;
    if pushed < buffer_size {
        let consecutive = ctx.consecutive_drops.fetch_add(1, Ordering::AcqRel) + 1;
        if consecutive > 10 {
            // Consumer has been stuck for ~10 callbacks (~100 ms). Tear
            // down rather than continue dropping silently — the consumer
            // will see end-of-stream and can restart.
            ctx.should_terminate.store(true, Ordering::Release);
            terminating = true;
        }
    } else {
        ctx.consecutive_drops.store(0, Ordering::Release);
    }

    if pushed > 0 || terminating {
        let should_wake = {
            let mut waker_state = ctx.waker_state.lock().unwrap();
            // On termination wake unconditionally (even if `has_data` is
            // stale-true) so the parked task re-polls and sees the flag.
            if terminating || !waker_state.has_data {
                waker_state.has_data = true;
                waker_state.waker.take()
            } else {
                None
            }
        };
        if let Some(waker) = should_wake {
            waker.wake();
        }
    }
}

impl CoreAudioStream {
    /// Negotiated sample rate in Hz. May change at runtime if the user
    /// switches output device.
    pub fn sample_rate(&self) -> u32 {
        self.current_sample_rate.load(Ordering::Acquire)
    }
}

impl Stream for CoreAudioStream {
    type Item = f32;

    /// Pop one sample, or park until the IO proc pushes more.
    ///
    /// Fix over the original: after registering the waker we re-check
    /// both the ring buffer and the termination flag. Without the
    /// re-check there is a lost-wakeup race — the producer can push
    /// (and skip the wake because `has_data` was still true) or set
    /// `should_terminate` between our failed pop and the waker
    /// registration, stranding the task in `Pending` until the next
    /// audio callback, or forever once callbacks stop.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(sample) = self.consumer.try_pop() {
            return Poll::Ready(Some(sample));
        }
        if self._ctx.should_terminate.load(Ordering::Acquire) {
            warn!("CoreAudioStream: terminating due to buffer pressure");
            // Drain anything that raced in before reporting end-of-stream.
            return match self.consumer.try_pop() {
                Some(sample) => Poll::Ready(Some(sample)),
                None => Poll::Ready(None),
            };
        }
        // Park: clear `has_data` and register our waker.
        {
            let mut state = self.waker_state.lock().unwrap();
            state.has_data = false;
            state.waker = Some(cx.waker().clone());
        }
        // Close the race window: anything produced (or a termination
        // signaled) before the registration above is now visible.
        // Returning Ready here may leave a stale registered waker; a
        // later spurious wake is harmless.
        if let Some(sample) = self.consumer.try_pop() {
            return Poll::Ready(Some(sample));
        }
        if self._ctx.should_terminate.load(Ordering::Acquire) {
            return Poll::Ready(None);
        }
        Poll::Pending
    }
}

impl Drop for CoreAudioStream {
    /// Tell the IO proc to stop producing once the consumer is gone.
    /// Device/tap teardown itself is handled by the guard fields'
    /// own destructors.
    fn drop(&mut self) {
        info!("CoreAudioStream: dropped, signaling termination");
        let terminate = &self._ctx.should_terminate;
        terminate.store(true, Ordering::Release);
    }
}