xsynth_core/
buffered_renderer.rs

1use std::{
2    collections::VecDeque,
3    sync::{
4        atomic::{AtomicI64, AtomicUsize, Ordering},
5        Arc, RwLock,
6    },
7    thread::{self, JoinHandle},
8    time::{Duration, Instant},
9};
10
11use crossbeam_channel::{unbounded, Receiver};
12
13use crate::AudioStreamParams;
14
15use super::AudioPipe;
16
/// Shared counters and timing history describing a BufferedRenderer.
///
/// Every field lives behind an `Arc`, so cloning this struct produces another
/// handle to the same underlying values rather than an independent snapshot.
#[derive(Debug, Clone)]
struct BufferedRendererStats {
    /// Running balance of buffered samples: the render thread adds the length
    /// of each vec it sends, and every read subtracts the length it requested.
    samples: Arc<AtomicI64>,

    /// Value of `samples` recorded by the most recent read.
    last_samples_after_read: Arc<AtomicI64>,

    /// Length of the destination slice passed to the most recent read.
    last_request_samples: Arc<AtomicI64>,

    /// History of render load ratios (time spent / time allowed), newest entry
    /// at the front. The render thread trims it to at most 100 entries.
    render_time: Arc<RwLock<VecDeque<f64>>>,

    /// Number of samples the render thread renders per iteration.
    render_size: Arc<AtomicUsize>,
}
30
31/// Reads the statistics of an instance of BufferedRenderer in a usable way.
32pub struct BufferedRendererStatsReader {
33    stats: BufferedRendererStats,
34}
35
36impl BufferedRendererStatsReader {
37    /// The number of samples currently buffered.
38    /// Can be negative if the reader is waiting for more samples.
39    pub fn samples(&self) -> i64 {
40        self.stats.samples.load(Ordering::Relaxed)
41    }
42
43    /// The number of samples that were in the buffer after the last read.
44    pub fn last_samples_after_read(&self) -> i64 {
45        self.stats.last_samples_after_read.load(Ordering::Relaxed)
46    }
47
48    /// The last number of samples last requested by the read command.
49    pub fn last_request_samples(&self) -> i64 {
50        self.stats.last_request_samples.load(Ordering::Relaxed)
51    }
52
53    /// The number of samples to render each iteration.
54    pub fn render_size(&self) -> usize {
55        self.stats.render_size.load(Ordering::Relaxed)
56    }
57
58    /// The average render time percentages (0 to 1)
59    /// of how long the render thread spent rendering, from the max allowed time.
60    pub fn average_renderer_load(&self) -> f64 {
61        let queue = self.stats.render_time.read().unwrap();
62        let total = queue.len();
63        queue.iter().sum::<f64>() / total as f64
64    }
65
66    /// The last render time percentage (0 to 1)
67    /// of how long the render thread spent rendering, from the max allowed time.
68    pub fn last_renderer_load(&self) -> f64 {
69        let queue = self.stats.render_time.read().unwrap();
70        *queue.front().unwrap_or(&0.0)
71    }
72}
73
74/// The helper struct for deferred sample rendering.
75/// Helps avoid stutter when the render time is exceding the max time allowed by the audio driver.
76///
77/// Instead, it renders in a separate thread with much smaller sample sizes, causing a minimal impact on latency
78/// while allowing more time to render per sample.
79///
80/// Designed to be used in realtime playback only.
81pub struct BufferedRenderer {
82    stats: BufferedRendererStats,
83
84    /// The receiver for samples (the render thread has the sender).
85    receive: Receiver<Vec<f32>>,
86
87    /// Remainder of samples from the last received samples vec.
88    remainder: Vec<f32>,
89
90    /// Whether the render thread should be killed.
91    killed: Arc<RwLock<bool>>,
92
93    /// The thread handle to wait for at the end.
94    thread_handle: Option<JoinHandle<()>>,
95
96    stream_params: AudioStreamParams,
97}
98
99impl BufferedRenderer {
100    /// Creates a new instance of BufferedRenderer.
101    ///
102    /// - `render`: An object implementing the AudioPipe struct for BufferedRenderer to
103    ///   read samples from
104    /// - `stream_params`: Parameters of the output audio
105    /// - `render_size`: The number of samples to render each iteration
106    pub fn new<F: 'static + AudioPipe + Send>(
107        mut render: F,
108        stream_params: AudioStreamParams,
109        render_size: usize,
110    ) -> Self {
111        let (tx, rx) = unbounded();
112
113        let samples = Arc::new(AtomicI64::new(0));
114        let last_request_samples = Arc::new(AtomicI64::new(0));
115        let render_size = Arc::new(AtomicUsize::new(render_size));
116
117        let last_samples_after_read = Arc::new(AtomicI64::new(0));
118
119        let render_time = Arc::new(RwLock::new(VecDeque::new()));
120
121        let killed = Arc::new(RwLock::new(false));
122
123        let thread_handle = {
124            let samples = samples.clone();
125            let last_request_samples = last_request_samples.clone();
126            let render_size = render_size.clone();
127            let render_time = render_time.clone();
128            let killed = killed.clone();
129            thread::Builder::new()
130                .name("xsynth_buffered_rendering".to_string())
131                .spawn(move || loop {
132                    let size = render_size.load(Ordering::SeqCst);
133
134                    // The expected render time per iteration. It is slightly smaller (*90/100) than
135                    // the real time so the render thread can catch up if it's behind.
136                    let delay =
137                        Duration::from_secs(1) * size as u32 / stream_params.sample_rate * 90 / 100;
138
139                    // If the render thread is ahead by over ~10%, wait until more samples are required.
140                    loop {
141                        let samples = samples.load(Ordering::SeqCst);
142                        let last_requested = last_request_samples.load(Ordering::SeqCst);
143                        if samples > last_requested * 110 / 100 {
144                            spin_sleep::sleep(delay / 10);
145                        } else {
146                            break;
147                        }
148
149                        if *killed.read().unwrap() {
150                            return;
151                        }
152                    }
153
154                    let start = Instant::now();
155                    let end = start + delay;
156
157                    // Create the vec and write the samples
158                    let mut vec =
159                        vec![Default::default(); size * stream_params.channels.count() as usize];
160                    render.read_samples(&mut vec);
161
162                    // Send the samples, break if the pipe is broken
163                    samples.fetch_add(vec.len() as i64, Ordering::SeqCst);
164                    match tx.send(vec) {
165                        Ok(_) => {}
166                        Err(_) => break,
167                    };
168
169                    // Write the elapsed render time percentage to the render_time queue
170                    {
171                        let mut queue = render_time.write().unwrap();
172                        let elaspsed = start.elapsed().as_secs_f64();
173                        let total = delay.as_secs_f64();
174                        queue.push_front(elaspsed / total);
175                        if queue.len() > 100 {
176                            queue.pop_back();
177                        }
178                    }
179
180                    // Sleep until the next iteration
181                    let now = Instant::now();
182                    if end > now {
183                        spin_sleep::sleep(end - now);
184                    }
185                })
186                .unwrap()
187        };
188
189        Self {
190            stats: BufferedRendererStats {
191                samples,
192                last_request_samples,
193                render_time,
194                render_size,
195                last_samples_after_read,
196            },
197            receive: rx,
198            remainder: Vec::new(),
199            stream_params,
200            thread_handle: Some(thread_handle),
201            killed,
202        }
203    }
204
205    /// Reads samples from the remainder and the output queue into the destination array.
206    pub fn read(&mut self, dest: &mut [f32]) {
207        dest.fill(0.0);
208
209        let mut i: usize = 0;
210        let len = dest.len().min(self.remainder.len());
211        let samples = self
212            .stats
213            .samples
214            .fetch_sub(dest.len() as i64, Ordering::SeqCst);
215
216        self.stats
217            .last_request_samples
218            .store(dest.len() as i64, Ordering::SeqCst);
219
220        // Read from current remainder
221        for r in self.remainder.drain(0..len) {
222            dest[i] = r;
223            i += 1;
224        }
225
226        // Read from output queue, leave the remainder if there is any
227        while self.remainder.is_empty() {
228            let mut buf = self.receive.recv().unwrap();
229
230            let len = buf.len().min(dest.len() - i);
231            for r in buf.drain(0..len) {
232                dest[i] = r;
233                i += 1;
234            }
235
236            self.remainder = buf;
237        }
238
239        self.stats
240            .last_samples_after_read
241            .store(samples, Ordering::Relaxed);
242    }
243
244    /// Sets the number of samples that should be rendered each iteration.
245    pub fn set_render_size(&self, size: usize) {
246        self.stats.render_size.store(size, Ordering::SeqCst);
247    }
248
249    /// Returns a statistics reader.
250    /// See the `BufferedRendererStatsReader` documentation for more information.
251    pub fn get_buffer_stats(&self) -> BufferedRendererStatsReader {
252        BufferedRendererStatsReader {
253            stats: self.stats.clone(),
254        }
255    }
256}
257
258impl Drop for BufferedRenderer {
259    fn drop(&mut self) {
260        *self.killed.write().unwrap() = true;
261        self.thread_handle.take().unwrap().join().unwrap();
262    }
263}
264
265impl AudioPipe for BufferedRenderer {
266    fn stream_params(&self) -> &'_ AudioStreamParams {
267        &self.stream_params
268    }
269
270    fn read_samples_unchecked(&mut self, to: &mut [f32]) {
271        self.read(to)
272    }
273}