geng_rodio/source/
buffered.rs

1use std::cmp;
2use std::mem;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use crate::{Sample, Source};
7
8/// Internal function that builds a `Buffered` object.
9#[inline]
10pub fn buffered<I>(input: I) -> Buffered<I>
11where
12    I: Source,
13    I::Item: Sample,
14{
15    let total_duration = input.total_duration();
16    let first_frame = extract(input);
17
18    Buffered {
19        current_frame: first_frame,
20        position_in_frame: 0,
21        total_duration,
22    }
23}
24
25/// Iterator that at the same time extracts data from the iterator and stores it in a buffer.
26pub struct Buffered<I>
27where
28    I: Source,
29    I::Item: Sample,
30{
31    /// Immutable reference to the next frame of data. Cannot be `Frame::Input`.
32    current_frame: Arc<Frame<I>>,
33
34    /// The position in number of samples of this iterator inside `current_frame`.
35    position_in_frame: usize,
36
37    /// Obtained once at creation and never modified again.
38    total_duration: Option<Duration>,
39}
40
41enum Frame<I>
42where
43    I: Source,
44    I::Item: Sample,
45{
46    /// Data that has already been extracted from the iterator. Also contains a pointer to the
47    /// next frame.
48    Data(FrameData<I>),
49
50    /// No more data.
51    End,
52
53    /// Unextracted data. The `Option` should never be `None` and is only here for easier data
54    /// processing.
55    Input(Mutex<Option<I>>),
56}
57
58struct FrameData<I>
59where
60    I: Source,
61    I::Item: Sample,
62{
63    data: Vec<I::Item>,
64    channels: u16,
65    rate: u32,
66    next: Mutex<Arc<Frame<I>>>,
67}
68
69impl<I> Drop for FrameData<I>
70where
71    I: Source,
72    I::Item: Sample,
73{
74    fn drop(&mut self) {
75        // This is necessary to prevent stack overflows deallocating long chains of the mutually
76        // recursive `Frame` and `FrameData` types. This iteratively traverses as much of the
77        // chain as needs to be deallocated, and repeatedly "pops" the head off the list. This
78        // solves the problem, as when the time comes to actually deallocate the `FrameData`,
79        // the `next` field will contain a `Frame::End`, or an `Arc` with additional references,
80        // so the depth of recursive drops will be bounded.
81        while let Ok(arc_next) = self.next.get_mut() {
82            if let Some(next_ref) = Arc::get_mut(arc_next) {
83                // This allows us to own the next Frame.
84                let next = mem::replace(next_ref, Frame::End);
85                if let Frame::Data(next_data) = next {
86                    // Swap the current FrameData with the next one, allowing the current one
87                    // to go out of scope.
88                    *self = next_data;
89                } else {
90                    break;
91                }
92            } else {
93                break;
94            }
95        }
96    }
97}
98
99/// Builds a frame from the input iterator.
100fn extract<I>(mut input: I) -> Arc<Frame<I>>
101where
102    I: Source,
103    I::Item: Sample,
104{
105    let frame_len = input.current_frame_len();
106
107    if frame_len == Some(0) {
108        return Arc::new(Frame::End);
109    }
110
111    let channels = input.channels();
112    let rate = input.sample_rate();
113    let data: Vec<I::Item> = input
114        .by_ref()
115        .take(cmp::min(frame_len.unwrap_or(32768), 32768))
116        .collect();
117
118    if data.is_empty() {
119        return Arc::new(Frame::End);
120    }
121
122    Arc::new(Frame::Data(FrameData {
123        data,
124        channels,
125        rate,
126        next: Mutex::new(Arc::new(Frame::Input(Mutex::new(Some(input))))),
127    }))
128}
129
130impl<I> Buffered<I>
131where
132    I: Source,
133    I::Item: Sample,
134{
135    /// Advances to the next frame.
136    fn next_frame(&mut self) {
137        let next_frame = {
138            let mut next_frame_ptr = match &*self.current_frame {
139                Frame::Data(FrameData { next, .. }) => next.lock().unwrap(),
140                _ => unreachable!(),
141            };
142
143            let next_frame = match &**next_frame_ptr {
144                Frame::Data(_) => next_frame_ptr.clone(),
145                Frame::End => next_frame_ptr.clone(),
146                Frame::Input(input) => {
147                    let input = input.lock().unwrap().take().unwrap();
148                    extract(input)
149                }
150            };
151
152            *next_frame_ptr = next_frame.clone();
153            next_frame
154        };
155
156        self.current_frame = next_frame;
157        self.position_in_frame = 0;
158    }
159}
160
161impl<I> Iterator for Buffered<I>
162where
163    I: Source,
164    I::Item: Sample,
165{
166    type Item = I::Item;
167
168    #[inline]
169    fn next(&mut self) -> Option<I::Item> {
170        let current_sample;
171        let advance_frame;
172
173        match &*self.current_frame {
174            Frame::Data(FrameData { data, .. }) => {
175                current_sample = Some(data[self.position_in_frame]);
176                self.position_in_frame += 1;
177                advance_frame = self.position_in_frame >= data.len();
178            }
179
180            Frame::End => {
181                current_sample = None;
182                advance_frame = false;
183            }
184
185            Frame::Input(_) => unreachable!(),
186        };
187
188        if advance_frame {
189            self.next_frame();
190        }
191
192        current_sample
193    }
194
195    #[inline]
196    fn size_hint(&self) -> (usize, Option<usize>) {
197        // TODO:
198        (0, None)
199    }
200}
201
// TODO: uncomment when `size_hint` is fixed
/*impl<I> ExactSizeIterator for Buffered<I> where I: Source + ExactSizeIterator, I::Item: Sample {
}*/
205
206impl<I> Source for Buffered<I>
207where
208    I: Source,
209    I::Item: Sample,
210{
211    #[inline]
212    fn current_frame_len(&self) -> Option<usize> {
213        match &*self.current_frame {
214            Frame::Data(FrameData { data, .. }) => Some(data.len() - self.position_in_frame),
215            Frame::End => Some(0),
216            Frame::Input(_) => unreachable!(),
217        }
218    }
219
220    #[inline]
221    fn channels(&self) -> u16 {
222        match *self.current_frame {
223            Frame::Data(FrameData { channels, .. }) => channels,
224            Frame::End => 1,
225            Frame::Input(_) => unreachable!(),
226        }
227    }
228
229    #[inline]
230    fn sample_rate(&self) -> u32 {
231        match *self.current_frame {
232            Frame::Data(FrameData { rate, .. }) => rate,
233            Frame::End => 44100,
234            Frame::Input(_) => unreachable!(),
235        }
236    }
237
238    #[inline]
239    fn total_duration(&self) -> Option<Duration> {
240        self.total_duration
241    }
242}
243
244impl<I> Clone for Buffered<I>
245where
246    I: Source,
247    I::Item: Sample,
248{
249    #[inline]
250    fn clone(&self) -> Buffered<I> {
251        Buffered {
252            current_frame: self.current_frame.clone(),
253            position_in_frame: self.position_in_frame,
254            total_duration: self.total_duration,
255        }
256    }
257}