Skip to main content

zk_audio/
native.rs

1use crate::builder::{NativePipelineBuilder, PipelineBuildRequest};
2use crate::contracts::{CaptureBackend, DeviceEnumerator, ProcessorBuildRequest, ProcessorFactory};
3use crate::core::{
4    ActiveListening, ActiveRecording, AudioError, AudioResult, CaptureDiagnostics, InputDeviceInfo,
5    NativeCaptureConfig,
6};
7use crate::factory::ProfileProcessorFactory;
8use crate::metrics::LevelMetrics;
9use crate::mic_sim::MicrophoneSimulatorFactory;
10use crate::pipeline::{AudioPipeline, PlaybackBuffer, PlaybackSink};
11use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
12use cpal::{SampleFormat, StreamConfig};
13use serde_json::Value;
14use std::process::Command;
15use std::sync::mpsc;
16use std::sync::{Arc, Mutex};
17use std::thread::{self, JoinHandle};
18use std::time::Instant;
19
20pub struct NativeCaptureBackend;
21
22impl DeviceEnumerator for NativeCaptureBackend {
23    fn list_input_devices(&self) -> AudioResult<Vec<InputDeviceInfo>> {
24        list_input_devices()
25    }
26
27    fn set_default_input_device(&self, device_id: &str) -> AudioResult<()> {
28        set_default_input_device(device_id)
29    }
30}
31
32pub fn list_input_devices() -> AudioResult<Vec<InputDeviceInfo>> {
33    #[cfg(target_os = "linux")]
34    if let Ok(devices) = list_pipewire_input_devices() {
35        if !devices.is_empty() {
36            return Ok(devices);
37        }
38    }
39
40    list_cpal_input_devices()
41}
42
43pub fn set_default_input_device(device_id: &str) -> AudioResult<()> {
44    #[cfg(target_os = "linux")]
45    {
46        let output = Command::new("pactl")
47            .args(["set-default-source", device_id])
48            .output()
49            .map_err(|err| {
50                AudioError::new(format!(
51                    "Failed to invoke pactl set-default-source for '{}': {}",
52                    device_id, err
53                ))
54            })?;
55
56        if !output.status.success() {
57            let stderr = String::from_utf8_lossy(&output.stderr);
58            return Err(AudioError::new(format!(
59                "pactl set-default-source '{}' failed: {}",
60                device_id,
61                stderr.trim()
62            )));
63        }
64
65        return Ok(());
66    }
67
68    #[allow(unreachable_code)]
69    Err(AudioError::new(
70        "Setting the default input device is only supported on Linux",
71    ))
72}
73
74fn list_cpal_input_devices() -> AudioResult<Vec<InputDeviceInfo>> {
75    let host = cpal::default_host();
76    let default_name = host
77        .default_input_device()
78        .and_then(|device| device.description().ok())
79        .map(|d| d.to_string());
80    let devices = host
81        .input_devices()
82        .map_err(|err| AudioError::new(format!("Failed to enumerate input devices: {}", err)))?;
83
84    let mut result = Vec::new();
85    for device in devices {
86        let name = device
87            .description()
88            .map_err(|err| AudioError::new(format!("Failed to read input device name: {}", err)))?
89            .to_string();
90        let is_default = default_name.as_deref() == Some(name.as_str());
91        result.push(InputDeviceInfo {
92            id: name.clone(),
93            name,
94            is_default,
95        });
96    }
97
98    result.sort_by(|a, b| a.name.cmp(&b.name));
99    Ok(result)
100}
101
102#[cfg(target_os = "linux")]
103fn list_pipewire_input_devices() -> AudioResult<Vec<InputDeviceInfo>> {
104    let default_source = pactl_default_source().ok().flatten();
105    let output = Command::new("pactl")
106        .args(["-f", "json", "list", "sources"])
107        .output()
108        .map_err(|err| AudioError::new(format!("Failed to invoke pactl list sources: {}", err)))?;
109
110    if !output.status.success() {
111        let stderr = String::from_utf8_lossy(&output.stderr);
112        return Err(AudioError::new(format!(
113            "pactl list sources failed: {}",
114            stderr.trim()
115        )));
116    }
117
118    let value: Value = serde_json::from_slice(&output.stdout)
119        .map_err(|err| AudioError::new(format!("Failed to parse pactl source list: {}", err)))?;
120    let sources = value
121        .as_array()
122        .ok_or_else(|| AudioError::new("Unexpected pactl source list payload"))?;
123
124    let mut result = Vec::new();
125
126    for source in sources {
127        if !is_selectable_pipewire_source(source) {
128            continue;
129        }
130
131        let Some(id) = source.get("name").and_then(Value::as_str) else {
132            continue;
133        };
134
135        let display_name = friendly_pipewire_source_name(source).unwrap_or_else(|| id.to_string());
136        let is_default = default_source.as_deref() == Some(id);
137
138        result.push(InputDeviceInfo {
139            id: id.to_string(),
140            name: display_name,
141            is_default,
142        });
143    }
144
145    result.sort_by(|a, b| a.name.cmp(&b.name));
146    Ok(result)
147}
148
149#[cfg(target_os = "linux")]
150fn pactl_default_source() -> AudioResult<Option<String>> {
151    let output = Command::new("pactl")
152        .arg("info")
153        .output()
154        .map_err(|err| AudioError::new(format!("Failed to invoke pactl info: {}", err)))?;
155
156    if !output.status.success() {
157        let stderr = String::from_utf8_lossy(&output.stderr);
158        return Err(AudioError::new(format!(
159            "pactl info failed: {}",
160            stderr.trim()
161        )));
162    }
163
164    let stdout = String::from_utf8_lossy(&output.stdout);
165    Ok(stdout
166        .lines()
167        .find_map(|line| line.strip_prefix("Default Source: "))
168        .map(|value| value.trim().to_string()))
169}
170
171#[cfg(target_os = "linux")]
172fn is_selectable_pipewire_source(source: &Value) -> bool {
173    let Some(id) = source.get("name").and_then(Value::as_str) else {
174        return false;
175    };
176    if id.ends_with(".monitor") {
177        return false;
178    }
179
180    if source
181        .get("monitor_source")
182        .and_then(Value::as_str)
183        .is_some_and(|value| !value.is_empty())
184    {
185        return false;
186    }
187
188    let properties = source.get("properties").and_then(Value::as_object);
189    if properties
190        .and_then(|props| props.get("media.class"))
191        .and_then(Value::as_str)
192        .is_some_and(|class| class != "Audio/Source")
193    {
194        return false;
195    }
196
197    let description = source
198        .get("description")
199        .and_then(Value::as_str)
200        .unwrap_or_default();
201    !description.starts_with("Monitor of ")
202}
203
204#[cfg(target_os = "linux")]
205fn friendly_pipewire_source_name(source: &Value) -> Option<String> {
206    let properties = source.get("properties").and_then(Value::as_object);
207    let device_description = properties
208        .and_then(|props| props.get("device.description"))
209        .and_then(Value::as_str)
210        .or_else(|| source.get("description").and_then(Value::as_str))?;
211
212    let active_port_name = source.get("active_port").and_then(Value::as_str);
213    let active_port_description = source
214        .get("ports")
215        .and_then(Value::as_array)
216        .and_then(|ports| {
217            ports.iter().find_map(|port| {
218                let name = port.get("name").and_then(Value::as_str)?;
219                if Some(name) == active_port_name {
220                    port.get("description").and_then(Value::as_str)
221                } else {
222                    None
223                }
224            })
225        });
226
227    if let Some(port_description) = active_port_description {
228        return Some(format!("{} - {}", port_description, device_description));
229    }
230
231    Some(device_description.to_string())
232}
233
234impl NativeCaptureBackend {
235    pub fn new() -> Self {
236        Self
237    }
238
239    fn build_pipeline(
240        config: &NativeCaptureConfig,
241        sample_rate: u32,
242        device_channels: u16,
243        device_name: Option<String>,
244    ) -> AudioResult<AudioPipeline> {
245        NativePipelineBuilder::new().build(PipelineBuildRequest {
246            output_path: config.output_path.clone(),
247            profile: config.profile,
248            sample_rate,
249            device_channels,
250            device_name,
251            gain_db: config.input_gain_db,
252            limiter_threshold: config.limiter_threshold,
253            high_pass_hz: config.high_pass_hz,
254            noise_suppression_amount: config.noise_suppression_amount,
255            noise_calibration_ms: config.noise_calibration_ms,
256            delay_effect: config.delay_effect,
257            stage_overrides: config.stage_overrides.clone(),
258            microphone_sim: config.microphone_sim,
259        })
260    }
261
262    fn build_live_pipeline(
263        config: &NativeCaptureConfig,
264        sample_rate: u32,
265        device_channels: u16,
266        device_name: Option<String>,
267        playback_buffer: Arc<Mutex<PlaybackBuffer>>,
268    ) -> AudioResult<AudioPipeline> {
269        let output_spec = crate::core::AudioSpec {
270            sample_rate,
271            channels: 1,
272        };
273        let processors =
274            ProfileProcessorFactory::new().build_processors(ProcessorBuildRequest {
275                profile: config.profile,
276                gain_db: config.input_gain_db,
277                limiter_threshold: config.limiter_threshold,
278                high_pass_hz: config.high_pass_hz,
279                noise_suppression_amount: config.noise_suppression_amount,
280                noise_calibration_ms: config.noise_calibration_ms,
281                delay_effect: config.delay_effect,
282                stage_overrides: config.stage_overrides.clone(),
283            })?;
284        let processor_names = processors
285            .iter()
286            .map(|processor| processor.name().to_string())
287            .collect::<Vec<_>>();
288        let microphone_sim_processors =
289            MicrophoneSimulatorFactory::new().build_processors(config.microphone_sim)?;
290        let microphone_sim_processor_names = microphone_sim_processors
291            .iter()
292            .map(|processor| processor.name().to_string())
293            .collect::<Vec<_>>();
294        let microphone_sim_model = config
295            .microphone_sim
296            .active_model()
297            .map(|model| model.as_str().to_string());
298
299        let mut notes = config
300            .delay_effect
301            .map(|effect| vec![format!("delay_effect={}", effect.preset.as_str())])
302            .unwrap_or_default();
303        notes.push("mode=live_listening".to_string());
304        notes.push("output=default".to_string());
305        notes.push("pipeline_order=microphone_sim->voice_processing->playback".to_string());
306        if let Some(model) = &microphone_sim_model {
307            notes.push(format!("microphone_sim={}", model));
308        }
309
310        AudioPipeline::new(
311            output_spec,
312            CaptureDiagnostics {
313                backend: "native_listening".to_string(),
314                profile: config.profile.as_str().to_string(),
315                profile_base: Some(config.profile.as_str().to_string()),
316                device_name,
317                sample_rate: Some(sample_rate),
318                channels: Some(device_channels),
319                processor_names,
320                processor_stage_overrides: config
321                    .stage_overrides
322                    .iter()
323                    .map(|(stage, mode)| format!("{}={}", stage, mode.as_str()))
324                    .collect(),
325                resolved_delay_preset: config
326                    .delay_effect
327                    .map(|effect| effect.preset.as_str().to_string()),
328                microphone_sim_model,
329                microphone_sim_processor_names,
330                notes,
331                ..CaptureDiagnostics::default()
332            },
333            microphone_sim_processors,
334            processors,
335            Box::new(PlaybackSink::from_buffer(playback_buffer)),
336            Box::new(LevelMetrics::default()),
337        )
338    }
339
340    pub fn start_live(&self, config: NativeCaptureConfig) -> AudioResult<Box<dyn ActiveListening>> {
341        let (ready_tx, ready_rx) = mpsc::channel();
342        let (cmd_tx, cmd_rx) = mpsc::channel();
343
344        let handle = thread::spawn(move || {
345            native_listening_thread(config, cmd_rx, ready_tx);
346        });
347
348        match ready_rx
349            .recv()
350            .map_err(|_| AudioError::new("Native listening thread failed to report readiness"))?
351        {
352            Ok(()) => Ok(Box::new(NativeListeningHandle {
353                cmd_tx,
354                join_handle: Some(handle),
355            })),
356            Err(err) => {
357                let _ = handle.join();
358                Err(err)
359            }
360        }
361    }
362}
363
364impl CaptureBackend for NativeCaptureBackend {
365    fn start(&self, config: NativeCaptureConfig) -> AudioResult<Box<dyn ActiveRecording>> {
366        let (ready_tx, ready_rx) = mpsc::channel();
367        let (cmd_tx, cmd_rx) = mpsc::channel();
368
369        let handle = thread::spawn(move || {
370            native_capture_thread(config, cmd_rx, ready_tx);
371        });
372
373        match ready_rx
374            .recv()
375            .map_err(|_| AudioError::new("Native capture thread failed to report readiness"))?
376        {
377            Ok(()) => Ok(Box::new(NativeRecordingHandle {
378                cmd_tx,
379                join_handle: Some(handle),
380            })),
381            Err(err) => {
382                let _ = handle.join();
383                Err(err)
384            }
385        }
386    }
387}
388
389enum NativeCommand {
390    Stop(mpsc::Sender<AudioResult<CaptureDiagnostics>>),
391}
392
393enum NativeListeningCommand {
394    Stop(mpsc::Sender<AudioResult<CaptureDiagnostics>>),
395    UpdateConfig(NativeCaptureConfig, mpsc::Sender<AudioResult<()>>),
396}
397
398fn native_capture_thread(
399    config: NativeCaptureConfig,
400    cmd_rx: mpsc::Receiver<NativeCommand>,
401    ready_tx: mpsc::Sender<AudioResult<()>>,
402) {
403    let result = start_native_stream(config);
404    let (stream, pipeline, start_time, notes) = match result {
405        Ok(values) => {
406            let _ = ready_tx.send(Ok(()));
407            values
408        }
409        Err(err) => {
410            let _ = ready_tx.send(Err(err));
411            return;
412        }
413    };
414
415    let _keep_stream_alive = stream;
416
417    match cmd_rx.recv() {
418        Ok(NativeCommand::Stop(reply_tx)) => {
419            drop(_keep_stream_alive);
420            let duration_ms = Some(start_time.elapsed().as_millis() as i64);
421            let diagnostics = pipeline
422                .lock()
423                .map_err(|_| AudioError::new("Audio pipeline lock poisoned"))
424                .and_then(|mut pipeline| pipeline.finalize(duration_ms))
425                .map(|mut diagnostics| {
426                    diagnostics.notes.extend(notes);
427                    diagnostics
428                });
429            let _ = reply_tx.send(diagnostics);
430        }
431        Err(_) => {}
432    }
433}
434
435fn native_listening_thread(
436    config: NativeCaptureConfig,
437    cmd_rx: mpsc::Receiver<NativeListeningCommand>,
438    ready_tx: mpsc::Sender<AudioResult<()>>,
439) {
440    let result = start_live_monitor_stream(config);
441    let (input_stream, output_stream, pipeline, playback_buffer, start_time, notes) = match result {
442        Ok(values) => {
443            let _ = ready_tx.send(Ok(()));
444            values
445        }
446        Err(err) => {
447            let _ = ready_tx.send(Err(err));
448            return;
449        }
450    };
451
452    let _keep_input_stream_alive = input_stream;
453    let _keep_output_stream_alive = output_stream;
454
455    loop {
456        match cmd_rx.recv() {
457            Ok(NativeListeningCommand::Stop(reply_tx)) => {
458                drop(_keep_input_stream_alive);
459                drop(_keep_output_stream_alive);
460                let duration_ms = Some(start_time.elapsed().as_millis() as i64);
461                let diagnostics = pipeline
462                    .lock()
463                    .map_err(|_| AudioError::new("Audio pipeline lock poisoned"))
464                    .and_then(|mut pipeline| pipeline.finalize(duration_ms))
465                    .map(|mut diagnostics| {
466                        diagnostics.notes.extend(notes.clone());
467                        diagnostics
468                    });
469                let _ = reply_tx.send(diagnostics);
470                break;
471            }
472            Ok(NativeListeningCommand::UpdateConfig(config, reply_tx)) => {
473                let result = rebuild_live_pipeline(&pipeline, &playback_buffer, config);
474                let _ = reply_tx.send(result);
475            }
476            Err(_) => break,
477        }
478    }
479}
480
481type NativeThreadState = (
482    cpal::Stream,
483    Arc<Mutex<AudioPipeline>>,
484    Instant,
485    Vec<String>,
486);
487
488type NativeListeningThreadState = (
489    cpal::Stream,
490    cpal::Stream,
491    Arc<Mutex<AudioPipeline>>,
492    Arc<Mutex<PlaybackBuffer>>,
493    Instant,
494    Vec<String>,
495);
496
497fn start_native_stream(config: NativeCaptureConfig) -> AudioResult<NativeThreadState> {
498    let host = cpal::default_host();
499    let device = select_input_device(&host, config.preferred_input_device.as_deref())?;
500    let device_name = device.description().ok().map(|d| d.to_string());
501    let supported_config = device
502        .default_input_config()
503        .map_err(|err| AudioError::new(format!("Failed to read input device config: {}", err)))?;
504
505    let (sample_rate, channels) =
506        select_stream_config(&device, config.target_sample_rate, config.target_channels).unwrap_or(
507            (
508                supported_config.sample_rate(),
509                supported_config.channels(),
510            ),
511        );
512
513    let pipeline = Arc::new(Mutex::new(NativeCaptureBackend::build_pipeline(
514        &config,
515        sample_rate,
516        channels,
517        device_name.clone(),
518    )?));
519
520    let cpal_config = StreamConfig {
521        channels,
522        sample_rate: sample_rate,
523        buffer_size: cpal::BufferSize::Default,
524    };
525    let stream = build_input_stream(
526        &device,
527        supported_config.sample_format(),
528        &cpal_config,
529        channels,
530        Arc::clone(&pipeline),
531        native_input_stream_error,
532    )?;
533
534    stream
535        .play()
536        .map_err(|err| AudioError::new(format!("Failed to start input stream: {}", err)))?;
537
538    Ok((
539        stream,
540        pipeline,
541        Instant::now(),
542        vec![
543            format!("device={}", device_name.unwrap_or_else(|| "unknown".into())),
544            format!(
545                "configured_device={}",
546                config
547                    .preferred_input_device
548                    .unwrap_or_else(|| "system_default".to_string())
549            ),
550            format!("channels={}", channels),
551            format!("sample_rate={}", sample_rate),
552            format!("noise_calibration_ms={}", config.noise_calibration_ms),
553        ],
554    ))
555}
556
557fn native_input_stream_error(err: cpal::StreamError) {
558    log::error!("Native audio stream error: {}", err);
559}
560
561fn native_listening_input_stream_error(err: cpal::StreamError) {
562    log::error!("Native listening input stream error: {}", err);
563}
564
565fn native_listening_output_stream_error(err: cpal::StreamError) {
566    log::error!("Native listening output stream error: {}", err);
567}
568
569fn start_live_monitor_stream(
570    config: NativeCaptureConfig,
571) -> AudioResult<NativeListeningThreadState> {
572    let host = cpal::default_host();
573    let input_device = select_input_device(&host, config.preferred_input_device.as_deref())?;
574    let input_device_name = input_device.description().ok().map(|d| d.to_string());
575    let input_config = input_device
576        .default_input_config()
577        .map_err(|err| AudioError::new(format!("Failed to read input device config: {}", err)))?;
578    let output_device = host
579        .default_output_device()
580        .ok_or_else(|| AudioError::new("No output device available"))?;
581    let output_device_name = output_device.description().ok().map(|d| d.to_string());
582    let output_config = output_device
583        .default_output_config()
584        .map_err(|err| AudioError::new(format!("Failed to read output device config: {}", err)))?;
585
586    let output_sample_rate = output_config.sample_rate();
587    let output_channels = output_config.channels().max(1);
588    let input_channels = select_stream_config(
589        &input_device,
590        output_sample_rate,
591        config.target_channels.max(1),
592    )
593    .map(|(_, channels)| channels)
594    .ok_or_else(|| {
595        AudioError::new(format!(
596            "Input device does not support {} Hz for live listening",
597            output_sample_rate
598        ))
599    })?;
600
601    let cpal_input_config = StreamConfig {
602        channels: input_channels,
603        sample_rate: output_sample_rate,
604        buffer_size: cpal::BufferSize::Default,
605    };
606    let cpal_output_config = StreamConfig {
607        channels: output_channels,
608        sample_rate: output_sample_rate,
609        buffer_size: cpal::BufferSize::Default,
610    };
611
612    let max_buffer_samples = (output_sample_rate as usize * 2).max(4096);
613    let (_, playback_buffer) = PlaybackSink::with_capacity(max_buffer_samples);
614    let pipeline = Arc::new(Mutex::new(NativeCaptureBackend::build_live_pipeline(
615        &config,
616        output_sample_rate,
617        input_channels,
618        input_device_name.clone(),
619        Arc::clone(&playback_buffer),
620    )?));
621
622    let input_stream = build_input_stream(
623        &input_device,
624        input_config.sample_format(),
625        &cpal_input_config,
626        input_channels,
627        Arc::clone(&pipeline),
628        native_listening_input_stream_error,
629    )?;
630    let output_stream = build_output_stream(
631        &output_device,
632        output_config.sample_format(),
633        &cpal_output_config,
634        Arc::clone(&playback_buffer),
635        native_listening_output_stream_error,
636    )?;
637
638    input_stream
639        .play()
640        .map_err(|err| AudioError::new(format!("Failed to start input stream: {}", err)))?;
641    output_stream
642        .play()
643        .map_err(|err| AudioError::new(format!("Failed to start output stream: {}", err)))?;
644
645    Ok((
646        input_stream,
647        output_stream,
648        pipeline,
649        playback_buffer,
650        Instant::now(),
651        vec![
652            format!(
653                "input_device={}",
654                input_device_name.unwrap_or_else(|| "unknown".into())
655            ),
656            format!(
657                "output_device={}",
658                output_device_name.unwrap_or_else(|| "unknown".into())
659            ),
660            format!("input_channels={}", input_channels),
661            format!("output_channels={}", output_channels),
662            format!("sample_rate={}", output_sample_rate),
663        ],
664    ))
665}
666
667fn rebuild_live_pipeline(
668    pipeline: &Arc<Mutex<AudioPipeline>>,
669    playback_buffer: &Arc<Mutex<PlaybackBuffer>>,
670    config: NativeCaptureConfig,
671) -> AudioResult<()> {
672    if let Ok(mut buffer) = playback_buffer.lock() {
673        buffer.clear();
674    }
675
676    let (pipeline_sample_rate, pipeline_channels, device_name) = {
677        let current = pipeline
678            .lock()
679            .map_err(|_| AudioError::new("Audio pipeline lock poisoned"))?;
680        (
681            current.sample_rate(),
682            current.input_channels(),
683            current.device_name().map(ToOwned::to_owned),
684        )
685    };
686
687    let rebuilt = NativeCaptureBackend::build_live_pipeline(
688        &config,
689        pipeline_sample_rate,
690        pipeline_channels,
691        device_name,
692        Arc::clone(playback_buffer),
693    )?;
694
695    let mut current = pipeline
696        .lock()
697        .map_err(|_| AudioError::new("Audio pipeline lock poisoned"))?;
698    *current = rebuilt;
699    Ok(())
700}
701
702fn build_input_stream(
703    device: &cpal::Device,
704    sample_format: SampleFormat,
705    config: &StreamConfig,
706    input_channels: u16,
707    pipeline: Arc<Mutex<AudioPipeline>>,
708    err_fn: fn(cpal::StreamError),
709) -> AudioResult<cpal::Stream> {
710    let stream = match sample_format {
711        SampleFormat::F32 => {
712            let pipeline_for_stream = Arc::clone(&pipeline);
713            device.build_input_stream(
714                config,
715                move |data: &[f32], _| {
716                    process_stream_buffer(&pipeline_for_stream, data, input_channels)
717                },
718                err_fn,
719                None,
720            )
721        }
722        SampleFormat::I16 => {
723            let pipeline_for_stream = Arc::clone(&pipeline);
724            device.build_input_stream(
725                config,
726                move |data: &[i16], _| {
727                    let converted = data
728                        .iter()
729                        .map(|sample| *sample as f32 / i16::MAX as f32)
730                        .collect::<Vec<_>>();
731                    process_stream_buffer(&pipeline_for_stream, &converted, input_channels)
732                },
733                err_fn,
734                None,
735            )
736        }
737        SampleFormat::U16 => {
738            let pipeline_for_stream = Arc::clone(&pipeline);
739            device.build_input_stream(
740                config,
741                move |data: &[u16], _| {
742                    let converted = data
743                        .iter()
744                        .map(|sample| (*sample as f32 - 32768.0) / 32768.0)
745                        .collect::<Vec<_>>();
746                    process_stream_buffer(&pipeline_for_stream, &converted, input_channels)
747                },
748                err_fn,
749                None,
750            )
751        }
752        other => {
753            return Err(AudioError::new(format!(
754                "Unsupported input sample format: {:?}",
755                other
756            )))
757        }
758    }
759    .map_err(|err| AudioError::new(format!("Failed to build input stream: {}", err)))?;
760
761    Ok(stream)
762}
763
764fn build_output_stream(
765    device: &cpal::Device,
766    sample_format: SampleFormat,
767    config: &StreamConfig,
768    playback_buffer: Arc<Mutex<PlaybackBuffer>>,
769    err_fn: fn(cpal::StreamError),
770) -> AudioResult<cpal::Stream> {
771    let output_channels = config.channels;
772    let stream = match sample_format {
773        SampleFormat::F32 => {
774            let buffer = Arc::clone(&playback_buffer);
775            device.build_output_stream(
776                config,
777                move |data: &mut [f32], _| fill_output_buffer_f32(data, output_channels, &buffer),
778                err_fn,
779                None,
780            )
781        }
782        SampleFormat::I16 => {
783            let buffer = Arc::clone(&playback_buffer);
784            device.build_output_stream(
785                config,
786                move |data: &mut [i16], _| fill_output_buffer_i16(data, output_channels, &buffer),
787                err_fn,
788                None,
789            )
790        }
791        SampleFormat::U16 => {
792            let buffer = Arc::clone(&playback_buffer);
793            device.build_output_stream(
794                config,
795                move |data: &mut [u16], _| fill_output_buffer_u16(data, output_channels, &buffer),
796                err_fn,
797                None,
798            )
799        }
800        other => {
801            return Err(AudioError::new(format!(
802                "Unsupported output sample format: {:?}",
803                other
804            )))
805        }
806    }
807    .map_err(|err| AudioError::new(format!("Failed to build output stream: {}", err)))?;
808
809    Ok(stream)
810}
811
812fn fill_output_buffer_f32(
813    data: &mut [f32],
814    channels: u16,
815    playback_buffer: &Arc<Mutex<PlaybackBuffer>>,
816) {
817    let channels = channels.max(1) as usize;
818    if let Ok(mut buffer) = playback_buffer.lock() {
819        for frame in data.chunks_mut(channels) {
820            let sample = buffer.pop_mono_sample().unwrap_or(0.0).clamp(-1.0, 1.0);
821            for out in frame {
822                *out = sample;
823            }
824        }
825    } else {
826        data.fill(0.0);
827    }
828}
829
830fn fill_output_buffer_i16(
831    data: &mut [i16],
832    channels: u16,
833    playback_buffer: &Arc<Mutex<PlaybackBuffer>>,
834) {
835    let channels = channels.max(1) as usize;
836    if let Ok(mut buffer) = playback_buffer.lock() {
837        for frame in data.chunks_mut(channels) {
838            let sample =
839                (buffer.pop_mono_sample().unwrap_or(0.0).clamp(-1.0, 1.0) * i16::MAX as f32) as i16;
840            for out in frame {
841                *out = sample;
842            }
843        }
844    } else {
845        data.fill(0);
846    }
847}
848
849fn fill_output_buffer_u16(
850    data: &mut [u16],
851    channels: u16,
852    playback_buffer: &Arc<Mutex<PlaybackBuffer>>,
853) {
854    let channels = channels.max(1) as usize;
855    if let Ok(mut buffer) = playback_buffer.lock() {
856        for frame in data.chunks_mut(channels) {
857            let sample = ((buffer.pop_mono_sample().unwrap_or(0.0).clamp(-1.0, 1.0) * 32767.0)
858                + 32768.0)
859                .round()
860                .clamp(0.0, u16::MAX as f32) as u16;
861            for out in frame {
862                *out = sample;
863            }
864        }
865    } else {
866        data.fill(u16::MIN);
867    }
868}
869
870fn select_input_device(
871    host: &cpal::Host,
872    preferred_name: Option<&str>,
873) -> AudioResult<cpal::Device> {
874    #[cfg(target_os = "linux")]
875    {
876        if let Some(preferred_name) = preferred_name {
877            if let Err(err) = set_default_input_device(preferred_name) {
878                log::warn!(
879                    "Failed to set preferred PipeWire source '{}' as default: {}",
880                    preferred_name,
881                    err
882                );
883            }
884        }
885
886        if let Some(device) = find_named_input_device(host, "pipewire")? {
887            return Ok(device);
888        }
889    }
890
891    if let Some(preferred_name) = preferred_name {
892        let devices = host.input_devices().map_err(|err| {
893            AudioError::new(format!("Failed to enumerate input devices: {}", err))
894        })?;
895        for device in devices {
896            if device.description().ok().map(|d| d.to_string()).as_deref() == Some(preferred_name) {
897                return Ok(device);
898            }
899        }
900        log::warn!(
901            "Preferred input device '{}' not found, falling back to system default",
902            preferred_name
903        );
904    }
905
906    host.default_input_device()
907        .ok_or_else(|| AudioError::new("No input device available"))
908}
909
910fn find_named_input_device(
911    host: &cpal::Host,
912    target_name: &str,
913) -> AudioResult<Option<cpal::Device>> {
914    let devices = host
915        .input_devices()
916        .map_err(|err| AudioError::new(format!("Failed to enumerate input devices: {}", err)))?;
917    for device in devices {
918        if device
919            .description()
920            .ok()
921            .is_some_and(|name| name.to_string().eq_ignore_ascii_case(target_name))
922        {
923            return Ok(Some(device));
924        }
925    }
926    Ok(None)
927}
928
929fn process_stream_buffer(pipeline: &Arc<Mutex<AudioPipeline>>, data: &[f32], channels: u16) {
930    if let Ok(mut pipeline) = pipeline.lock() {
931        if let Err(err) = pipeline.process_input(data, channels) {
932            log::error!("Audio pipeline processing failed: {}", err);
933        }
934    }
935}
936
937fn select_stream_config(
938    device: &cpal::Device,
939    target_sample_rate: u32,
940    target_channels: u16,
941) -> Option<(u32, u16)> {
942    let supported_configs = device.supported_input_configs().ok()?;
943    let mut best_match = None;
944    for cfg in supported_configs {
945        let min_rate = cfg.min_sample_rate();
946        let max_rate = cfg.max_sample_rate();
947        let channels = cfg.channels();
948        if target_sample_rate >= min_rate
949            && target_sample_rate <= max_rate
950            && channels == target_channels
951        {
952            return Some((target_sample_rate, target_channels));
953        }
954        if best_match.is_none() && target_sample_rate >= min_rate && target_sample_rate <= max_rate
955        {
956            best_match = Some((target_sample_rate, channels));
957        }
958    }
959    best_match
960}
961
962struct NativeRecordingHandle {
963    cmd_tx: mpsc::Sender<NativeCommand>,
964    join_handle: Option<JoinHandle<()>>,
965}
966
967struct NativeListeningHandle {
968    cmd_tx: mpsc::Sender<NativeListeningCommand>,
969    join_handle: Option<JoinHandle<()>>,
970}
971
972impl ActiveRecording for NativeRecordingHandle {
973    fn stop(&mut self) -> AudioResult<CaptureDiagnostics> {
974        let (reply_tx, reply_rx) = mpsc::channel();
975        self.cmd_tx
976            .send(NativeCommand::Stop(reply_tx))
977            .map_err(|_| AudioError::new("Native capture thread is no longer running"))?;
978        let diagnostics = reply_rx
979            .recv()
980            .map_err(|_| AudioError::new("No response from native capture thread"))??;
981        if let Some(handle) = self.join_handle.take() {
982            let _ = handle.join();
983        }
984        Ok(diagnostics)
985    }
986}
987
988impl ActiveListening for NativeListeningHandle {
989    fn stop(&mut self) -> AudioResult<CaptureDiagnostics> {
990        let (reply_tx, reply_rx) = mpsc::channel();
991        self.cmd_tx
992            .send(NativeListeningCommand::Stop(reply_tx))
993            .map_err(|_| AudioError::new("Native listening thread is no longer running"))?;
994        let diagnostics = reply_rx
995            .recv()
996            .map_err(|_| AudioError::new("No response from native listening thread"))??;
997        if let Some(handle) = self.join_handle.take() {
998            let _ = handle.join();
999        }
1000        Ok(diagnostics)
1001    }
1002
1003    fn update_config(&mut self, config: NativeCaptureConfig) -> AudioResult<()> {
1004        let (reply_tx, reply_rx) = mpsc::channel();
1005        self.cmd_tx
1006            .send(NativeListeningCommand::UpdateConfig(config, reply_tx))
1007            .map_err(|_| AudioError::new("Native listening thread is no longer running"))?;
1008        reply_rx
1009            .recv()
1010            .map_err(|_| AudioError::new("No response from native listening thread"))??;
1011        Ok(())
1012    }
1013}