xsynth_core/
buffered_renderer.rs1use std::{
2 collections::VecDeque,
3 sync::{
4 atomic::{AtomicI64, AtomicUsize, Ordering},
5 Arc, RwLock,
6 },
7 thread::{self, JoinHandle},
8 time::{Duration, Instant},
9};
10
11use crossbeam_channel::{unbounded, Receiver};
12
13use crate::AudioStreamParams;
14
15use super::AudioPipe;
16
17#[derive(Debug, Clone)]
19struct BufferedRendererStats {
20 samples: Arc<AtomicI64>,
21
22 last_samples_after_read: Arc<AtomicI64>,
23
24 last_request_samples: Arc<AtomicI64>,
25
26 render_time: Arc<RwLock<VecDeque<f64>>>,
27
28 render_size: Arc<AtomicUsize>,
29}
30
31pub struct BufferedRendererStatsReader {
33 stats: BufferedRendererStats,
34}
35
36impl BufferedRendererStatsReader {
37 pub fn samples(&self) -> i64 {
40 self.stats.samples.load(Ordering::Relaxed)
41 }
42
43 pub fn last_samples_after_read(&self) -> i64 {
45 self.stats.last_samples_after_read.load(Ordering::Relaxed)
46 }
47
48 pub fn last_request_samples(&self) -> i64 {
50 self.stats.last_request_samples.load(Ordering::Relaxed)
51 }
52
53 pub fn render_size(&self) -> usize {
55 self.stats.render_size.load(Ordering::Relaxed)
56 }
57
58 pub fn average_renderer_load(&self) -> f64 {
61 let queue = self.stats.render_time.read().unwrap();
62 let total = queue.len();
63 queue.iter().sum::<f64>() / total as f64
64 }
65
66 pub fn last_renderer_load(&self) -> f64 {
69 let queue = self.stats.render_time.read().unwrap();
70 *queue.front().unwrap_or(&0.0)
71 }
72}
73
74pub struct BufferedRenderer {
82 stats: BufferedRendererStats,
83
84 receive: Receiver<Vec<f32>>,
86
87 remainder: Vec<f32>,
89
90 killed: Arc<RwLock<bool>>,
92
93 thread_handle: Option<JoinHandle<()>>,
95
96 stream_params: AudioStreamParams,
97}
98
99impl BufferedRenderer {
100 pub fn new<F: 'static + AudioPipe + Send>(
107 mut render: F,
108 stream_params: AudioStreamParams,
109 render_size: usize,
110 ) -> Self {
111 let (tx, rx) = unbounded();
112
113 let samples = Arc::new(AtomicI64::new(0));
114 let last_request_samples = Arc::new(AtomicI64::new(0));
115 let render_size = Arc::new(AtomicUsize::new(render_size));
116
117 let last_samples_after_read = Arc::new(AtomicI64::new(0));
118
119 let render_time = Arc::new(RwLock::new(VecDeque::new()));
120
121 let killed = Arc::new(RwLock::new(false));
122
123 let thread_handle = {
124 let samples = samples.clone();
125 let last_request_samples = last_request_samples.clone();
126 let render_size = render_size.clone();
127 let render_time = render_time.clone();
128 let killed = killed.clone();
129 thread::Builder::new()
130 .name("xsynth_buffered_rendering".to_string())
131 .spawn(move || loop {
132 let size = render_size.load(Ordering::SeqCst);
133
134 let delay =
137 Duration::from_secs(1) * size as u32 / stream_params.sample_rate * 90 / 100;
138
139 loop {
141 let samples = samples.load(Ordering::SeqCst);
142 let last_requested = last_request_samples.load(Ordering::SeqCst);
143 if samples > last_requested * 110 / 100 {
144 spin_sleep::sleep(delay / 10);
145 } else {
146 break;
147 }
148
149 if *killed.read().unwrap() {
150 return;
151 }
152 }
153
154 let start = Instant::now();
155 let end = start + delay;
156
157 let mut vec =
159 vec![Default::default(); size * stream_params.channels.count() as usize];
160 render.read_samples(&mut vec);
161
162 samples.fetch_add(vec.len() as i64, Ordering::SeqCst);
164 match tx.send(vec) {
165 Ok(_) => {}
166 Err(_) => break,
167 };
168
169 {
171 let mut queue = render_time.write().unwrap();
172 let elaspsed = start.elapsed().as_secs_f64();
173 let total = delay.as_secs_f64();
174 queue.push_front(elaspsed / total);
175 if queue.len() > 100 {
176 queue.pop_back();
177 }
178 }
179
180 let now = Instant::now();
182 if end > now {
183 spin_sleep::sleep(end - now);
184 }
185 })
186 .unwrap()
187 };
188
189 Self {
190 stats: BufferedRendererStats {
191 samples,
192 last_request_samples,
193 render_time,
194 render_size,
195 last_samples_after_read,
196 },
197 receive: rx,
198 remainder: Vec::new(),
199 stream_params,
200 thread_handle: Some(thread_handle),
201 killed,
202 }
203 }
204
205 pub fn read(&mut self, dest: &mut [f32]) {
207 dest.fill(0.0);
208
209 let mut i: usize = 0;
210 let len = dest.len().min(self.remainder.len());
211 let samples = self
212 .stats
213 .samples
214 .fetch_sub(dest.len() as i64, Ordering::SeqCst);
215
216 self.stats
217 .last_request_samples
218 .store(dest.len() as i64, Ordering::SeqCst);
219
220 for r in self.remainder.drain(0..len) {
222 dest[i] = r;
223 i += 1;
224 }
225
226 while self.remainder.is_empty() {
228 let mut buf = self.receive.recv().unwrap();
229
230 let len = buf.len().min(dest.len() - i);
231 for r in buf.drain(0..len) {
232 dest[i] = r;
233 i += 1;
234 }
235
236 self.remainder = buf;
237 }
238
239 self.stats
240 .last_samples_after_read
241 .store(samples, Ordering::Relaxed);
242 }
243
244 pub fn set_render_size(&self, size: usize) {
246 self.stats.render_size.store(size, Ordering::SeqCst);
247 }
248
249 pub fn get_buffer_stats(&self) -> BufferedRendererStatsReader {
252 BufferedRendererStatsReader {
253 stats: self.stats.clone(),
254 }
255 }
256}
257
258impl Drop for BufferedRenderer {
259 fn drop(&mut self) {
260 *self.killed.write().unwrap() = true;
261 self.thread_handle.take().unwrap().join().unwrap();
262 }
263}
264
265impl AudioPipe for BufferedRenderer {
266 fn stream_params(&self) -> &'_ AudioStreamParams {
267 &self.stream_params
268 }
269
270 fn read_samples_unchecked(&mut self, to: &mut [f32]) {
271 self.read(to)
272 }
273}