1use crate::builder::{NativePipelineBuilder, PipelineBuildRequest};
2use crate::contracts::{CaptureBackend, DeviceEnumerator, ProcessorBuildRequest, ProcessorFactory};
3use crate::core::{
4 ActiveListening, ActiveRecording, AudioError, AudioResult, CaptureDiagnostics, InputDeviceInfo,
5 NativeCaptureConfig,
6};
7use crate::factory::ProfileProcessorFactory;
8use crate::metrics::LevelMetrics;
9use crate::mic_sim::MicrophoneSimulatorFactory;
10use crate::pipeline::{AudioPipeline, PlaybackBuffer, PlaybackSink};
11use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
12use cpal::{SampleFormat, StreamConfig};
13use serde_json::Value;
14use std::process::Command;
15use std::sync::mpsc;
16use std::sync::{Arc, Mutex};
17use std::thread::{self, JoinHandle};
18use std::time::Instant;
19
/// Capture backend built on the platform's native audio stack via `cpal`.
///
/// The type is a stateless unit struct: every operation opens the devices it
/// needs on demand. `Default` is derived so the type can be constructed
/// wherever a `Default` bound is required, and `Debug` so it can appear in
/// diagnostics output.
#[derive(Debug, Default, Clone, Copy)]
pub struct NativeCaptureBackend;
21
22impl DeviceEnumerator for NativeCaptureBackend {
23 fn list_input_devices(&self) -> AudioResult<Vec<InputDeviceInfo>> {
24 list_input_devices()
25 }
26
27 fn set_default_input_device(&self, device_id: &str) -> AudioResult<()> {
28 set_default_input_device(device_id)
29 }
30}
31
32pub fn list_input_devices() -> AudioResult<Vec<InputDeviceInfo>> {
33 #[cfg(target_os = "linux")]
34 if let Ok(devices) = list_pipewire_input_devices() {
35 if !devices.is_empty() {
36 return Ok(devices);
37 }
38 }
39
40 list_cpal_input_devices()
41}
42
43pub fn set_default_input_device(device_id: &str) -> AudioResult<()> {
44 #[cfg(target_os = "linux")]
45 {
46 let output = Command::new("pactl")
47 .args(["set-default-source", device_id])
48 .output()
49 .map_err(|err| {
50 AudioError::new(format!(
51 "Failed to invoke pactl set-default-source for '{}': {}",
52 device_id, err
53 ))
54 })?;
55
56 if !output.status.success() {
57 let stderr = String::from_utf8_lossy(&output.stderr);
58 return Err(AudioError::new(format!(
59 "pactl set-default-source '{}' failed: {}",
60 device_id,
61 stderr.trim()
62 )));
63 }
64
65 return Ok(());
66 }
67
68 #[allow(unreachable_code)]
69 Err(AudioError::new(
70 "Setting the default input device is only supported on Linux",
71 ))
72}
73
74fn list_cpal_input_devices() -> AudioResult<Vec<InputDeviceInfo>> {
75 let host = cpal::default_host();
76 let default_name = host
77 .default_input_device()
78 .and_then(|device| device.description().ok())
79 .map(|d| d.to_string());
80 let devices = host
81 .input_devices()
82 .map_err(|err| AudioError::new(format!("Failed to enumerate input devices: {}", err)))?;
83
84 let mut result = Vec::new();
85 for device in devices {
86 let name = device
87 .description()
88 .map_err(|err| AudioError::new(format!("Failed to read input device name: {}", err)))?
89 .to_string();
90 let is_default = default_name.as_deref() == Some(name.as_str());
91 result.push(InputDeviceInfo {
92 id: name.clone(),
93 name,
94 is_default,
95 });
96 }
97
98 result.sort_by(|a, b| a.name.cmp(&b.name));
99 Ok(result)
100}
101
102#[cfg(target_os = "linux")]
103fn list_pipewire_input_devices() -> AudioResult<Vec<InputDeviceInfo>> {
104 let default_source = pactl_default_source().ok().flatten();
105 let output = Command::new("pactl")
106 .args(["-f", "json", "list", "sources"])
107 .output()
108 .map_err(|err| AudioError::new(format!("Failed to invoke pactl list sources: {}", err)))?;
109
110 if !output.status.success() {
111 let stderr = String::from_utf8_lossy(&output.stderr);
112 return Err(AudioError::new(format!(
113 "pactl list sources failed: {}",
114 stderr.trim()
115 )));
116 }
117
118 let value: Value = serde_json::from_slice(&output.stdout)
119 .map_err(|err| AudioError::new(format!("Failed to parse pactl source list: {}", err)))?;
120 let sources = value
121 .as_array()
122 .ok_or_else(|| AudioError::new("Unexpected pactl source list payload"))?;
123
124 let mut result = Vec::new();
125
126 for source in sources {
127 if !is_selectable_pipewire_source(source) {
128 continue;
129 }
130
131 let Some(id) = source.get("name").and_then(Value::as_str) else {
132 continue;
133 };
134
135 let display_name = friendly_pipewire_source_name(source).unwrap_or_else(|| id.to_string());
136 let is_default = default_source.as_deref() == Some(id);
137
138 result.push(InputDeviceInfo {
139 id: id.to_string(),
140 name: display_name,
141 is_default,
142 });
143 }
144
145 result.sort_by(|a, b| a.name.cmp(&b.name));
146 Ok(result)
147}
148
149#[cfg(target_os = "linux")]
150fn pactl_default_source() -> AudioResult<Option<String>> {
151 let output = Command::new("pactl")
152 .arg("info")
153 .output()
154 .map_err(|err| AudioError::new(format!("Failed to invoke pactl info: {}", err)))?;
155
156 if !output.status.success() {
157 let stderr = String::from_utf8_lossy(&output.stderr);
158 return Err(AudioError::new(format!(
159 "pactl info failed: {}",
160 stderr.trim()
161 )));
162 }
163
164 let stdout = String::from_utf8_lossy(&output.stdout);
165 Ok(stdout
166 .lines()
167 .find_map(|line| line.strip_prefix("Default Source: "))
168 .map(|value| value.trim().to_string()))
169}
170
171#[cfg(target_os = "linux")]
172fn is_selectable_pipewire_source(source: &Value) -> bool {
173 let Some(id) = source.get("name").and_then(Value::as_str) else {
174 return false;
175 };
176 if id.ends_with(".monitor") {
177 return false;
178 }
179
180 if source
181 .get("monitor_source")
182 .and_then(Value::as_str)
183 .is_some_and(|value| !value.is_empty())
184 {
185 return false;
186 }
187
188 let properties = source.get("properties").and_then(Value::as_object);
189 if properties
190 .and_then(|props| props.get("media.class"))
191 .and_then(Value::as_str)
192 .is_some_and(|class| class != "Audio/Source")
193 {
194 return false;
195 }
196
197 let description = source
198 .get("description")
199 .and_then(Value::as_str)
200 .unwrap_or_default();
201 !description.starts_with("Monitor of ")
202}
203
204#[cfg(target_os = "linux")]
205fn friendly_pipewire_source_name(source: &Value) -> Option<String> {
206 let properties = source.get("properties").and_then(Value::as_object);
207 let device_description = properties
208 .and_then(|props| props.get("device.description"))
209 .and_then(Value::as_str)
210 .or_else(|| source.get("description").and_then(Value::as_str))?;
211
212 let active_port_name = source.get("active_port").and_then(Value::as_str);
213 let active_port_description = source
214 .get("ports")
215 .and_then(Value::as_array)
216 .and_then(|ports| {
217 ports.iter().find_map(|port| {
218 let name = port.get("name").and_then(Value::as_str)?;
219 if Some(name) == active_port_name {
220 port.get("description").and_then(Value::as_str)
221 } else {
222 None
223 }
224 })
225 });
226
227 if let Some(port_description) = active_port_description {
228 return Some(format!("{} - {}", port_description, device_description));
229 }
230
231 Some(device_description.to_string())
232}
233
234impl NativeCaptureBackend {
235 pub fn new() -> Self {
236 Self
237 }
238
239 fn build_pipeline(
240 config: &NativeCaptureConfig,
241 sample_rate: u32,
242 device_channels: u16,
243 device_name: Option<String>,
244 ) -> AudioResult<AudioPipeline> {
245 NativePipelineBuilder::new().build(PipelineBuildRequest {
246 output_path: config.output_path.clone(),
247 profile: config.profile,
248 sample_rate,
249 device_channels,
250 device_name,
251 gain_db: config.input_gain_db,
252 limiter_threshold: config.limiter_threshold,
253 high_pass_hz: config.high_pass_hz,
254 noise_suppression_amount: config.noise_suppression_amount,
255 noise_calibration_ms: config.noise_calibration_ms,
256 delay_effect: config.delay_effect,
257 stage_overrides: config.stage_overrides.clone(),
258 microphone_sim: config.microphone_sim,
259 })
260 }
261
262 fn build_live_pipeline(
263 config: &NativeCaptureConfig,
264 sample_rate: u32,
265 device_channels: u16,
266 device_name: Option<String>,
267 playback_buffer: Arc<Mutex<PlaybackBuffer>>,
268 ) -> AudioResult<AudioPipeline> {
269 let output_spec = crate::core::AudioSpec {
270 sample_rate,
271 channels: 1,
272 };
273 let processors =
274 ProfileProcessorFactory::new().build_processors(ProcessorBuildRequest {
275 profile: config.profile,
276 gain_db: config.input_gain_db,
277 limiter_threshold: config.limiter_threshold,
278 high_pass_hz: config.high_pass_hz,
279 noise_suppression_amount: config.noise_suppression_amount,
280 noise_calibration_ms: config.noise_calibration_ms,
281 delay_effect: config.delay_effect,
282 stage_overrides: config.stage_overrides.clone(),
283 })?;
284 let processor_names = processors
285 .iter()
286 .map(|processor| processor.name().to_string())
287 .collect::<Vec<_>>();
288 let microphone_sim_processors =
289 MicrophoneSimulatorFactory::new().build_processors(config.microphone_sim)?;
290 let microphone_sim_processor_names = microphone_sim_processors
291 .iter()
292 .map(|processor| processor.name().to_string())
293 .collect::<Vec<_>>();
294 let microphone_sim_model = config
295 .microphone_sim
296 .active_model()
297 .map(|model| model.as_str().to_string());
298
299 let mut notes = config
300 .delay_effect
301 .map(|effect| vec![format!("delay_effect={}", effect.preset.as_str())])
302 .unwrap_or_default();
303 notes.push("mode=live_listening".to_string());
304 notes.push("output=default".to_string());
305 notes.push("pipeline_order=microphone_sim->voice_processing->playback".to_string());
306 if let Some(model) = µphone_sim_model {
307 notes.push(format!("microphone_sim={}", model));
308 }
309
310 AudioPipeline::new(
311 output_spec,
312 CaptureDiagnostics {
313 backend: "native_listening".to_string(),
314 profile: config.profile.as_str().to_string(),
315 profile_base: Some(config.profile.as_str().to_string()),
316 device_name,
317 sample_rate: Some(sample_rate),
318 channels: Some(device_channels),
319 processor_names,
320 processor_stage_overrides: config
321 .stage_overrides
322 .iter()
323 .map(|(stage, mode)| format!("{}={}", stage, mode.as_str()))
324 .collect(),
325 resolved_delay_preset: config
326 .delay_effect
327 .map(|effect| effect.preset.as_str().to_string()),
328 microphone_sim_model,
329 microphone_sim_processor_names,
330 notes,
331 ..CaptureDiagnostics::default()
332 },
333 microphone_sim_processors,
334 processors,
335 Box::new(PlaybackSink::from_buffer(playback_buffer)),
336 Box::new(LevelMetrics::default()),
337 )
338 }
339
340 pub fn start_live(&self, config: NativeCaptureConfig) -> AudioResult<Box<dyn ActiveListening>> {
341 let (ready_tx, ready_rx) = mpsc::channel();
342 let (cmd_tx, cmd_rx) = mpsc::channel();
343
344 let handle = thread::spawn(move || {
345 native_listening_thread(config, cmd_rx, ready_tx);
346 });
347
348 match ready_rx
349 .recv()
350 .map_err(|_| AudioError::new("Native listening thread failed to report readiness"))?
351 {
352 Ok(()) => Ok(Box::new(NativeListeningHandle {
353 cmd_tx,
354 join_handle: Some(handle),
355 })),
356 Err(err) => {
357 let _ = handle.join();
358 Err(err)
359 }
360 }
361 }
362}
363
364impl CaptureBackend for NativeCaptureBackend {
365 fn start(&self, config: NativeCaptureConfig) -> AudioResult<Box<dyn ActiveRecording>> {
366 let (ready_tx, ready_rx) = mpsc::channel();
367 let (cmd_tx, cmd_rx) = mpsc::channel();
368
369 let handle = thread::spawn(move || {
370 native_capture_thread(config, cmd_rx, ready_tx);
371 });
372
373 match ready_rx
374 .recv()
375 .map_err(|_| AudioError::new("Native capture thread failed to report readiness"))?
376 {
377 Ok(()) => Ok(Box::new(NativeRecordingHandle {
378 cmd_tx,
379 join_handle: Some(handle),
380 })),
381 Err(err) => {
382 let _ = handle.join();
383 Err(err)
384 }
385 }
386 }
387}
388
/// Commands accepted by the recording worker thread.
enum NativeCommand {
    /// Stop capturing. The worker finalizes the pipeline and sends the
    /// resulting diagnostics (or the error) back on the enclosed sender.
    Stop(mpsc::Sender<AudioResult<CaptureDiagnostics>>),
}
392
/// Commands accepted by the live-listening worker thread.
enum NativeListeningCommand {
    /// Stop listening. The worker finalizes the pipeline and sends the
    /// resulting diagnostics (or the error) back on the enclosed sender.
    Stop(mpsc::Sender<AudioResult<CaptureDiagnostics>>),
    /// Rebuild the live pipeline from the given configuration and report the
    /// outcome on the enclosed sender. The worker keeps running afterwards.
    UpdateConfig(NativeCaptureConfig, mpsc::Sender<AudioResult<()>>),
}
397
398fn native_capture_thread(
399 config: NativeCaptureConfig,
400 cmd_rx: mpsc::Receiver<NativeCommand>,
401 ready_tx: mpsc::Sender<AudioResult<()>>,
402) {
403 let result = start_native_stream(config);
404 let (stream, pipeline, start_time, notes) = match result {
405 Ok(values) => {
406 let _ = ready_tx.send(Ok(()));
407 values
408 }
409 Err(err) => {
410 let _ = ready_tx.send(Err(err));
411 return;
412 }
413 };
414
415 let _keep_stream_alive = stream;
416
417 match cmd_rx.recv() {
418 Ok(NativeCommand::Stop(reply_tx)) => {
419 drop(_keep_stream_alive);
420 let duration_ms = Some(start_time.elapsed().as_millis() as i64);
421 let diagnostics = pipeline
422 .lock()
423 .map_err(|_| AudioError::new("Audio pipeline lock poisoned"))
424 .and_then(|mut pipeline| pipeline.finalize(duration_ms))
425 .map(|mut diagnostics| {
426 diagnostics.notes.extend(notes);
427 diagnostics
428 });
429 let _ = reply_tx.send(diagnostics);
430 }
431 Err(_) => {}
432 }
433}
434
435fn native_listening_thread(
436 config: NativeCaptureConfig,
437 cmd_rx: mpsc::Receiver<NativeListeningCommand>,
438 ready_tx: mpsc::Sender<AudioResult<()>>,
439) {
440 let result = start_live_monitor_stream(config);
441 let (input_stream, output_stream, pipeline, playback_buffer, start_time, notes) = match result {
442 Ok(values) => {
443 let _ = ready_tx.send(Ok(()));
444 values
445 }
446 Err(err) => {
447 let _ = ready_tx.send(Err(err));
448 return;
449 }
450 };
451
452 let _keep_input_stream_alive = input_stream;
453 let _keep_output_stream_alive = output_stream;
454
455 loop {
456 match cmd_rx.recv() {
457 Ok(NativeListeningCommand::Stop(reply_tx)) => {
458 drop(_keep_input_stream_alive);
459 drop(_keep_output_stream_alive);
460 let duration_ms = Some(start_time.elapsed().as_millis() as i64);
461 let diagnostics = pipeline
462 .lock()
463 .map_err(|_| AudioError::new("Audio pipeline lock poisoned"))
464 .and_then(|mut pipeline| pipeline.finalize(duration_ms))
465 .map(|mut diagnostics| {
466 diagnostics.notes.extend(notes.clone());
467 diagnostics
468 });
469 let _ = reply_tx.send(diagnostics);
470 break;
471 }
472 Ok(NativeListeningCommand::UpdateConfig(config, reply_tx)) => {
473 let result = rebuild_live_pipeline(&pipeline, &playback_buffer, config);
474 let _ = reply_tx.send(result);
475 }
476 Err(_) => break,
477 }
478 }
479}
480
/// State produced by `start_native_stream` for the recording worker, in
/// order: the running input stream, the shared pipeline fed by that stream,
/// the instant the stream was started, and the notes appended to the final
/// diagnostics.
type NativeThreadState = (
    cpal::Stream,
    Arc<Mutex<AudioPipeline>>,
    Instant,
    Vec<String>,
);
487
/// State produced by `start_live_monitor_stream` for the listening worker, in
/// order: the input stream, the output stream, the shared pipeline, the
/// playback buffer shared between the pipeline's sink and the output stream,
/// the instant the streams were started, and the notes appended to the final
/// diagnostics.
type NativeListeningThreadState = (
    cpal::Stream,
    cpal::Stream,
    Arc<Mutex<AudioPipeline>>,
    Arc<Mutex<PlaybackBuffer>>,
    Instant,
    Vec<String>,
);
496
497fn start_native_stream(config: NativeCaptureConfig) -> AudioResult<NativeThreadState> {
498 let host = cpal::default_host();
499 let device = select_input_device(&host, config.preferred_input_device.as_deref())?;
500 let device_name = device.description().ok().map(|d| d.to_string());
501 let supported_config = device
502 .default_input_config()
503 .map_err(|err| AudioError::new(format!("Failed to read input device config: {}", err)))?;
504
505 let (sample_rate, channels) =
506 select_stream_config(&device, config.target_sample_rate, config.target_channels).unwrap_or(
507 (
508 supported_config.sample_rate(),
509 supported_config.channels(),
510 ),
511 );
512
513 let pipeline = Arc::new(Mutex::new(NativeCaptureBackend::build_pipeline(
514 &config,
515 sample_rate,
516 channels,
517 device_name.clone(),
518 )?));
519
520 let cpal_config = StreamConfig {
521 channels,
522 sample_rate: sample_rate,
523 buffer_size: cpal::BufferSize::Default,
524 };
525 let stream = build_input_stream(
526 &device,
527 supported_config.sample_format(),
528 &cpal_config,
529 channels,
530 Arc::clone(&pipeline),
531 native_input_stream_error,
532 )?;
533
534 stream
535 .play()
536 .map_err(|err| AudioError::new(format!("Failed to start input stream: {}", err)))?;
537
538 Ok((
539 stream,
540 pipeline,
541 Instant::now(),
542 vec![
543 format!("device={}", device_name.unwrap_or_else(|| "unknown".into())),
544 format!(
545 "configured_device={}",
546 config
547 .preferred_input_device
548 .unwrap_or_else(|| "system_default".to_string())
549 ),
550 format!("channels={}", channels),
551 format!("sample_rate={}", sample_rate),
552 format!("noise_calibration_ms={}", config.noise_calibration_ms),
553 ],
554 ))
555}
556
557fn native_input_stream_error(err: cpal::StreamError) {
558 log::error!("Native audio stream error: {}", err);
559}
560
561fn native_listening_input_stream_error(err: cpal::StreamError) {
562 log::error!("Native listening input stream error: {}", err);
563}
564
565fn native_listening_output_stream_error(err: cpal::StreamError) {
566 log::error!("Native listening output stream error: {}", err);
567}
568
569fn start_live_monitor_stream(
570 config: NativeCaptureConfig,
571) -> AudioResult<NativeListeningThreadState> {
572 let host = cpal::default_host();
573 let input_device = select_input_device(&host, config.preferred_input_device.as_deref())?;
574 let input_device_name = input_device.description().ok().map(|d| d.to_string());
575 let input_config = input_device
576 .default_input_config()
577 .map_err(|err| AudioError::new(format!("Failed to read input device config: {}", err)))?;
578 let output_device = host
579 .default_output_device()
580 .ok_or_else(|| AudioError::new("No output device available"))?;
581 let output_device_name = output_device.description().ok().map(|d| d.to_string());
582 let output_config = output_device
583 .default_output_config()
584 .map_err(|err| AudioError::new(format!("Failed to read output device config: {}", err)))?;
585
586 let output_sample_rate = output_config.sample_rate();
587 let output_channels = output_config.channels().max(1);
588 let input_channels = select_stream_config(
589 &input_device,
590 output_sample_rate,
591 config.target_channels.max(1),
592 )
593 .map(|(_, channels)| channels)
594 .ok_or_else(|| {
595 AudioError::new(format!(
596 "Input device does not support {} Hz for live listening",
597 output_sample_rate
598 ))
599 })?;
600
601 let cpal_input_config = StreamConfig {
602 channels: input_channels,
603 sample_rate: output_sample_rate,
604 buffer_size: cpal::BufferSize::Default,
605 };
606 let cpal_output_config = StreamConfig {
607 channels: output_channels,
608 sample_rate: output_sample_rate,
609 buffer_size: cpal::BufferSize::Default,
610 };
611
612 let max_buffer_samples = (output_sample_rate as usize * 2).max(4096);
613 let (_, playback_buffer) = PlaybackSink::with_capacity(max_buffer_samples);
614 let pipeline = Arc::new(Mutex::new(NativeCaptureBackend::build_live_pipeline(
615 &config,
616 output_sample_rate,
617 input_channels,
618 input_device_name.clone(),
619 Arc::clone(&playback_buffer),
620 )?));
621
622 let input_stream = build_input_stream(
623 &input_device,
624 input_config.sample_format(),
625 &cpal_input_config,
626 input_channels,
627 Arc::clone(&pipeline),
628 native_listening_input_stream_error,
629 )?;
630 let output_stream = build_output_stream(
631 &output_device,
632 output_config.sample_format(),
633 &cpal_output_config,
634 Arc::clone(&playback_buffer),
635 native_listening_output_stream_error,
636 )?;
637
638 input_stream
639 .play()
640 .map_err(|err| AudioError::new(format!("Failed to start input stream: {}", err)))?;
641 output_stream
642 .play()
643 .map_err(|err| AudioError::new(format!("Failed to start output stream: {}", err)))?;
644
645 Ok((
646 input_stream,
647 output_stream,
648 pipeline,
649 playback_buffer,
650 Instant::now(),
651 vec![
652 format!(
653 "input_device={}",
654 input_device_name.unwrap_or_else(|| "unknown".into())
655 ),
656 format!(
657 "output_device={}",
658 output_device_name.unwrap_or_else(|| "unknown".into())
659 ),
660 format!("input_channels={}", input_channels),
661 format!("output_channels={}", output_channels),
662 format!("sample_rate={}", output_sample_rate),
663 ],
664 ))
665}
666
667fn rebuild_live_pipeline(
668 pipeline: &Arc<Mutex<AudioPipeline>>,
669 playback_buffer: &Arc<Mutex<PlaybackBuffer>>,
670 config: NativeCaptureConfig,
671) -> AudioResult<()> {
672 if let Ok(mut buffer) = playback_buffer.lock() {
673 buffer.clear();
674 }
675
676 let (pipeline_sample_rate, pipeline_channels, device_name) = {
677 let current = pipeline
678 .lock()
679 .map_err(|_| AudioError::new("Audio pipeline lock poisoned"))?;
680 (
681 current.sample_rate(),
682 current.input_channels(),
683 current.device_name().map(ToOwned::to_owned),
684 )
685 };
686
687 let rebuilt = NativeCaptureBackend::build_live_pipeline(
688 &config,
689 pipeline_sample_rate,
690 pipeline_channels,
691 device_name,
692 Arc::clone(playback_buffer),
693 )?;
694
695 let mut current = pipeline
696 .lock()
697 .map_err(|_| AudioError::new("Audio pipeline lock poisoned"))?;
698 *current = rebuilt;
699 Ok(())
700}
701
702fn build_input_stream(
703 device: &cpal::Device,
704 sample_format: SampleFormat,
705 config: &StreamConfig,
706 input_channels: u16,
707 pipeline: Arc<Mutex<AudioPipeline>>,
708 err_fn: fn(cpal::StreamError),
709) -> AudioResult<cpal::Stream> {
710 let stream = match sample_format {
711 SampleFormat::F32 => {
712 let pipeline_for_stream = Arc::clone(&pipeline);
713 device.build_input_stream(
714 config,
715 move |data: &[f32], _| {
716 process_stream_buffer(&pipeline_for_stream, data, input_channels)
717 },
718 err_fn,
719 None,
720 )
721 }
722 SampleFormat::I16 => {
723 let pipeline_for_stream = Arc::clone(&pipeline);
724 device.build_input_stream(
725 config,
726 move |data: &[i16], _| {
727 let converted = data
728 .iter()
729 .map(|sample| *sample as f32 / i16::MAX as f32)
730 .collect::<Vec<_>>();
731 process_stream_buffer(&pipeline_for_stream, &converted, input_channels)
732 },
733 err_fn,
734 None,
735 )
736 }
737 SampleFormat::U16 => {
738 let pipeline_for_stream = Arc::clone(&pipeline);
739 device.build_input_stream(
740 config,
741 move |data: &[u16], _| {
742 let converted = data
743 .iter()
744 .map(|sample| (*sample as f32 - 32768.0) / 32768.0)
745 .collect::<Vec<_>>();
746 process_stream_buffer(&pipeline_for_stream, &converted, input_channels)
747 },
748 err_fn,
749 None,
750 )
751 }
752 other => {
753 return Err(AudioError::new(format!(
754 "Unsupported input sample format: {:?}",
755 other
756 )))
757 }
758 }
759 .map_err(|err| AudioError::new(format!("Failed to build input stream: {}", err)))?;
760
761 Ok(stream)
762}
763
764fn build_output_stream(
765 device: &cpal::Device,
766 sample_format: SampleFormat,
767 config: &StreamConfig,
768 playback_buffer: Arc<Mutex<PlaybackBuffer>>,
769 err_fn: fn(cpal::StreamError),
770) -> AudioResult<cpal::Stream> {
771 let output_channels = config.channels;
772 let stream = match sample_format {
773 SampleFormat::F32 => {
774 let buffer = Arc::clone(&playback_buffer);
775 device.build_output_stream(
776 config,
777 move |data: &mut [f32], _| fill_output_buffer_f32(data, output_channels, &buffer),
778 err_fn,
779 None,
780 )
781 }
782 SampleFormat::I16 => {
783 let buffer = Arc::clone(&playback_buffer);
784 device.build_output_stream(
785 config,
786 move |data: &mut [i16], _| fill_output_buffer_i16(data, output_channels, &buffer),
787 err_fn,
788 None,
789 )
790 }
791 SampleFormat::U16 => {
792 let buffer = Arc::clone(&playback_buffer);
793 device.build_output_stream(
794 config,
795 move |data: &mut [u16], _| fill_output_buffer_u16(data, output_channels, &buffer),
796 err_fn,
797 None,
798 )
799 }
800 other => {
801 return Err(AudioError::new(format!(
802 "Unsupported output sample format: {:?}",
803 other
804 )))
805 }
806 }
807 .map_err(|err| AudioError::new(format!("Failed to build output stream: {}", err)))?;
808
809 Ok(stream)
810}
811
812fn fill_output_buffer_f32(
813 data: &mut [f32],
814 channels: u16,
815 playback_buffer: &Arc<Mutex<PlaybackBuffer>>,
816) {
817 let channels = channels.max(1) as usize;
818 if let Ok(mut buffer) = playback_buffer.lock() {
819 for frame in data.chunks_mut(channels) {
820 let sample = buffer.pop_mono_sample().unwrap_or(0.0).clamp(-1.0, 1.0);
821 for out in frame {
822 *out = sample;
823 }
824 }
825 } else {
826 data.fill(0.0);
827 }
828}
829
830fn fill_output_buffer_i16(
831 data: &mut [i16],
832 channels: u16,
833 playback_buffer: &Arc<Mutex<PlaybackBuffer>>,
834) {
835 let channels = channels.max(1) as usize;
836 if let Ok(mut buffer) = playback_buffer.lock() {
837 for frame in data.chunks_mut(channels) {
838 let sample =
839 (buffer.pop_mono_sample().unwrap_or(0.0).clamp(-1.0, 1.0) * i16::MAX as f32) as i16;
840 for out in frame {
841 *out = sample;
842 }
843 }
844 } else {
845 data.fill(0);
846 }
847}
848
849fn fill_output_buffer_u16(
850 data: &mut [u16],
851 channels: u16,
852 playback_buffer: &Arc<Mutex<PlaybackBuffer>>,
853) {
854 let channels = channels.max(1) as usize;
855 if let Ok(mut buffer) = playback_buffer.lock() {
856 for frame in data.chunks_mut(channels) {
857 let sample = ((buffer.pop_mono_sample().unwrap_or(0.0).clamp(-1.0, 1.0) * 32767.0)
858 + 32768.0)
859 .round()
860 .clamp(0.0, u16::MAX as f32) as u16;
861 for out in frame {
862 *out = sample;
863 }
864 }
865 } else {
866 data.fill(u16::MIN);
867 }
868}
869
870fn select_input_device(
871 host: &cpal::Host,
872 preferred_name: Option<&str>,
873) -> AudioResult<cpal::Device> {
874 #[cfg(target_os = "linux")]
875 {
876 if let Some(preferred_name) = preferred_name {
877 if let Err(err) = set_default_input_device(preferred_name) {
878 log::warn!(
879 "Failed to set preferred PipeWire source '{}' as default: {}",
880 preferred_name,
881 err
882 );
883 }
884 }
885
886 if let Some(device) = find_named_input_device(host, "pipewire")? {
887 return Ok(device);
888 }
889 }
890
891 if let Some(preferred_name) = preferred_name {
892 let devices = host.input_devices().map_err(|err| {
893 AudioError::new(format!("Failed to enumerate input devices: {}", err))
894 })?;
895 for device in devices {
896 if device.description().ok().map(|d| d.to_string()).as_deref() == Some(preferred_name) {
897 return Ok(device);
898 }
899 }
900 log::warn!(
901 "Preferred input device '{}' not found, falling back to system default",
902 preferred_name
903 );
904 }
905
906 host.default_input_device()
907 .ok_or_else(|| AudioError::new("No input device available"))
908}
909
910fn find_named_input_device(
911 host: &cpal::Host,
912 target_name: &str,
913) -> AudioResult<Option<cpal::Device>> {
914 let devices = host
915 .input_devices()
916 .map_err(|err| AudioError::new(format!("Failed to enumerate input devices: {}", err)))?;
917 for device in devices {
918 if device
919 .description()
920 .ok()
921 .is_some_and(|name| name.to_string().eq_ignore_ascii_case(target_name))
922 {
923 return Ok(Some(device));
924 }
925 }
926 Ok(None)
927}
928
929fn process_stream_buffer(pipeline: &Arc<Mutex<AudioPipeline>>, data: &[f32], channels: u16) {
930 if let Ok(mut pipeline) = pipeline.lock() {
931 if let Err(err) = pipeline.process_input(data, channels) {
932 log::error!("Audio pipeline processing failed: {}", err);
933 }
934 }
935}
936
937fn select_stream_config(
938 device: &cpal::Device,
939 target_sample_rate: u32,
940 target_channels: u16,
941) -> Option<(u32, u16)> {
942 let supported_configs = device.supported_input_configs().ok()?;
943 let mut best_match = None;
944 for cfg in supported_configs {
945 let min_rate = cfg.min_sample_rate();
946 let max_rate = cfg.max_sample_rate();
947 let channels = cfg.channels();
948 if target_sample_rate >= min_rate
949 && target_sample_rate <= max_rate
950 && channels == target_channels
951 {
952 return Some((target_sample_rate, target_channels));
953 }
954 if best_match.is_none() && target_sample_rate >= min_rate && target_sample_rate <= max_rate
955 {
956 best_match = Some((target_sample_rate, channels));
957 }
958 }
959 best_match
960}
961
/// Handle to a running recording worker thread.
struct NativeRecordingHandle {
    /// Channel used to send commands to the worker.
    cmd_tx: mpsc::Sender<NativeCommand>,
    /// Join handle of the worker; taken (set to `None`) once `stop` joins it.
    join_handle: Option<JoinHandle<()>>,
}
966
/// Handle to a running live-listening worker thread.
struct NativeListeningHandle {
    /// Channel used to send commands to the worker.
    cmd_tx: mpsc::Sender<NativeListeningCommand>,
    /// Join handle of the worker; taken (set to `None`) once `stop` joins it.
    join_handle: Option<JoinHandle<()>>,
}
971
972impl ActiveRecording for NativeRecordingHandle {
973 fn stop(&mut self) -> AudioResult<CaptureDiagnostics> {
974 let (reply_tx, reply_rx) = mpsc::channel();
975 self.cmd_tx
976 .send(NativeCommand::Stop(reply_tx))
977 .map_err(|_| AudioError::new("Native capture thread is no longer running"))?;
978 let diagnostics = reply_rx
979 .recv()
980 .map_err(|_| AudioError::new("No response from native capture thread"))??;
981 if let Some(handle) = self.join_handle.take() {
982 let _ = handle.join();
983 }
984 Ok(diagnostics)
985 }
986}
987
988impl ActiveListening for NativeListeningHandle {
989 fn stop(&mut self) -> AudioResult<CaptureDiagnostics> {
990 let (reply_tx, reply_rx) = mpsc::channel();
991 self.cmd_tx
992 .send(NativeListeningCommand::Stop(reply_tx))
993 .map_err(|_| AudioError::new("Native listening thread is no longer running"))?;
994 let diagnostics = reply_rx
995 .recv()
996 .map_err(|_| AudioError::new("No response from native listening thread"))??;
997 if let Some(handle) = self.join_handle.take() {
998 let _ = handle.join();
999 }
1000 Ok(diagnostics)
1001 }
1002
1003 fn update_config(&mut self, config: NativeCaptureConfig) -> AudioResult<()> {
1004 let (reply_tx, reply_rx) = mpsc::channel();
1005 self.cmd_tx
1006 .send(NativeListeningCommand::UpdateConfig(config, reply_tx))
1007 .map_err(|_| AudioError::new("Native listening thread is no longer running"))?;
1008 reply_rx
1009 .recv()
1010 .map_err(|_| AudioError::new("No response from native listening thread"))??;
1011 Ok(())
1012 }
1013}