// wavecraft_dev_server/audio/server.rs

1//! Audio server for full-duplex audio I/O in dev mode.
2//!
3//! This module provides an audio server that captures microphone input,
4//! processes it through a `DevAudioProcessor` (typically an `FfiProcessor`
5//! loaded from the user's cdylib), and sends the processed audio to the
6//! output device (speakers/headphones). Meter data is communicated back
7//! via a callback channel.
8//!
9//! # Architecture
10//!
11//! ```text
12//! OS Mic → cpal input callback → deinterleave → FfiProcessor::process()
13//!                                                        │
14//!                                              ┌─────────┴──────────┐
15//!                                              │                    │
16//!                                         meter compute      interleave
17//!                                              │               → SPSC ring
18//!                                              ▼                    │
19//!                                        WebSocket broadcast        │
20//!                                                                   ▼
21//!                                              cpal output callback → Speakers
22//! ```
23
24use std::sync::Arc;
25
26use anyhow::{Context, Result};
27use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
28use cpal::{Device, Stream, StreamConfig};
29use wavecraft_protocol::MeterUpdateNotification;
30
31use super::atomic_params::AtomicParameterBridge;
32use super::ffi_processor::DevAudioProcessor;
33
/// Configuration for audio server.
#[derive(Debug, Clone)]
pub struct AudioConfig {
    /// Desired sample rate (e.g., 44100.0). Falls back to system default.
    /// NOTE(review): `AudioServer::start()` derives the processing rate from
    /// the input device's actual config; this field does not appear to be
    /// consulted there — confirm whether it is still intended to take effect.
    pub sample_rate: f32,
    /// Buffer size in samples (per channel, per processing block). Used to
    /// size the pre-allocated deinterleave/interleave buffers and the
    /// audio SPSC ring-buffer capacity in `start()`.
    pub buffer_size: u32,
}
42
/// Handle returned by `AudioServer::start()` that keeps both audio
/// streams alive. Drop this handle to stop audio capture and playback.
///
/// The fields are intentionally unused (`_`-prefixed): they exist solely
/// so that the cpal `Stream`s are owned and torn down when the handle
/// is dropped. The output stream is `None` in metering-only mode.
pub struct AudioHandle {
    _input_stream: Stream,
    _output_stream: Option<Stream>,
}
49
/// Audio server that processes OS input through a `DevAudioProcessor`
/// and routes the processed audio to the output device.
///
/// Construct with [`AudioServer::new`], then consume with
/// [`AudioServer::start`] to begin streaming.
pub struct AudioServer {
    /// User DSP, typically an FFI processor loaded from a cdylib.
    processor: Box<dyn DevAudioProcessor>,
    /// Requested configuration (buffer size; see `AudioConfig`).
    config: AudioConfig,
    /// Default input (capture) device — required.
    input_device: Device,
    /// Default output (playback) device — `None` means metering-only mode.
    output_device: Option<Device>,
    /// Stream config derived from the input device's default config.
    input_config: StreamConfig,
    /// Stream config for the output device; `None` in metering-only mode.
    output_config: Option<StreamConfig>,
    /// Lock-free parameter bridge shared with the control plane; kept
    /// alive into the audio callback for future parameter injection.
    param_bridge: Arc<AtomicParameterBridge>,
}
61
impl AudioServer {
    /// Create a new audio server with the given processor, config, and
    /// parameter bridge for lock-free audio-thread parameter reads.
    ///
    /// Opens the host's default input device (required) and default output
    /// device (optional). Any failure on the output side — no device, or
    /// no usable default config — degrades gracefully to metering-only
    /// mode instead of failing construction.
    ///
    /// # Errors
    ///
    /// Returns an error if no input device exists, the input device name
    /// cannot be read, or its default input config cannot be queried.
    pub fn new(
        processor: Box<dyn DevAudioProcessor>,
        config: AudioConfig,
        param_bridge: Arc<AtomicParameterBridge>,
    ) -> Result<Self> {
        let host = cpal::default_host();

        // Input device (required)
        let input_device = host
            .default_input_device()
            .context("No input device available")?;
        tracing::info!("Using input device: {}", input_device.name()?);

        let supported_input = input_device
            .default_input_config()
            .context("Failed to get default input config")?;
        let input_sample_rate = supported_input.sample_rate().0;
        tracing::info!("Input sample rate: {} Hz", input_sample_rate);
        let input_config: StreamConfig = supported_input.into();

        // Output device (optional — graceful fallback to metering-only).
        // Note: a failing name() is tolerated (logged as unnamed), but a
        // failing default_output_config() disables output entirely.
        let (output_device, output_config) = match host.default_output_device() {
            Some(dev) => {
                match dev.name() {
                    Ok(name) => tracing::info!("Using output device: {}", name),
                    Err(_) => tracing::info!("Using output device: (unnamed)"),
                }
                match dev.default_output_config() {
                    Ok(supported_output) => {
                        let output_sr = supported_output.sample_rate().0;
                        tracing::info!("Output sample rate: {} Hz", output_sr);
                        if output_sr != input_sample_rate {
                            // Mismatch is tolerated: we process at the input
                            // rate and rely on the OS/driver to resample.
                            tracing::warn!(
                                "Input/output sample rate mismatch ({} vs {}). \
                                 Processing at input rate; output device may resample.",
                                input_sample_rate,
                                output_sr
                            );
                        }
                        let cfg: StreamConfig = supported_output.into();
                        (Some(dev), Some(cfg))
                    }
                    Err(e) => {
                        tracing::warn!(
                            "Failed to get output config: {}. Metering-only mode.",
                            e
                        );
                        (None, None)
                    }
                }
            }
            None => {
                tracing::warn!("No output device available. Metering-only mode.");
                (None, None)
            }
        };

        Ok(Self {
            processor,
            config,
            input_device,
            output_device,
            input_config,
            output_config,
            param_bridge,
        })
    }

    /// Start audio capture, processing, and playback.
    ///
    /// Returns an `AudioHandle` that keeps both streams alive, plus a
    /// `MeterConsumer` for draining meter frames from a lock-free ring
    /// buffer (RT-safe: no allocations on the audio thread).
    ///
    /// Drop the handle to stop audio.
    ///
    /// # Errors
    ///
    /// Returns an error if building or starting either cpal stream fails.
    pub fn start(mut self) -> Result<(AudioHandle, rtrb::Consumer<MeterUpdateNotification>)> {
        // Set sample rate from the actual input device config.
        // NOTE(review): `self.config.sample_rate` is not consulted here;
        // the device's real rate always wins — confirm that is intended.
        let actual_sample_rate = self.input_config.sample_rate.0 as f32;
        self.processor.set_sample_rate(actual_sample_rate);

        let mut processor = self.processor;
        let buffer_size = self.config.buffer_size as usize;
        let num_channels = self.input_config.channels as usize;
        let _param_bridge = Arc::clone(&self.param_bridge);

        // --- SPSC ring buffer for input→output audio transfer ---
        // Capacity: buffer_size * num_channels * 4 blocks of headroom.
        // Data format: interleaved f32 samples (matches cpal output).
        let ring_capacity = buffer_size * num_channels.max(2) * 4;
        let (mut ring_producer, mut ring_consumer) = rtrb::RingBuffer::new(ring_capacity);

        // --- SPSC ring buffer for meter data (audio → consumer task) ---
        // Capacity: 64 frames — roughly a second of headroom at the
        // ~43 Hz effective meter rate used below.
        // Uses rtrb (lock-free, zero-allocation) instead of tokio channels
        // to maintain real-time safety on the audio thread.
        let (mut meter_producer, meter_consumer) =
            rtrb::RingBuffer::<MeterUpdateNotification>::new(64);

        // Counts input callbacks; used both to throttle meter updates and
        // as the meter notification "timestamp" (see note below).
        let mut frame_counter = 0u64;

        // Pre-allocate deinterleaved buffers BEFORE the audio callback.
        // These are moved into the closure and reused on every invocation,
        // avoiding heap allocations on the audio thread.
        let mut left_buf = vec![0.0f32; buffer_size];
        let mut right_buf = vec![0.0f32; buffer_size];

        // Pre-allocate interleave buffer for writing to the ring buffer
        // (always stereo: 2 samples per frame).
        let mut interleave_buf = vec![0.0f32; buffer_size * 2];

        let input_stream = self
            .input_device
            .build_input_stream(
                &self.input_config,
                move |data: &[f32], _: &cpal::InputCallbackInfo| {
                    frame_counter += 1;

                    // `data` is interleaved; derive the per-channel frame
                    // count, guarding against zero channels / empty buffers.
                    let num_samples = data.len() / num_channels.max(1);
                    if num_samples == 0 || num_channels == 0 {
                        return;
                    }

                    // Clamp to the pre-allocated capacity: if the device
                    // delivers more frames than `buffer_size`, the excess
                    // is dropped for this callback.
                    let actual_samples = num_samples.min(left_buf.len());
                    let left = &mut left_buf[..actual_samples];
                    let right = &mut right_buf[..actual_samples];

                    // Zero-fill and deinterleave
                    left.fill(0.0);
                    right.fill(0.0);

                    // Mono input is duplicated to both channels; channels
                    // beyond the first two are ignored.
                    for i in 0..actual_samples {
                        left[i] = data[i * num_channels];
                        if num_channels > 1 {
                            right[i] = data[i * num_channels + 1];
                        } else {
                            right[i] = left[i];
                        }
                    }

                    // Read parameter values at block boundary (RT-safe atomic reads).
                    // Currently the bridge is kept alive in the closure for future
                    // vtable v2 parameter injection. The infrastructure is in place.
                    let _ = &_param_bridge;

                    // Process through the user's DSP (stack-local channel array;
                    // the scope ends the mutable borrows so we can re-borrow
                    // immutably below).
                    {
                        let mut channels: [&mut [f32]; 2] = [left, right];
                        processor.process(&mut channels);
                    }

                    // Re-borrow after process()
                    let left = &left_buf[..actual_samples];
                    let right = &right_buf[..actual_samples];

                    // Compute meters from processed output. `actual_samples > 0`
                    // is guaranteed by the early return above, so the RMS
                    // division cannot be by zero.
                    let peak_left = left.iter().copied().fold(0.0f32, |a, b| a.max(b.abs()));
                    let rms_left =
                        (left.iter().map(|x| x * x).sum::<f32>() / left.len() as f32).sqrt();
                    let peak_right = right.iter().copied().fold(0.0f32, |a, b| a.max(b.abs()));
                    let rms_right =
                        (right.iter().map(|x| x * x).sum::<f32>() / right.len() as f32).sqrt();

                    // Send meter update approximately every other callback.
                    // At 44100 Hz / 512 samples per buffer ≈ 86 callbacks/sec,
                    // firing every 2nd callback gives ~43 Hz visual updates.
                    // The WebSocket/UI side already rate-limits display.
                    if frame_counter.is_multiple_of(2) {
                        // NOTE(review): the field is named `timestamp_us` but is
                        // filled with the callback counter, not microseconds —
                        // confirm consumers only use it for ordering.
                        let notification = MeterUpdateNotification {
                            timestamp_us: frame_counter,
                            left_peak: peak_left,
                            left_rms: rms_left,
                            right_peak: peak_right,
                            right_rms: rms_right,
                        };
                        // Push to lock-free ring buffer — RT-safe, no allocation.
                        // If the consumer is slow and the ring is full, this
                        // *new* frame is discarded (rtrb push does not overwrite
                        // existing elements) — acceptable for metering data.
                        let _ = meter_producer.push(notification);
                    }

                    // Interleave processed audio and write to ring buffer.
                    // If the ring buffer is full, samples are silently dropped
                    // (acceptable — temporary glitch, RT-safe).
                    let interleave = &mut interleave_buf[..actual_samples * 2];
                    for i in 0..actual_samples {
                        interleave[i * 2] = left[i];
                        interleave[i * 2 + 1] = right[i];
                    }

                    // Write to SPSC ring buffer — non-blocking, lock-free.
                    // Push sample by sample; if full, remaining samples are dropped.
                    for &sample in interleave.iter() {
                        if ring_producer.push(sample).is_err() {
                            break;
                        }
                    }
                },
                |err| {
                    tracing::error!("Audio input stream error: {}", err);
                },
                None,
            )
            .context("Failed to build input stream")?;

        input_stream
            .play()
            .context("Failed to start input stream")?;
        tracing::info!("Input stream started");

        // --- Output stream (optional) ---
        // NOTE(review): this path assumes the output device consumes
        // interleaved stereo f32 at the input rate. If the output config
        // has a channel count other than 2, the interleaved stereo frames
        // popped below will be misaligned across channels — confirm.
        let output_stream = if let (Some(output_device), Some(output_config)) =
            (self.output_device, self.output_config)
        {
            let stream = output_device
                .build_output_stream(
                    &output_config,
                    move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
                        // Read from SPSC ring buffer — non-blocking, lock-free.
                        // If underflow, fill with silence (zeros).
                        for sample in data.iter_mut() {
                            *sample = ring_consumer.pop().unwrap_or(0.0);
                        }
                    },
                    |err| {
                        tracing::error!("Audio output stream error: {}", err);
                    },
                    None,
                )
                .context("Failed to build output stream")?;

            stream.play().context("Failed to start output stream")?;
            tracing::info!("Output stream started");
            Some(stream)
        } else {
            tracing::info!("No output device — metering-only mode");
            None
        };

        let mode = if output_stream.is_some() {
            "full-duplex (input + output)"
        } else {
            "input-only (metering)"
        };
        tracing::info!("Audio server started in {} mode", mode);

        Ok((
            AudioHandle {
                _input_stream: input_stream,
                _output_stream: output_stream,
            },
            meter_consumer,
        ))
    }

    /// Returns true if an output device is available for audio playback.
    pub fn has_output(&self) -> bool {
        self.output_device.is_some()
    }
}