auxide_io/
stream_controller.rs1use crate::buffer_size_adapter::{BufferSizeAdapter, MAX_HOST_FRAMES};
2use crate::device_management::default_output_device;
3use crate::device_management::DeviceExt;
4use crate::error_recovery::handle_process_error;
5use crate::stream_state::{AtomicStreamState, StreamState};
6use anyhow::Result;
7use auxide::rt::Runtime;
8use cpal::traits::{DeviceTrait, StreamTrait};
9use cpal::{SampleFormat, Stream};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13pub struct StreamController {
14 stream: Option<Stream>,
15 state: Arc<AtomicStreamState>,
16 error_flag: Arc<AtomicBool>,
17}
18
19impl StreamController {
20 pub fn get_best_sample_rate(requested_rate: f32) -> Result<f32> {
22 let device = default_output_device()?;
23 let requested_sample_rate = requested_rate as u32;
24
25 let supported_configs: Vec<_> = device.supported_configs()?.into_iter().collect();
26
27 if let Some(config) = supported_configs
29 .iter()
30 .find(|c| {
31 c.sample_rate().0 == requested_sample_rate
32 && c.channels() == 2
33 && c.sample_format() == SampleFormat::F32
34 }) {
35 return Ok(config.sample_rate().0 as f32);
36 }
37
38 if let Some(config) = supported_configs
40 .iter()
41 .filter(|c| c.channels() == 2 && c.sample_format() == SampleFormat::F32)
42 .min_by_key(|c| {
43 let rate = c.sample_rate().0;
44 rate.abs_diff(requested_sample_rate)
45 }) {
46 return Ok(config.sample_rate().0 as f32);
47 }
48
49 if let Some(config) = supported_configs
51 .iter()
52 .find(|c| c.sample_format() == SampleFormat::F32) {
53 return Ok(config.sample_rate().0 as f32);
54 }
55
56 Err(anyhow::anyhow!("No suitable audio configuration found"))
57 }
58
59 pub fn play(mut runtime: Runtime) -> Result<Self> {
60 AtomicStreamState::verify_lock_free_atomics()?;
61 let device = default_output_device()?;
62 let sample_rate = runtime.sample_rate() as u32;
63
64 let config = device
66 .supported_configs()?
67 .into_iter()
68 .find(|c| {
69 c.sample_rate().0 == sample_rate
70 && c.channels() == 2
71 && c.sample_format() == SampleFormat::F32
72 })
73 .ok_or_else(|| anyhow::anyhow!("No suitable config for {}Hz", sample_rate))?;
74
75 let sample_format = config.sample_format();
76 let config = config.config();
77
78 let state = Arc::new(AtomicStreamState::new(StreamState::Stopped));
79 let error_flag = Arc::new(AtomicBool::new(false));
80 let state_clone = state.clone();
81 let error_flag_clone = error_flag.clone();
82 let error_flag_clone2 = error_flag.clone();
83 let mut adapter = BufferSizeAdapter::new(runtime.plan.block_size);
84
85 let stream = match sample_format {
86 SampleFormat::F32 => device.build_output_stream(
87 &config,
88 move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
89 if data.len() > MAX_HOST_FRAMES {
90 handle_process_error(data);
91 return;
92 }
93 match state_clone.get_state() {
94 StreamState::Running => {
95 if adapter.fill_host_buffer(data, &mut runtime, 2).is_err() {
96 error_flag_clone.store(true, Ordering::Relaxed);
97 handle_process_error(data);
98 }
99 }
100 _ => {
101 data.fill(0.0);
102 }
103 }
104 },
105 move |_| {
106 error_flag_clone2.store(true, Ordering::Relaxed);
107 },
108 None,
109 )?,
110 _ => return Err(anyhow::anyhow!("Unsupported sample format")),
111 };
112
113 Ok(Self {
114 stream: Some(stream),
115 state,
116 error_flag,
117 })
118 }
119
120 pub fn start(&self) -> Result<()> {
121 if let Some(stream) = &self.stream {
122 stream.play()?;
123 self.state.set_state(StreamState::Running);
124 }
125 Ok(())
126 }
127
128 pub fn stop(&self) {
129 if let Some(stream) = &self.stream {
130 let _ = stream.pause();
131 }
132 self.state.set_state(StreamState::Stopped);
133 }
134
135 pub fn has_error(&self) -> bool {
136 self.error_flag.load(Ordering::Relaxed)
137 }
138
139 pub fn clear_error(&self) {
140 self.error_flag.store(false, Ordering::Relaxed);
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147 use auxide::graph::{Graph, NodeType, PortId, Rate};
148 use auxide::plan::Plan;
149
150 #[test]
151 fn test_error_flag() {
152 let _state = Arc::new(AtomicStreamState::new(StreamState::Stopped));
154 let error_flag = Arc::new(AtomicBool::new(false));
155
156 assert!(!error_flag.load(Ordering::Relaxed));
158 error_flag.store(true, Ordering::Relaxed);
159 assert!(error_flag.load(Ordering::Relaxed));
160 error_flag.store(false, Ordering::Relaxed);
161 assert!(!error_flag.load(Ordering::Relaxed));
162 }
163
164 #[test]
165 fn test_play_without_device() {
166 let mut graph = Graph::new();
168 let osc = graph.add_node(NodeType::SineOsc { freq: 440.0 });
169 let sink = graph.add_node(NodeType::OutputSink);
170 graph
171 .add_edge(auxide::graph::Edge {
172 from_node: osc,
173 from_port: PortId(0),
174 to_node: sink,
175 to_port: PortId(0),
176 rate: Rate::Audio,
177 })
178 .unwrap();
179 let plan = Plan::compile(&graph, 64).unwrap();
180 let runtime = Runtime::new(plan, &graph, 44100.0);
181
182 let result = StreamController::play(runtime);
184 assert!(result.is_err());
185 }
186
187 #[test]
188 fn test_controller_methods_with_no_stream() {
189 let controller = StreamController {
191 stream: None,
192 state: Arc::new(AtomicStreamState::new(StreamState::Stopped)),
193 error_flag: Arc::new(AtomicBool::new(false)),
194 };
195
196 assert_eq!(controller.state.get_state(), StreamState::Stopped);
198 let _ = controller.start();
199 assert_eq!(controller.state.get_state(), StreamState::Stopped);
200
201 controller.state.set_state(StreamState::Running);
203 controller.stop();
204 assert_eq!(controller.state.get_state(), StreamState::Stopped);
205
206 assert!(!controller.has_error());
208 controller.error_flag.store(true, Ordering::Relaxed);
209 assert!(controller.has_error());
210 controller.clear_error();
211 assert!(!controller.has_error());
212 }
213
214 #[test]
215 fn test_contract_stream_controller() {
216 let mut adapter = BufferSizeAdapter::new(64);
218 assert!(adapter.adapt_to_host_buffer(1024).is_ok());
220 assert!(adapter.adapt_to_host_buffer(MAX_HOST_FRAMES + 1).is_err());
222 }
223}