auxide_io/
stream_controller.rs

1use crate::buffer_size_adapter::{BufferSizeAdapter, MAX_HOST_FRAMES};
2use crate::device_management::default_output_device;
3use crate::device_management::DeviceExt;
4use crate::error_recovery::handle_process_error;
5use crate::stream_state::{AtomicStreamState, StreamState};
6use anyhow::Result;
7use auxide::rt::Runtime;
8use cpal::traits::{DeviceTrait, StreamTrait};
9use cpal::{SampleFormat, Stream};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, Ordering};
12
/// Handle to an audio output stream plus the flags shared with its audio callback.
///
/// Instances are normally created by `StreamController::play`; `start` and `stop`
/// toggle playback, and `has_error` / `clear_error` expose the error flag.
pub struct StreamController {
    /// The output stream, if one was built. `start` and `stop` skip the stream
    /// calls when this is `None`.
    stream: Option<Stream>,
    /// Run state shared with the audio callback, which holds its own clone of this `Arc`.
    state: Arc<AtomicStreamState>,
    /// Raised when the audio callback fails to fill a buffer or when the stream's
    /// error callback fires; read by `has_error` and reset by `clear_error`.
    error_flag: Arc<AtomicBool>,
}
18
19impl StreamController {
20    /// Get the best available sample rate for audio output
21    pub fn get_best_sample_rate(requested_rate: f32) -> Result<f32> {
22        let device = default_output_device()?;
23        let requested_sample_rate = requested_rate as u32;
24        
25        let supported_configs: Vec<_> = device.supported_configs()?.into_iter().collect();
26        
27        // First try to find exact match
28        if let Some(config) = supported_configs
29            .iter()
30            .find(|c| {
31                c.sample_rate().0 == requested_sample_rate
32                    && c.channels() == 2
33                    && c.sample_format() == SampleFormat::F32
34            }) {
35            return Ok(config.sample_rate().0 as f32);
36        }
37        
38        // Find best alternative
39        if let Some(config) = supported_configs
40            .iter()
41            .filter(|c| c.channels() == 2 && c.sample_format() == SampleFormat::F32)
42            .min_by_key(|c| {
43                let rate = c.sample_rate().0;
44                rate.abs_diff(requested_sample_rate)
45            }) {
46            return Ok(config.sample_rate().0 as f32);
47        }
48        
49        // Fallback to any F32 config
50        if let Some(config) = supported_configs
51            .iter()
52            .find(|c| c.sample_format() == SampleFormat::F32) {
53            return Ok(config.sample_rate().0 as f32);
54        }
55        
56        Err(anyhow::anyhow!("No suitable audio configuration found"))
57    }
58
59    pub fn play(mut runtime: Runtime) -> Result<Self> {
60        AtomicStreamState::verify_lock_free_atomics()?;
61        let device = default_output_device()?;
62        let sample_rate = runtime.sample_rate() as u32;
63        
64        // Find a supported configuration that matches our runtime's sample rate
65        let config = device
66            .supported_configs()?
67            .into_iter()
68            .find(|c| {
69                c.sample_rate().0 == sample_rate
70                    && c.channels() == 2
71                    && c.sample_format() == SampleFormat::F32
72            })
73            .ok_or_else(|| anyhow::anyhow!("No suitable config for {}Hz", sample_rate))?;
74            
75        let sample_format = config.sample_format();
76        let config = config.config();
77
78        let state = Arc::new(AtomicStreamState::new(StreamState::Stopped));
79        let error_flag = Arc::new(AtomicBool::new(false));
80        let state_clone = state.clone();
81        let error_flag_clone = error_flag.clone();
82        let error_flag_clone2 = error_flag.clone();
83        let mut adapter = BufferSizeAdapter::new(runtime.plan.block_size);
84
85        let stream = match sample_format {
86            SampleFormat::F32 => device.build_output_stream(
87                &config,
88                move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
89                    if data.len() > MAX_HOST_FRAMES {
90                        handle_process_error(data);
91                        return;
92                    }
93                    match state_clone.get_state() {
94                        StreamState::Running => {
95                            if adapter.fill_host_buffer(data, &mut runtime, 2).is_err() {
96                                error_flag_clone.store(true, Ordering::Relaxed);
97                                handle_process_error(data);
98                            }
99                        }
100                        _ => {
101                            data.fill(0.0);
102                        }
103                    }
104                },
105                move |_| {
106                    error_flag_clone2.store(true, Ordering::Relaxed);
107                },
108                None,
109            )?,
110            _ => return Err(anyhow::anyhow!("Unsupported sample format")),
111        };
112
113        Ok(Self {
114            stream: Some(stream),
115            state,
116            error_flag,
117        })
118    }
119
120    pub fn start(&self) -> Result<()> {
121        if let Some(stream) = &self.stream {
122            stream.play()?;
123            self.state.set_state(StreamState::Running);
124        }
125        Ok(())
126    }
127
128    pub fn stop(&self) {
129        if let Some(stream) = &self.stream {
130            let _ = stream.pause();
131        }
132        self.state.set_state(StreamState::Stopped);
133    }
134
135    pub fn has_error(&self) -> bool {
136        self.error_flag.load(Ordering::Relaxed)
137    }
138
139    pub fn clear_error(&self) {
140        self.error_flag.store(false, Ordering::Relaxed);
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use auxide::graph::{Edge, Graph, NodeType, PortId, Rate};
    use auxide::plan::Plan;

    /// Exercises the set / clear cycle of the shared error flag without building a stream.
    #[test]
    fn test_error_flag() {
        let error_flag = Arc::new(AtomicBool::new(false));

        assert!(!error_flag.load(Ordering::Relaxed));
        error_flag.store(true, Ordering::Relaxed);
        assert!(error_flag.load(Ordering::Relaxed));
        error_flag.store(false, Ordering::Relaxed);
        assert!(!error_flag.load(Ordering::Relaxed));
    }

    /// `play` must never panic: it either fails gracefully (no usable audio device, as on
    /// most CI machines) or returns a controller that has not started rendering yet.
    #[test]
    fn test_play_without_device() {
        let mut graph = Graph::new();
        let osc = graph.add_node(NodeType::SineOsc { freq: 440.0 });
        let sink = graph.add_node(NodeType::OutputSink);
        graph
            .add_edge(Edge {
                from_node: osc,
                from_port: PortId(0),
                to_node: sink,
                to_port: PortId(0),
                rate: Rate::Audio,
            })
            .unwrap();
        let plan = Plan::compile(&graph, 64).unwrap();
        let runtime = Runtime::new(plan, &graph, 44100.0);

        // The outcome depends on the host: asserting `is_err()` unconditionally would make
        // this test fail on any machine with a working stereo f32 output at 44.1 kHz.
        match StreamController::play(runtime) {
            // Headless environment: a graceful error is the expected result.
            Err(_) => {}
            // Device present: the controller must be created in the stopped state.
            Ok(controller) => {
                assert_eq!(controller.state.get_state(), StreamState::Stopped);
                controller.stop();
            }
        }
    }

    /// Checks every public method on a controller that has no underlying stream.
    #[test]
    fn test_controller_methods_with_no_stream() {
        let controller = StreamController {
            stream: None,
            state: Arc::new(AtomicStreamState::new(StreamState::Stopped)),
            error_flag: Arc::new(AtomicBool::new(false)),
        };

        // Without a stream, `start` succeeds but must leave the state untouched.
        assert_eq!(controller.state.get_state(), StreamState::Stopped);
        assert!(controller.start().is_ok());
        assert_eq!(controller.state.get_state(), StreamState::Stopped);

        // `stop` always forces the state back to Stopped.
        controller.state.set_state(StreamState::Running);
        controller.stop();
        assert_eq!(controller.state.get_state(), StreamState::Stopped);

        // The error flag round-trips through the public accessors.
        assert!(!controller.has_error());
        controller.error_flag.store(true, Ordering::Relaxed);
        assert!(controller.has_error());
        controller.clear_error();
        assert!(!controller.has_error());
    }

    /// Contract test: the buffer adapter accepts a normal host buffer size and rejects one
    /// that exceeds `MAX_HOST_FRAMES`.
    #[test]
    fn test_contract_stream_controller() {
        let mut adapter = BufferSizeAdapter::new(64);
        assert!(adapter.adapt_to_host_buffer(1024).is_ok());
        assert!(adapter.adapt_to_host_buffer(MAX_HOST_FRAMES + 1).is_err());
    }
}
223}