car-voice 0.19.0

Voice I/O capability for CAR — mic capture, VAD, listener/speaker traits
Documentation
//! Text-to-speech provider trait and audio playback helpers.
//!
//! Channels obtain a `&dyn Speaker` (or a concrete impl like
//! [`crate::ElevenLabsSpeaker`]) and call [`Speaker::speak`] for
//! fire-and-forget narration or [`Speaker::synth`] when they want the audio
//! bytes for processing / saving / inspection.

use crate::{Result, VoiceError};
use async_trait::async_trait;
use tokio::sync::mpsc;

/// Audio format produced by a [`Speaker`] implementation.
///
/// Travels alongside the raw bytes in [`SynthesizedAudio`] and [`TtsChunk`]
/// so that consumers know how the payload is encoded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AudioFormat {
    /// Compressed MP3 (decodable with rodio's `mp3` feature).
    Mp3,
    /// Uncompressed WAV.
    Wav,
}

/// A finalized synthesis result — raw audio bytes plus a format tag so the
/// player knows how to decode them.
///
/// Returned by [`Speaker::synth`] and consumed by [`play_audio`].
#[derive(Debug)]
pub struct SynthesizedAudio {
    /// Complete encoded audio payload.
    pub bytes: Vec<u8>,
    /// Container/codec of `bytes`.
    pub format: AudioFormat,
}

/// One chunk emitted by [`Speaker::synth_stream`]. Streaming consumers
/// receive a sequence of `TtsChunk` ending with `is_final = true`.
///
/// Sequence numbers are provider-assigned and start at 0 within a single
/// `synth_stream` call. The final chunk may carry a non-empty `bytes`
/// payload — the consumer should process the bytes *and* honor the flag.
///
/// The default [`Speaker::synth_stream`] implementation emits exactly one
/// chunk: `seq = 0`, the full audio payload, and `is_final = true`.
#[derive(Debug, Clone)]
pub struct TtsChunk {
    /// Monotonic chunk index within a single synthesis call.
    pub seq: u64,
    /// Raw audio bytes for this chunk. Format is the same for every
    /// chunk in a stream and matches what `synth_stream` declares
    /// implicitly via the first chunk.
    pub bytes: Vec<u8>,
    /// Audio container/codec for the chunk bytes.
    pub format: AudioFormat,
    /// True only for the last chunk in the stream. After receiving a
    /// final chunk the consumer must not expect further sends.
    pub is_final: bool,
}

/// Bounded queue capacity for [`Speaker::synth_stream`]. Sized so a slow
/// consumer doesn't backpressure the HTTP read loop hard enough to stall
/// the provider's connection — 64 chunks at ~4 KB each ≈ 250 KB peak.
///
/// NOTE(review): the default `synth_stream` implementation in this file
/// does not use this constant (it creates a channel of capacity 1 for its
/// single chunk); presumably it is meant for providers that override
/// `synth_stream` with native streaming — confirm against those impls.
pub const TTS_STREAM_QUEUE_CAPACITY: usize = 64;

/// A text-to-speech provider.
///
/// Implementations are stateless from the caller's perspective — each call
/// to [`Speaker::synth`] is independent. Concrete impls hold the API key,
/// voice ID, and any provider-specific tuning in the constructor.
#[async_trait]
pub trait Speaker: Send + Sync {
    /// Synthesize the text into audio bytes. The returned format depends on
    /// the impl (ElevenLabs streams MP3; a future Piper impl would emit
    /// WAV/PCM).
    async fn synth(&self, text: &str) -> Result<SynthesizedAudio>;

    /// Streaming variant. Returns a receiver that yields chunks as the
    /// provider produces them, ending with a chunk where `is_final = true`.
    ///
    /// Default impl buffers via [`Speaker::synth`] and emits a single final
    /// chunk — correct but defeats the latency win. Providers that can
    /// stream natively (ElevenLabs `/stream`, OpenAI TTS streaming, etc.)
    /// should override this to forward chunks as they arrive.
    ///
    /// # Cancellation
    /// Dropping the returned receiver signals cancellation; implementations
    /// must abort the in-flight request and release resources. This is the
    /// barge-in path: the controller drops the receiver when the user
    /// starts speaking mid-utterance.
    async fn synth_stream(&self, text: &str) -> Result<mpsc::Receiver<TtsChunk>> {
        let audio = self.synth(text).await?;
        let (tx, rx) = mpsc::channel(1);
        // Best-effort send — if the consumer already dropped the rx, this
        // is a no-op which is exactly what we want for "cancel before
        // first chunk arrived."
        let _ = tx
            .send(TtsChunk {
                seq: 0,
                bytes: audio.bytes,
                format: audio.format,
                is_final: true,
            })
            .await;
        Ok(rx)
    }

    /// Synthesize and play through the OS default output device.
    ///
    /// Default impl: call [`synth`] then play via [`play_audio`].
    async fn speak(&self, text: &str) -> Result<()> {
        let audio = self.synth(text).await?;
        play_audio(audio).await
    }
}

/// Encode mono Float32 PCM samples in `[-1.0, 1.0]` as a 16-bit PCM
/// WAV blob. Shared between providers that capture PCM and need to
/// hand back a [`SynthesizedAudio`].
///
/// Samples outside `[-1.0, 1.0]` are clamped before conversion. Each
/// sample is scaled by `i16::MAX` and converted with an `as` cast, which
/// truncates toward zero (and maps NaN to 0).
///
/// # Errors
///
/// Returns a [`std::io::Error`] of kind `Other`, carrying the message of
/// the underlying `hound` error, if creating the writer, writing a sample,
/// or finalizing the WAV data fails.
pub(crate) fn encode_pcm_f32_to_wav_pcm16(
    samples: &[f32],
    sample_rate: u32,
) -> std::io::Result<Vec<u8>> {
    // Capacity hint only: a canonical PCM WAV header is 44 bytes and each
    // sample takes 2 bytes. Pre-sizing avoids repeated regrowth while
    // writing; it has no effect on the bytes produced.
    const WAV_HEADER_HINT: usize = 44;

    let spec = hound::WavSpec {
        channels: 1,
        sample_rate,
        bits_per_sample: 16,
        sample_format: hound::SampleFormat::Int,
    };
    let mut buf = std::io::Cursor::new(Vec::<u8>::with_capacity(
        WAV_HEADER_HINT + samples.len() * 2,
    ));
    {
        // Scope the writer so its mutable borrow of `buf` ends before the
        // buffer is unwrapped below.
        let mut writer = hound::WavWriter::new(&mut buf, spec).map_err(io_err)?;
        for &s in samples {
            let clamped = s.clamp(-1.0, 1.0);
            let pcm16 = (clamped * i16::MAX as f32) as i16;
            writer.write_sample(pcm16).map_err(io_err)?;
        }
        writer.finalize().map_err(io_err)?;
    }
    Ok(buf.into_inner())
}

/// Convert a `hound` error into a [`std::io::Error`] of kind `Other`,
/// keeping only its display message.
fn io_err(e: hound::Error) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
}

/// Decode and play [`SynthesizedAudio`] through the OS default output
/// device. Blocks the current task until playback finishes.
///
/// Uses `rodio` (synchronous), which is run on a dedicated blocking thread
/// so the async runtime is not stalled.
///
/// # Errors
///
/// Returns [`VoiceError::Playback`] if the default output stream or the
/// sink cannot be created, if the audio bytes cannot be decoded, or if the
/// blocking task fails to join.
///
/// # Cancellation
///
/// If the returned future is dropped mid-playback, the underlying
/// blocking thread keeps running until rodio's `sleep_until_end` returns —
/// `spawn_blocking` cannot abort an OS thread. For a "stop speaking now"
/// affordance the caller needs a `Sink` reference and to call `.stop()`
/// on it; that's a future hardening item alongside chunk-streamed playback.
pub async fn play_audio(audio: SynthesizedAudio) -> Result<()> {
    tokio::task::spawn_blocking(move || -> Result<()> {
        // `_stream` must stay alive for the duration of playback; it is
        // bound (not discarded with `_`) so it is dropped only at the end
        // of this closure.
        let (_stream, handle) = rodio::OutputStream::try_default()
            .map_err(|e| VoiceError::Playback(format!("output stream: {e}")))?;
        let sink = rodio::Sink::try_new(&handle)
            .map_err(|e| VoiceError::Playback(format!("sink: {e}")))?;
        let cursor = std::io::Cursor::new(audio.bytes);
        // Both formats go through the same decoder constructor; the
        // exhaustive match forces a decision when a new format is added.
        let decoder = match audio.format {
            AudioFormat::Mp3 | AudioFormat::Wav => rodio::Decoder::new(cursor)
                .map_err(|e| VoiceError::Playback(format!("decode: {e}")))?,
        };
        sink.append(decoder);
        sink.sleep_until_end();
        Ok(())
    })
    .await
    .map_err(|e| VoiceError::Playback(format!("blocking task join: {e}")))?
}