xsynth_core/
buffered_renderer.rs1use std::{
2 collections::VecDeque,
3 io,
4 sync::{
5 atomic::{AtomicI64, AtomicUsize, Ordering},
6 Arc, RwLock,
7 },
8 thread::{self, JoinHandle},
9 time::{Duration, Instant},
10};
11
12use crossbeam_channel::{unbounded, Receiver};
13
14use crate::AudioStreamParams;
15
16use super::AudioPipe;
17
18#[derive(Debug, Clone)]
20struct BufferedRendererStats {
21 samples: Arc<AtomicI64>,
22
23 last_samples_after_read: Arc<AtomicI64>,
24
25 last_request_samples: Arc<AtomicI64>,
26
27 render_time: Arc<RwLock<VecDeque<f64>>>,
28
29 render_size: Arc<AtomicUsize>,
30}
31
32pub struct BufferedRendererStatsReader {
34 stats: BufferedRendererStats,
35}
36
37impl BufferedRendererStatsReader {
38 pub fn samples(&self) -> i64 {
41 self.stats.samples.load(Ordering::Relaxed)
42 }
43
44 pub fn last_samples_after_read(&self) -> i64 {
46 self.stats.last_samples_after_read.load(Ordering::Relaxed)
47 }
48
49 pub fn last_request_samples(&self) -> i64 {
51 self.stats.last_request_samples.load(Ordering::Relaxed)
52 }
53
54 pub fn render_size(&self) -> usize {
56 self.stats.render_size.load(Ordering::Relaxed)
57 }
58
59 pub fn average_renderer_load(&self) -> f64 {
62 let queue = self.stats.render_time.read().unwrap();
63 let total = queue.len();
64 if total == 0 {
65 0.0
66 } else {
67 queue.iter().sum::<f64>() / total as f64
68 }
69 }
70
71 pub fn last_renderer_load(&self) -> f64 {
74 let queue = self.stats.render_time.read().unwrap();
75 *queue.front().unwrap_or(&0.0)
76 }
77}
78
79pub struct BufferedRenderer {
87 stats: BufferedRendererStats,
88
89 receive: Receiver<Vec<f32>>,
91
92 remainder: Vec<f32>,
94
95 killed: Arc<RwLock<bool>>,
97
98 thread_handle: Option<JoinHandle<()>>,
100
101 stream_params: AudioStreamParams,
102}
103
104impl BufferedRenderer {
105 pub fn new<F: 'static + AudioPipe + Send>(
112 mut render: F,
113 stream_params: AudioStreamParams,
114 render_size: usize,
115 ) -> Result<Self, io::Error> {
116 let (tx, rx) = unbounded();
117
118 let samples = Arc::new(AtomicI64::new(0));
119 let last_request_samples = Arc::new(AtomicI64::new(0));
120 let render_size = Arc::new(AtomicUsize::new(render_size));
121
122 let last_samples_after_read = Arc::new(AtomicI64::new(0));
123
124 let render_time = Arc::new(RwLock::new(VecDeque::new()));
125
126 let killed = Arc::new(RwLock::new(false));
127
128 let thread_handle = {
129 let samples = samples.clone();
130 let last_request_samples = last_request_samples.clone();
131 let render_size = render_size.clone();
132 let render_time = render_time.clone();
133 let killed = killed.clone();
134 thread::Builder::new()
135 .name("xsynth_buffered_rendering".to_string())
136 .spawn(move || loop {
137 let size = render_size.load(Ordering::SeqCst);
138
139 let delay =
142 Duration::from_secs(1) * size as u32 / stream_params.sample_rate * 90 / 100;
143
144 loop {
146 let samples = samples.load(Ordering::SeqCst);
147 let last_requested = last_request_samples.load(Ordering::SeqCst);
148 if samples > last_requested * 110 / 100 {
149 spin_sleep::sleep(delay / 10);
150 } else {
151 break;
152 }
153
154 if *killed.read().unwrap() {
155 return;
156 }
157 }
158
159 let start = Instant::now();
160 let end = start + delay;
161
162 let mut vec =
164 vec![Default::default(); size * stream_params.channels.count() as usize];
165 render.read_samples(&mut vec);
166
167 samples.fetch_add(vec.len() as i64, Ordering::SeqCst);
169 match tx.send(vec) {
170 Ok(_) => {}
171 Err(_) => break,
172 };
173
174 {
176 let mut queue = render_time.write().unwrap();
177 let elaspsed = start.elapsed().as_secs_f64();
178 let total = delay.as_secs_f64();
179 queue.push_front(elaspsed / total);
180 if queue.len() > 100 {
181 queue.pop_back();
182 }
183 }
184
185 let now = Instant::now();
187 if end > now {
188 spin_sleep::sleep(end - now);
189 }
190 })?
191 };
192
193 Ok(Self {
194 stats: BufferedRendererStats {
195 samples,
196 last_request_samples,
197 render_time,
198 render_size,
199 last_samples_after_read,
200 },
201 receive: rx,
202 remainder: Vec::new(),
203 stream_params,
204 thread_handle: Some(thread_handle),
205 killed,
206 })
207 }
208
209 pub fn read(&mut self, dest: &mut [f32]) {
211 dest.fill(0.0);
212
213 let mut i: usize = 0;
214 let len = dest.len().min(self.remainder.len());
215 let samples = self
216 .stats
217 .samples
218 .fetch_sub(dest.len() as i64, Ordering::SeqCst);
219
220 self.stats
221 .last_request_samples
222 .store(dest.len() as i64, Ordering::SeqCst);
223
224 for r in self.remainder.drain(0..len) {
226 dest[i] = r;
227 i += 1;
228 }
229
230 while self.remainder.is_empty() {
232 let mut buf = self.receive.recv().unwrap();
233
234 let len = buf.len().min(dest.len() - i);
235 for r in buf.drain(0..len) {
236 dest[i] = r;
237 i += 1;
238 }
239
240 self.remainder = buf;
241 }
242
243 self.stats
244 .last_samples_after_read
245 .store(samples, Ordering::Relaxed);
246 }
247
248 pub fn set_render_size(&self, size: usize) {
250 self.stats.render_size.store(size, Ordering::SeqCst);
251 }
252
253 pub fn get_buffer_stats(&self) -> BufferedRendererStatsReader {
256 BufferedRendererStatsReader {
257 stats: self.stats.clone(),
258 }
259 }
260}
261
262impl Drop for BufferedRenderer {
263 fn drop(&mut self) {
264 *self.killed.write().unwrap() = true;
265 if let Some(handle) = self.thread_handle.take() {
266 if handle.join().is_err() {
267 eprintln!("xsynth-core: buffered renderer thread panicked during shutdown");
268 }
269 }
270 }
271}
272
273impl AudioPipe for BufferedRenderer {
274 fn stream_params(&self) -> &'_ AudioStreamParams {
275 &self.stream_params
276 }
277
278 fn read_samples_unchecked(&mut self, to: &mut [f32]) {
279 self.read(to)
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use std::{
286 collections::VecDeque,
287 sync::{
288 atomic::{AtomicI64, AtomicUsize},
289 Arc, RwLock,
290 },
291 };
292
293 use super::{BufferedRendererStats, BufferedRendererStatsReader};
294
295 #[test]
296 fn average_renderer_load_is_zero_when_no_samples_have_been_rendered() {
297 let reader = BufferedRendererStatsReader {
298 stats: BufferedRendererStats {
299 samples: Arc::new(AtomicI64::new(0)),
300 last_samples_after_read: Arc::new(AtomicI64::new(0)),
301 last_request_samples: Arc::new(AtomicI64::new(0)),
302 render_time: Arc::new(RwLock::new(VecDeque::new())),
303 render_size: Arc::new(AtomicUsize::new(0)),
304 },
305 };
306
307 assert_eq!(reader.average_renderer_load(), 0.0);
308 assert_eq!(reader.last_renderer_load(), 0.0);
309 }
310}