wavecraft_dev_server/audio/server.rs
1//! Audio server for full-duplex audio I/O in dev mode.
2//!
3//! This module provides an audio server that captures microphone input,
4//! processes it through a `DevAudioProcessor` (typically an `FfiProcessor`
5//! loaded from the user's cdylib), and sends the processed audio to the
6//! output device (speakers/headphones). Meter data is communicated back
7//! via a callback channel.
8//!
9//! # Architecture
10//!
//! ```text
//! OS Mic → cpal input callback → deinterleave → FfiProcessor::process()
//!                                                        │
//!                                            ┌───────────┴───────────┐
//!                                            │                       │
//!                                      meter compute            interleave
//!                                            │                 → SPSC ring
//!                                            ▼                       │
//!                                   WebSocket broadcast              ▼
//!                                          cpal output callback → Speakers
//! ```
23
24use std::sync::Arc;
25
26use anyhow::{Context, Result};
27use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
28use cpal::{Device, Stream, StreamConfig};
29use wavecraft_protocol::MeterUpdateNotification;
30
31use super::atomic_params::AtomicParameterBridge;
32use super::ffi_processor::DevAudioProcessor;
33
/// Configuration for audio server.
#[derive(Debug, Clone)]
pub struct AudioConfig {
    /// Desired sample rate in Hz (e.g., 44100.0).
    ///
    /// NOTE(review): not currently read anywhere in this module — the
    /// server always uses the input device's default rate. Confirm whether
    /// this is intentionally reserved for future use.
    pub sample_rate: f32,
    /// Buffer size in samples per channel. Used to size the pre-allocated
    /// processing buffers and the input→output ring buffer in `start()`.
    pub buffer_size: u32,
}
42
/// Handle returned by `AudioServer::start()` that keeps both audio
/// streams alive. Drop this handle to stop audio capture and playback.
pub struct AudioHandle {
    // Held only for its Drop side effect: dropping the cpal stream stops it.
    _input_stream: Stream,
    // `None` when running in metering-only mode (no usable output device).
    _output_stream: Option<Stream>,
}
49
/// Audio server that processes OS input through a `DevAudioProcessor`
/// and routes the processed audio to the output device.
pub struct AudioServer {
    /// User DSP; `start()` takes ownership and moves it into the input callback.
    processor: Box<dyn DevAudioProcessor>,
    /// Requested settings; `buffer_size` sizes the RT processing buffers.
    config: AudioConfig,
    /// Capture device — required; construction fails without one.
    input_device: Device,
    /// Playback device; `None` means metering-only mode.
    output_device: Option<Device>,
    /// Stream config derived from the input device's default config.
    input_config: StreamConfig,
    /// Present iff an output device with a usable config was found.
    output_config: Option<StreamConfig>,
    /// Lock-free bridge for audio-thread parameter reads; kept alive in the
    /// input callback for future parameter injection.
    param_bridge: Arc<AtomicParameterBridge>,
}
61
62impl AudioServer {
63 /// Create a new audio server with the given processor, config, and
64 /// parameter bridge for lock-free audio-thread parameter reads.
65 pub fn new(
66 processor: Box<dyn DevAudioProcessor>,
67 config: AudioConfig,
68 param_bridge: Arc<AtomicParameterBridge>,
69 ) -> Result<Self> {
70 let host = cpal::default_host();
71
72 // Input device (required)
73 let input_device = host
74 .default_input_device()
75 .context("No input device available")?;
76 tracing::info!("Using input device: {}", input_device.name()?);
77
78 let supported_input = input_device
79 .default_input_config()
80 .context("Failed to get default input config")?;
81 let input_sample_rate = supported_input.sample_rate().0;
82 tracing::info!("Input sample rate: {} Hz", input_sample_rate);
83 let input_config: StreamConfig = supported_input.into();
84
85 // Output device (optional — graceful fallback to metering-only)
86 let (output_device, output_config) = match host.default_output_device() {
87 Some(dev) => {
88 match dev.name() {
89 Ok(name) => tracing::info!("Using output device: {}", name),
90 Err(_) => tracing::info!("Using output device: (unnamed)"),
91 }
92 match dev.default_output_config() {
93 Ok(supported_output) => {
94 let output_sr = supported_output.sample_rate().0;
95 tracing::info!("Output sample rate: {} Hz", output_sr);
96 if output_sr != input_sample_rate {
97 tracing::warn!(
98 "Input/output sample rate mismatch ({} vs {}). \
99 Processing at input rate; output device may resample.",
100 input_sample_rate,
101 output_sr
102 );
103 }
104 let cfg: StreamConfig = supported_output.into();
105 (Some(dev), Some(cfg))
106 }
107 Err(e) => {
108 tracing::warn!(
109 "Failed to get output config: {}. Metering-only mode.",
110 e
111 );
112 (None, None)
113 }
114 }
115 }
116 None => {
117 tracing::warn!("No output device available. Metering-only mode.");
118 (None, None)
119 }
120 };
121
122 Ok(Self {
123 processor,
124 config,
125 input_device,
126 output_device,
127 input_config,
128 output_config,
129 param_bridge,
130 })
131 }
132
133 /// Start audio capture, processing, and playback.
134 ///
135 /// Returns an `AudioHandle` that keeps both streams alive, plus a
136 /// `MeterConsumer` for draining meter frames from a lock-free ring
137 /// buffer (RT-safe: no allocations on the audio thread).
138 ///
139 /// Drop the handle to stop audio.
140 pub fn start(mut self) -> Result<(AudioHandle, rtrb::Consumer<MeterUpdateNotification>)> {
141 // Set sample rate from the actual input device config
142 let actual_sample_rate = self.input_config.sample_rate.0 as f32;
143 self.processor.set_sample_rate(actual_sample_rate);
144
145 let mut processor = self.processor;
146 let buffer_size = self.config.buffer_size as usize;
147 let num_channels = self.input_config.channels as usize;
148 let _param_bridge = Arc::clone(&self.param_bridge);
149
150 // --- SPSC ring buffer for input→output audio transfer ---
151 // Capacity: buffer_size * num_channels * 4 blocks of headroom.
152 // Data format: interleaved f32 samples (matches cpal output).
153 let ring_capacity = buffer_size * num_channels.max(2) * 4;
154 let (mut ring_producer, mut ring_consumer) = rtrb::RingBuffer::new(ring_capacity);
155
156 // --- SPSC ring buffer for meter data (audio → consumer task) ---
157 // Capacity: 64 frames — sufficient for ~1s at 60 Hz update rate.
158 // Uses rtrb (lock-free, zero-allocation) instead of tokio channels
159 // to maintain real-time safety on the audio thread.
160 let (mut meter_producer, meter_consumer) =
161 rtrb::RingBuffer::<MeterUpdateNotification>::new(64);
162
163 let mut frame_counter = 0u64;
164
165 // Pre-allocate deinterleaved buffers BEFORE the audio callback.
166 // These are moved into the closure and reused on every invocation,
167 // avoiding heap allocations on the audio thread.
168 let mut left_buf = vec![0.0f32; buffer_size];
169 let mut right_buf = vec![0.0f32; buffer_size];
170
171 // Pre-allocate interleave buffer for writing to the ring buffer.
172 let mut interleave_buf = vec![0.0f32; buffer_size * 2];
173
174 let input_stream = self
175 .input_device
176 .build_input_stream(
177 &self.input_config,
178 move |data: &[f32], _: &cpal::InputCallbackInfo| {
179 frame_counter += 1;
180
181 let num_samples = data.len() / num_channels.max(1);
182 if num_samples == 0 || num_channels == 0 {
183 return;
184 }
185
186 let actual_samples = num_samples.min(left_buf.len());
187 let left = &mut left_buf[..actual_samples];
188 let right = &mut right_buf[..actual_samples];
189
190 // Zero-fill and deinterleave
191 left.fill(0.0);
192 right.fill(0.0);
193
194 for i in 0..actual_samples {
195 left[i] = data[i * num_channels];
196 if num_channels > 1 {
197 right[i] = data[i * num_channels + 1];
198 } else {
199 right[i] = left[i];
200 }
201 }
202
203 // Read parameter values at block boundary (RT-safe atomic reads).
204 // Currently the bridge is kept alive in the closure for future
205 // vtable v2 parameter injection. The infrastructure is in place.
206 let _ = &_param_bridge;
207
208 // Process through the user's DSP (stack-local channel array)
209 {
210 let mut channels: [&mut [f32]; 2] = [left, right];
211 processor.process(&mut channels);
212 }
213
214 // Re-borrow after process()
215 let left = &left_buf[..actual_samples];
216 let right = &right_buf[..actual_samples];
217
218 // Compute meters from processed output
219 let peak_left = left.iter().copied().fold(0.0f32, |a, b| a.max(b.abs()));
220 let rms_left =
221 (left.iter().map(|x| x * x).sum::<f32>() / left.len() as f32).sqrt();
222 let peak_right = right.iter().copied().fold(0.0f32, |a, b| a.max(b.abs()));
223 let rms_right =
224 (right.iter().map(|x| x * x).sum::<f32>() / right.len() as f32).sqrt();
225
226 // Send meter update approximately every other callback.
227 // At 44100 Hz / 512 samples per buffer ≈ 86 callbacks/sec,
228 // firing every 2nd callback gives ~43 Hz visual updates.
229 // The WebSocket/UI side already rate-limits display.
230 if frame_counter.is_multiple_of(2) {
231 let notification = MeterUpdateNotification {
232 timestamp_us: frame_counter,
233 left_peak: peak_left,
234 left_rms: rms_left,
235 right_peak: peak_right,
236 right_rms: rms_right,
237 };
238 // Push to lock-free ring buffer — RT-safe, no allocation.
239 // If the consumer is slow, older frames are silently
240 // dropped (acceptable for metering data).
241 let _ = meter_producer.push(notification);
242 }
243
244 // Interleave processed audio and write to ring buffer.
245 // If the ring buffer is full, samples are silently dropped
246 // (acceptable — temporary glitch, RT-safe).
247 let interleave = &mut interleave_buf[..actual_samples * 2];
248 for i in 0..actual_samples {
249 interleave[i * 2] = left[i];
250 interleave[i * 2 + 1] = right[i];
251 }
252
253 // Write to SPSC ring buffer — non-blocking, lock-free.
254 // Push sample by sample; if full, remaining samples are dropped.
255 for &sample in interleave.iter() {
256 if ring_producer.push(sample).is_err() {
257 break;
258 }
259 }
260 },
261 |err| {
262 tracing::error!("Audio input stream error: {}", err);
263 },
264 None,
265 )
266 .context("Failed to build input stream")?;
267
268 input_stream
269 .play()
270 .context("Failed to start input stream")?;
271 tracing::info!("Input stream started");
272
273 // --- Output stream (optional) ---
274 let output_stream = if let (Some(output_device), Some(output_config)) =
275 (self.output_device, self.output_config)
276 {
277 let stream = output_device
278 .build_output_stream(
279 &output_config,
280 move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
281 // Read from SPSC ring buffer — non-blocking, lock-free.
282 // If underflow, fill with silence (zeros).
283 for sample in data.iter_mut() {
284 *sample = ring_consumer.pop().unwrap_or(0.0);
285 }
286 },
287 |err| {
288 tracing::error!("Audio output stream error: {}", err);
289 },
290 None,
291 )
292 .context("Failed to build output stream")?;
293
294 stream.play().context("Failed to start output stream")?;
295 tracing::info!("Output stream started");
296 Some(stream)
297 } else {
298 tracing::info!("No output device — metering-only mode");
299 None
300 };
301
302 let mode = if output_stream.is_some() {
303 "full-duplex (input + output)"
304 } else {
305 "input-only (metering)"
306 };
307 tracing::info!("Audio server started in {} mode", mode);
308
309 Ok((
310 AudioHandle {
311 _input_stream: input_stream,
312 _output_stream: output_stream,
313 },
314 meter_consumer,
315 ))
316 }
317
318 /// Returns true if an output device is available for audio playback.
319 pub fn has_output(&self) -> bool {
320 self.output_device.is_some()
321 }
322}