Skip to main content

omni_dev/voice/
audio.rs

//! Audio source abstraction.
//!
//! The [`AudioSource`] trait is the seam between hardware capture (real cpal
//! callbacks) and the rest of the pipeline. Production code uses
//! [`CpalAudioSource`]; tests drive the same pipeline through
//! [`FileAudioSource`], which replays samples from a fixture WAV (or from an
//! in-memory buffer).
//!
//! See ADR-0031 for the rationale behind keeping this seam at the f32-frame
//! level (rather than mocking cpal directly or asserting only at the CLI
//! level).
11
12use std::path::Path;
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
18use cpal::{Sample, SampleFormat, Stream, StreamConfig};
19use ringbuf::traits::{Consumer, Producer, Split};
20use ringbuf::{HeapCons, HeapRb};
21
/// Producer of raw, interleaved `f32` audio at a fixed sample rate and
/// channel count.
///
/// Every [`AudioSource::next_chunk`] call hands back a newly allocated
/// `Vec<f32>` whose samples are interleaved by channel (stereo reads
/// L, R, L, R, …). A `None` result means end-of-stream: the source is
/// exhausted (end of file, cpal stream stopped, …) and yields nothing more.
///
/// `Send` is deliberately not required. On macOS cpal's `Stream` is not
/// `Send` (it wraps a CoreAudio `AudioUnit` holding raw pointers), so
/// demanding `Send` here would push `CpalAudioSource` into an awkward
/// indirection. The capture pipeline is driven synchronously by the thread
/// that owns the source; the cpal callback lives on cpal's own audio thread
/// and hands samples over through a lock-free SPSC ring buffer.
pub trait AudioSource {
    /// Sample rate of the produced audio, in Hz.
    fn sample_rate(&self) -> u32;

    /// Number of interleaved channels (1 = mono, 2 = stereo, …).
    fn channels(&self) -> u16;

    /// Pulls the next chunk of interleaved samples, or `None` once the
    /// source has nothing left to give.
    fn next_chunk(&mut self) -> Option<Vec<f32>>;
}
45
/// Test [`AudioSource`] that replays a fixture WAV (or an in-memory buffer)
/// in fixed-size chunks.
///
/// Integer PCM is scaled into `[-1.0, 1.0]` according to its bit depth and
/// float PCM is passed through as stored, so a single fixture can stand in
/// for any capture-side input rate the pipeline needs to exercise.
#[derive(Debug, Clone)]
pub struct FileAudioSource {
    /// Every interleaved sample of the fixture, already converted to f32.
    samples: Vec<f32>,
    /// Index into `samples` of the next sample to hand out.
    cursor: usize,
    /// Frames (not samples) per chunk; the constructors clamp this to >= 1.
    chunk_frames: usize,
    /// Sample rate reported through [`AudioSource::sample_rate`], in Hz.
    sample_rate: u32,
    /// Channel count reported through [`AudioSource::channels`].
    channels: u16,
}
58
59impl FileAudioSource {
60    /// Loads a WAV file and prepares it for chunked playback.
61    ///
62    /// `chunk_frames` is the number of *frames* (not samples) returned per
63    /// [`AudioSource::next_chunk`] call — i.e. for stereo at
64    /// `chunk_frames = 1024`, each chunk contains 2048 interleaved samples.
65    pub fn from_path(path: impl AsRef<Path>, chunk_frames: usize) -> Result<Self> {
66        let path = path.as_ref();
67        let mut reader = hound::WavReader::open(path)
68            .with_context(|| format!("Failed to open fixture WAV at {}", path.display()))?;
69        let spec = reader.spec();
70        let samples = read_all_samples_as_f32(&mut reader, spec)
71            .with_context(|| format!("Failed to read samples from {}", path.display()))?;
72        Ok(Self {
73            samples,
74            cursor: 0,
75            chunk_frames: chunk_frames.max(1),
76            sample_rate: spec.sample_rate,
77            channels: spec.channels,
78        })
79    }
80
81    /// Builds a fixture source directly from an in-memory sample buffer.
82    /// Useful for synthesising test signals (sine waves, silence, …) without
83    /// hitting disk.
84    pub fn from_samples(
85        samples: Vec<f32>,
86        sample_rate: u32,
87        channels: u16,
88        chunk_frames: usize,
89    ) -> Self {
90        Self {
91            samples,
92            cursor: 0,
93            chunk_frames: chunk_frames.max(1),
94            sample_rate,
95            channels,
96        }
97    }
98}
99
100impl AudioSource for FileAudioSource {
101    fn next_chunk(&mut self) -> Option<Vec<f32>> {
102        if self.cursor >= self.samples.len() {
103            return None;
104        }
105        let samples_per_chunk = self.chunk_frames * self.channels as usize;
106        let end = (self.cursor + samples_per_chunk).min(self.samples.len());
107        let chunk = self.samples[self.cursor..end].to_vec();
108        self.cursor = end;
109        Some(chunk)
110    }
111
112    fn sample_rate(&self) -> u32 {
113        self.sample_rate
114    }
115
116    fn channels(&self) -> u16 {
117        self.channels
118    }
119}
120
121fn read_all_samples_as_f32<R: std::io::Read>(
122    reader: &mut hound::WavReader<R>,
123    spec: hound::WavSpec,
124) -> Result<Vec<f32>> {
125    match spec.sample_format {
126        hound::SampleFormat::Float => reader
127            .samples::<f32>()
128            .collect::<Result<Vec<_>, _>>()
129            .context("Failed to decode f32 PCM samples"),
130        hound::SampleFormat::Int => {
131            let scale = i32_pcm_scale(spec.bits_per_sample);
132            reader
133                .samples::<i32>()
134                .map(|res| res.map(|s| s as f32 / scale))
135                .collect::<Result<Vec<_>, _>>()
136                .context("Failed to decode integer PCM samples")
137        }
138    }
139}
140
/// Divisor that maps `bits_per_sample`-bit integer PCM onto `[-1.0, 1.0]`.
///
/// `hound` decodes integer PCM as sign-extended i32 regardless of the
/// declared bit depth, so the divisor is `2^(bits - 1)`: 32768 for 16-bit,
/// 2^23 for 24-bit, and so on. A depth of 0 yields 1.
fn i32_pcm_scale(bits_per_sample: u16) -> f32 {
    // This runs before any sample is decoded. Clamp the shift so a
    // nonsensical header value (wider than 64 bits) cannot overflow
    // `1u64 << shift`, which would panic in debug builds. Such depths are far
    // wider than the i32 samples decoded here, so the clamp never affects a
    // real decode.
    let shift = u32::from(bits_per_sample.saturating_sub(1)).min(u64::BITS - 1);
    (1u64 << shift) as f32
}
147
/// Upper bound on the samples returned by one [`AudioSource::next_chunk`]
/// call on `CpalAudioSource`.
///
/// Sized to amortise the SPSC drain cost while staying well below the
/// resampler's chunk size, so each `next_chunk` produces at most one
/// resampler chunk's worth of work.
const CPAL_DRAIN_CHUNK_SAMPLES: usize = 2048;

/// How long `CpalAudioSource::next_chunk` sleeps when the ring buffer is
/// empty, before polling it again.
///
/// Polling therefore delays delivery by roughly one interval (10 ms). That is
/// small next to the 100 ms window the downstream idle-silence detection
/// (~5 s threshold) works in, so idle detection lags by well under one window.
const CPAL_POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Ring-buffer capacity: one second of audio at the most demanding
/// configuration we expect (192 kHz × 8 channels).
///
/// Measured in samples rather than frames because the cpal callback pushes
/// interleaved samples.
const CPAL_RING_CAPACITY_SAMPLES: usize = 192_000 * 8;
163
164/// Production [`AudioSource`] backed by a `cpal` input stream.
165///
166/// Opens the default input device (or the named device matching `--device`),
167/// builds a stream at the device's default config, and feeds the f32-coerced
168/// samples through a lock-free SPSC ring buffer to the consumer side. The
169/// cpal callback runs on cpal's own audio thread and must never block;
170/// resampling/idle detection/writing all happen on the consumer side.
171pub struct CpalAudioSource {
172    consumer: HeapCons<f32>,
173    sample_rate: u32,
174    channels: u16,
175    stream_error: Arc<Mutex<Option<String>>>,
176    /// Held to keep the cpal stream alive. Dropped before the writer is
177    /// finalised so all in-flight callback samples have flushed through
178    /// the ring buffer.
179    _stream: Stream,
180}
181
182impl CpalAudioSource {
183    /// Opens the default input device (or the device matching
184    /// `device_name`, if provided) and starts a stream at its native rate
185    /// and channel count.
186    ///
187    /// `device_name` matching is exact (case-sensitive) against
188    /// `Device::name()` — cpal reports platform-native names which differ
189    /// across macOS/Linux/Windows, so users get an error listing every
190    /// detected device when no match is found.
191    pub fn new(device_name: Option<&str>) -> Result<Self> {
192        let host = cpal::default_host();
193        let device = match device_name {
194            None => host
195                .default_input_device()
196                .ok_or_else(|| anyhow!("No default input device available on this host"))?,
197            Some(name) => find_input_device(&host, name)?,
198        };
199        let resolved_name = device.description().map_or_else(
200            |_| "<unnamed device>".to_string(),
201            |desc| desc.name().to_string(),
202        );
203        let supported = device
204            .default_input_config()
205            .with_context(|| format!("Failed to query default input config for {resolved_name}"))?;
206        let sample_format = supported.sample_format();
207        let config: StreamConfig = supported.config();
208        let sample_rate = config.sample_rate;
209        let channels = config.channels;
210
211        let rb = HeapRb::<f32>::new(CPAL_RING_CAPACITY_SAMPLES);
212        let (mut producer, consumer) = rb.split();
213        let stream_error: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
214        let error_clone = stream_error.clone();
215        let err_fn = move |err: cpal::StreamError| {
216            if let Ok(mut slot) = error_clone.lock() {
217                *slot = Some(err.to_string());
218            }
219        };
220
221        let stream = match sample_format {
222            SampleFormat::F32 => device
223                .build_input_stream(
224                    &config,
225                    move |data: &[f32], _| {
226                        producer.push_slice(data);
227                    },
228                    err_fn,
229                    None,
230                )
231                .with_context(|| format!("Failed to build f32 input stream on {resolved_name}"))?,
232            SampleFormat::I16 => device
233                .build_input_stream(
234                    &config,
235                    move |data: &[i16], _| {
236                        for sample in data {
237                            let _ = producer.try_push(sample.to_float_sample());
238                        }
239                    },
240                    err_fn,
241                    None,
242                )
243                .with_context(|| format!("Failed to build i16 input stream on {resolved_name}"))?,
244            SampleFormat::U16 => device
245                .build_input_stream(
246                    &config,
247                    move |data: &[u16], _| {
248                        for sample in data {
249                            let _ = producer.try_push(sample.to_float_sample());
250                        }
251                    },
252                    err_fn,
253                    None,
254                )
255                .with_context(|| format!("Failed to build u16 input stream on {resolved_name}"))?,
256            other => anyhow::bail!(
257                "Unsupported cpal sample format {other:?} on {resolved_name} \
258                 (only F32, I16, U16 are wired up — file an issue if you need others)"
259            ),
260        };
261        stream
262            .play()
263            .with_context(|| format!("Failed to start input stream on {resolved_name}"))?;
264
265        Ok(Self {
266            consumer,
267            sample_rate,
268            channels,
269            stream_error,
270            _stream: stream,
271        })
272    }
273
274    fn take_stream_error(&self) -> Option<String> {
275        self.stream_error.lock().ok().and_then(|mut s| s.take())
276    }
277}
278
279impl AudioSource for CpalAudioSource {
280    fn next_chunk(&mut self) -> Option<Vec<f32>> {
281        if let Some(err) = self.take_stream_error() {
282            tracing::warn!("cpal stream error: {err}");
283            return None;
284        }
285        // Poll until samples arrive — cpal callbacks deliver in bursts at
286        // the device's buffer cadence. Returning empty Vecs every poll
287        // would burn CPU on the consumer side without producing useful
288        // work.
289        let mut buf = vec![0.0_f32; CPAL_DRAIN_CHUNK_SAMPLES];
290        loop {
291            let popped = self.consumer.pop_slice(&mut buf);
292            if popped > 0 {
293                buf.truncate(popped);
294                return Some(buf);
295            }
296            if let Some(err) = self.take_stream_error() {
297                tracing::warn!("cpal stream error: {err}");
298                return None;
299            }
300            std::thread::sleep(CPAL_POLL_INTERVAL);
301        }
302    }
303
304    fn sample_rate(&self) -> u32 {
305        self.sample_rate
306    }
307
308    fn channels(&self) -> u16 {
309        self.channels
310    }
311}
312
313fn find_input_device(host: &cpal::Host, name: &str) -> Result<<cpal::Host as HostTrait>::Device> {
314    let devices = host
315        .input_devices()
316        .context("Failed to enumerate input devices")?;
317    let mut available: Vec<String> = Vec::new();
318    for device in devices {
319        let device_name = device.description().map_or_else(
320            |_| "<unnamed device>".to_string(),
321            |desc| desc.name().to_string(),
322        );
323        if device_name == name {
324            return Ok(device);
325        }
326        available.push(device_name);
327    }
328    Err(anyhow!(
329        "Input device {name:?} not found. Available: {available:?}"
330    ))
331}
332
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    use anyhow::Result;
    use tempfile::TempDir;

    /// Writes `samples_i16` as an integer-PCM WAV called `name` inside `dir`
    /// and returns the resulting path.
    fn write_fixture_wav(
        dir: &TempDir,
        name: &str,
        sample_rate: u32,
        channels: u16,
        bits: u16,
        samples_i16: &[i16],
    ) -> Result<std::path::PathBuf> {
        let target = dir.path().join(name);
        let mut wav = hound::WavWriter::create(
            &target,
            hound::WavSpec {
                channels,
                sample_rate,
                bits_per_sample: bits,
                sample_format: hound::SampleFormat::Int,
            },
        )?;
        samples_i16.iter().try_for_each(|&s| wav.write_sample(s))?;
        wav.finalize()?;
        Ok(target)
    }

    #[test]
    fn file_source_returns_samples_in_chunks() -> Result<()> {
        let tmp = TempDir::new()?;
        // Twelve mono i16 samples at five frames per chunk → 5 + 5 + 2.
        let ramp: Vec<i16> = (1..=12).map(|i| i * 100).collect();
        let path = write_fixture_wav(&tmp, "mono.wav", 16_000, 1, 16, &ramp)?;
        let mut src = FileAudioSource::from_path(&path, 5)?;
        assert_eq!(src.sample_rate(), 16_000);
        assert_eq!(src.channels(), 1);
        let lengths: Vec<usize> = std::iter::from_fn(|| src.next_chunk())
            .map(|chunk| chunk.len())
            .collect();
        assert_eq!(lengths, vec![5, 5, 2]);
        assert!(src.next_chunk().is_none());
        Ok(())
    }

    #[test]
    fn file_source_chunk_size_is_frames_not_samples_for_stereo() -> Result<()> {
        let tmp = TempDir::new()?;
        // Eight interleaved samples = four stereo frames; two frames per chunk.
        let interleaved = [1, 2, 3, 4, 5, 6, 7, 8];
        let path = write_fixture_wav(&tmp, "stereo.wav", 48_000, 2, 16, &interleaved)?;
        let mut src = FileAudioSource::from_path(&path, 2)?;
        assert_eq!(src.channels(), 2);
        let first = src.next_chunk().expect("chunk");
        assert_eq!(first.len(), 4, "2 frames * 2 channels = 4 samples");
        let second = src.next_chunk().expect("chunk");
        assert_eq!(second.len(), 4);
        assert!(src.next_chunk().is_none());
        Ok(())
    }

    #[test]
    fn file_source_decodes_i16_to_unit_range() -> Result<()> {
        let tmp = TempDir::new()?;
        let path = write_fixture_wav(&tmp, "edges.wav", 8000, 1, 16, &[i16::MAX, 0, i16::MIN])?;
        let mut src = FileAudioSource::from_path(&path, 16)?;
        let decoded = src.next_chunk().expect("chunk");
        // (observed, expected, tolerance):
        // i16::MAX / 32768 ≈ 0.99997, 0 stays 0, i16::MIN / 32768 = -1.
        let expectations = [
            (decoded[0], 0.999_969_5, 1e-4),
            (decoded[1], 0.0, 1e-6),
            (decoded[2], -1.0, 1e-6),
        ];
        for (observed, expected, tolerance) in expectations {
            assert!(
                (observed - expected).abs() < tolerance,
                "got {observed}, expected {expected}"
            );
        }
        Ok(())
    }

    #[test]
    fn from_samples_round_trips_without_disk() {
        let samples = vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6];
        let mut src = FileAudioSource::from_samples(samples.clone(), 16_000, 1, 4);
        let (head, tail) = samples.split_at(4);
        assert_eq!(src.next_chunk().expect("first chunk"), head);
        assert_eq!(src.next_chunk().expect("second chunk"), tail);
        assert!(src.next_chunk().is_none());
    }

    #[test]
    fn from_samples_yields_none_when_exhausted() {
        let mut src = FileAudioSource::from_samples(Vec::new(), 16_000, 1, 32);
        assert!(src.next_chunk().is_none());
    }

    #[test]
    fn zero_chunk_size_is_treated_as_one_frame() {
        let mut src = FileAudioSource::from_samples(vec![0.1, 0.2, 0.3], 16_000, 1, 0);
        // chunk_frames is clamped to 1, so every mono chunk holds one sample.
        for expected in [0.1, 0.2, 0.3] {
            assert_eq!(src.next_chunk(), Some(vec![expected]));
        }
        assert!(src.next_chunk().is_none());
    }

    #[test]
    #[ignore = "requires a working audio input device (local hardware only)"]
    fn cpal_default_input_produces_samples() -> Result<()> {
        let mut src = CpalAudioSource::new(None)?;
        assert!(src.sample_rate() > 0);
        assert!(src.channels() > 0);
        let Some(chunk) = src.next_chunk() else {
            panic!("default input should produce at least one chunk");
        };
        assert!(!chunk.is_empty(), "default input chunk should not be empty");
        Ok(())
    }

    #[test]
    fn file_source_decodes_f32_fixtures() -> Result<()> {
        // Covers the SampleFormat::Float branch of read_all_samples_as_f32.
        // Most capture-side cpal configs are f32, so a fixture in that format
        // is a realistic stand-in.
        let tmp = TempDir::new()?;
        let path = tmp.path().join("float.wav");
        let written = [0.0_f32, 0.25, -0.25, 0.5, -0.5];
        let mut writer = hound::WavWriter::create(
            &path,
            hound::WavSpec {
                channels: 1,
                sample_rate: 16_000,
                bits_per_sample: 32,
                sample_format: hound::SampleFormat::Float,
            },
        )?;
        for value in written {
            writer.write_sample(value)?;
        }
        writer.finalize()?;

        let mut src = FileAudioSource::from_path(&path, 16)?;
        let chunk = src.next_chunk().expect("chunk");
        assert_eq!(chunk.len(), written.len());
        for (decoded, original) in chunk.iter().zip(written) {
            assert!((decoded - original).abs() < 1e-6);
        }
        Ok(())
    }

    #[test]
    fn file_source_open_missing_path_errors() {
        match FileAudioSource::from_path("/this/path/does/not/exist.wav", 16) {
            Ok(_) => panic!("expected open of missing file to error"),
            Err(err) => assert!(
                err.to_string().contains("Failed to open fixture WAV"),
                "got: {err}"
            ),
        }
    }

    #[test]
    fn i32_pcm_scale_matches_bit_depth() {
        let cases = [
            (16, 32_768.0),            // 2^15
            (24, 8_388_608.0),         // 2^23
            (32, (1u64 << 31) as f32), // 2^31
            (0, 1.0),                  // nonsense depth clamps to shift 0 (no panic)
        ];
        for (bits, expected) in cases {
            assert!(
                (i32_pcm_scale(bits) - expected).abs() < f32::EPSILON,
                "bits = {bits}"
            );
        }
    }

    #[test]
    fn cpal_unknown_device_lists_alternatives() {
        let bogus = "this-device-name-definitely-does-not-exist-on-anyone-system";
        let Err(err) = CpalAudioSource::new(Some(bogus)) else {
            panic!("expected unknown device to error");
        };
        let msg = err.to_string();
        let expectations = [
            ("not found", "error message should say 'not found'"),
            ("Available", "error message should list available devices"),
        ];
        for (needle, why) in expectations {
            assert!(msg.contains(needle), "{why}: {msg}");
        }
    }
}