Skip to main content

xsynth_core/
buffered_renderer.rs

1use std::{
2    collections::VecDeque,
3    io,
4    sync::{
5        atomic::{AtomicI64, AtomicUsize, Ordering},
6        Arc, RwLock,
7    },
8    thread::{self, JoinHandle},
9    time::{Duration, Instant},
10};
11
12use crossbeam_channel::{unbounded, Receiver};
13
14use crate::AudioStreamParams;
15
16use super::AudioPipe;
17
18/// Holds the statistics for an instance of BufferedRenderer.
19#[derive(Debug, Clone)]
20struct BufferedRendererStats {
21    samples: Arc<AtomicI64>,
22
23    last_samples_after_read: Arc<AtomicI64>,
24
25    last_request_samples: Arc<AtomicI64>,
26
27    render_time: Arc<RwLock<VecDeque<f64>>>,
28
29    render_size: Arc<AtomicUsize>,
30}
31
32/// Reads the statistics of an instance of BufferedRenderer in a usable way.
33pub struct BufferedRendererStatsReader {
34    stats: BufferedRendererStats,
35}
36
37impl BufferedRendererStatsReader {
38    /// The number of samples currently buffered.
39    /// Can be negative if the reader is waiting for more samples.
40    pub fn samples(&self) -> i64 {
41        self.stats.samples.load(Ordering::Relaxed)
42    }
43
44    /// The number of samples that were in the buffer after the last read.
45    pub fn last_samples_after_read(&self) -> i64 {
46        self.stats.last_samples_after_read.load(Ordering::Relaxed)
47    }
48
49    /// The last number of samples last requested by the read command.
50    pub fn last_request_samples(&self) -> i64 {
51        self.stats.last_request_samples.load(Ordering::Relaxed)
52    }
53
54    /// The number of samples to render each iteration.
55    pub fn render_size(&self) -> usize {
56        self.stats.render_size.load(Ordering::Relaxed)
57    }
58
59    /// The average render time percentages (0 to 1)
60    /// of how long the render thread spent rendering, from the max allowed time.
61    pub fn average_renderer_load(&self) -> f64 {
62        let queue = self.stats.render_time.read().unwrap();
63        let total = queue.len();
64        if total == 0 {
65            0.0
66        } else {
67            queue.iter().sum::<f64>() / total as f64
68        }
69    }
70
71    /// The last render time percentage (0 to 1)
72    /// of how long the render thread spent rendering, from the max allowed time.
73    pub fn last_renderer_load(&self) -> f64 {
74        let queue = self.stats.render_time.read().unwrap();
75        *queue.front().unwrap_or(&0.0)
76    }
77}
78
79/// The helper struct for deferred sample rendering.
80/// Helps avoid stutter when the render time is exceding the max time allowed by the audio driver.
81///
82/// Instead, it renders in a separate thread with much smaller sample sizes, causing a minimal impact on latency
83/// while allowing more time to render per sample.
84///
85/// Designed to be used in realtime playback only.
86pub struct BufferedRenderer {
87    stats: BufferedRendererStats,
88
89    /// The receiver for samples (the render thread has the sender).
90    receive: Receiver<Vec<f32>>,
91
92    /// Remainder of samples from the last received samples vec.
93    remainder: Vec<f32>,
94
95    /// Whether the render thread should be killed.
96    killed: Arc<RwLock<bool>>,
97
98    /// The thread handle to wait for at the end.
99    thread_handle: Option<JoinHandle<()>>,
100
101    stream_params: AudioStreamParams,
102}
103
104impl BufferedRenderer {
105    /// Creates a new instance of BufferedRenderer.
106    ///
107    /// - `render`: An object implementing the AudioPipe struct for BufferedRenderer to
108    ///   read samples from
109    /// - `stream_params`: Parameters of the output audio
110    /// - `render_size`: The number of samples to render each iteration
111    pub fn new<F: 'static + AudioPipe + Send>(
112        mut render: F,
113        stream_params: AudioStreamParams,
114        render_size: usize,
115    ) -> Result<Self, io::Error> {
116        let (tx, rx) = unbounded();
117
118        let samples = Arc::new(AtomicI64::new(0));
119        let last_request_samples = Arc::new(AtomicI64::new(0));
120        let render_size = Arc::new(AtomicUsize::new(render_size));
121
122        let last_samples_after_read = Arc::new(AtomicI64::new(0));
123
124        let render_time = Arc::new(RwLock::new(VecDeque::new()));
125
126        let killed = Arc::new(RwLock::new(false));
127
128        let thread_handle = {
129            let samples = samples.clone();
130            let last_request_samples = last_request_samples.clone();
131            let render_size = render_size.clone();
132            let render_time = render_time.clone();
133            let killed = killed.clone();
134            thread::Builder::new()
135                .name("xsynth_buffered_rendering".to_string())
136                .spawn(move || loop {
137                    let size = render_size.load(Ordering::SeqCst);
138
139                    // The expected render time per iteration. It is slightly smaller (*90/100) than
140                    // the real time so the render thread can catch up if it's behind.
141                    let delay =
142                        Duration::from_secs(1) * size as u32 / stream_params.sample_rate * 90 / 100;
143
144                    // If the render thread is ahead by over ~10%, wait until more samples are required.
145                    loop {
146                        let samples = samples.load(Ordering::SeqCst);
147                        let last_requested = last_request_samples.load(Ordering::SeqCst);
148                        if samples > last_requested * 110 / 100 {
149                            spin_sleep::sleep(delay / 10);
150                        } else {
151                            break;
152                        }
153
154                        if *killed.read().unwrap() {
155                            return;
156                        }
157                    }
158
159                    let start = Instant::now();
160                    let end = start + delay;
161
162                    // Create the vec and write the samples
163                    let mut vec =
164                        vec![Default::default(); size * stream_params.channels.count() as usize];
165                    render.read_samples(&mut vec);
166
167                    // Send the samples, break if the pipe is broken
168                    samples.fetch_add(vec.len() as i64, Ordering::SeqCst);
169                    match tx.send(vec) {
170                        Ok(_) => {}
171                        Err(_) => break,
172                    };
173
174                    // Write the elapsed render time percentage to the render_time queue
175                    {
176                        let mut queue = render_time.write().unwrap();
177                        let elaspsed = start.elapsed().as_secs_f64();
178                        let total = delay.as_secs_f64();
179                        queue.push_front(elaspsed / total);
180                        if queue.len() > 100 {
181                            queue.pop_back();
182                        }
183                    }
184
185                    // Sleep until the next iteration
186                    let now = Instant::now();
187                    if end > now {
188                        spin_sleep::sleep(end - now);
189                    }
190                })?
191        };
192
193        Ok(Self {
194            stats: BufferedRendererStats {
195                samples,
196                last_request_samples,
197                render_time,
198                render_size,
199                last_samples_after_read,
200            },
201            receive: rx,
202            remainder: Vec::new(),
203            stream_params,
204            thread_handle: Some(thread_handle),
205            killed,
206        })
207    }
208
209    /// Reads samples from the remainder and the output queue into the destination array.
210    pub fn read(&mut self, dest: &mut [f32]) {
211        dest.fill(0.0);
212
213        let mut i: usize = 0;
214        let len = dest.len().min(self.remainder.len());
215        let samples = self
216            .stats
217            .samples
218            .fetch_sub(dest.len() as i64, Ordering::SeqCst);
219
220        self.stats
221            .last_request_samples
222            .store(dest.len() as i64, Ordering::SeqCst);
223
224        // Read from current remainder
225        for r in self.remainder.drain(0..len) {
226            dest[i] = r;
227            i += 1;
228        }
229
230        // Read from output queue, leave the remainder if there is any
231        while self.remainder.is_empty() {
232            let mut buf = self.receive.recv().unwrap();
233
234            let len = buf.len().min(dest.len() - i);
235            for r in buf.drain(0..len) {
236                dest[i] = r;
237                i += 1;
238            }
239
240            self.remainder = buf;
241        }
242
243        self.stats
244            .last_samples_after_read
245            .store(samples, Ordering::Relaxed);
246    }
247
248    /// Sets the number of samples that should be rendered each iteration.
249    pub fn set_render_size(&self, size: usize) {
250        self.stats.render_size.store(size, Ordering::SeqCst);
251    }
252
253    /// Returns a statistics reader.
254    /// See the `BufferedRendererStatsReader` documentation for more information.
255    pub fn get_buffer_stats(&self) -> BufferedRendererStatsReader {
256        BufferedRendererStatsReader {
257            stats: self.stats.clone(),
258        }
259    }
260}
261
262impl Drop for BufferedRenderer {
263    fn drop(&mut self) {
264        *self.killed.write().unwrap() = true;
265        if let Some(handle) = self.thread_handle.take() {
266            if handle.join().is_err() {
267                eprintln!("xsynth-core: buffered renderer thread panicked during shutdown");
268            }
269        }
270    }
271}
272
273impl AudioPipe for BufferedRenderer {
274    fn stream_params(&self) -> &'_ AudioStreamParams {
275        &self.stream_params
276    }
277
278    fn read_samples_unchecked(&mut self, to: &mut [f32]) {
279        self.read(to)
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use std::{
286        collections::VecDeque,
287        sync::{
288            atomic::{AtomicI64, AtomicUsize},
289            Arc, RwLock,
290        },
291    };
292
293    use super::{BufferedRendererStats, BufferedRendererStatsReader};
294
295    #[test]
296    fn average_renderer_load_is_zero_when_no_samples_have_been_rendered() {
297        let reader = BufferedRendererStatsReader {
298            stats: BufferedRendererStats {
299                samples: Arc::new(AtomicI64::new(0)),
300                last_samples_after_read: Arc::new(AtomicI64::new(0)),
301                last_request_samples: Arc::new(AtomicI64::new(0)),
302                render_time: Arc::new(RwLock::new(VecDeque::new())),
303                render_size: Arc::new(AtomicUsize::new(0)),
304            },
305        };
306
307        assert_eq!(reader.average_renderer_load(), 0.0);
308        assert_eq!(reader.last_renderer_load(), 0.0);
309    }
310}