geng_rodio/source/
buffered.rs1use std::cmp;
2use std::mem;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use crate::{Sample, Source};
7
8#[inline]
10pub fn buffered<I>(input: I) -> Buffered<I>
11where
12 I: Source,
13 I::Item: Sample,
14{
15 let total_duration = input.total_duration();
16 let first_frame = extract(input);
17
18 Buffered {
19 current_frame: first_frame,
20 position_in_frame: 0,
21 total_duration,
22 }
23}
24
25pub struct Buffered<I>
27where
28 I: Source,
29 I::Item: Sample,
30{
31 current_frame: Arc<Frame<I>>,
33
34 position_in_frame: usize,
36
37 total_duration: Option<Duration>,
39}
40
41enum Frame<I>
42where
43 I: Source,
44 I::Item: Sample,
45{
46 Data(FrameData<I>),
49
50 End,
52
53 Input(Mutex<Option<I>>),
56}
57
58struct FrameData<I>
59where
60 I: Source,
61 I::Item: Sample,
62{
63 data: Vec<I::Item>,
64 channels: u16,
65 rate: u32,
66 next: Mutex<Arc<Frame<I>>>,
67}
68
69impl<I> Drop for FrameData<I>
70where
71 I: Source,
72 I::Item: Sample,
73{
74 fn drop(&mut self) {
75 while let Ok(arc_next) = self.next.get_mut() {
82 if let Some(next_ref) = Arc::get_mut(arc_next) {
83 let next = mem::replace(next_ref, Frame::End);
85 if let Frame::Data(next_data) = next {
86 *self = next_data;
89 } else {
90 break;
91 }
92 } else {
93 break;
94 }
95 }
96 }
97}
98
99fn extract<I>(mut input: I) -> Arc<Frame<I>>
101where
102 I: Source,
103 I::Item: Sample,
104{
105 let frame_len = input.current_frame_len();
106
107 if frame_len == Some(0) {
108 return Arc::new(Frame::End);
109 }
110
111 let channels = input.channels();
112 let rate = input.sample_rate();
113 let data: Vec<I::Item> = input
114 .by_ref()
115 .take(cmp::min(frame_len.unwrap_or(32768), 32768))
116 .collect();
117
118 if data.is_empty() {
119 return Arc::new(Frame::End);
120 }
121
122 Arc::new(Frame::Data(FrameData {
123 data,
124 channels,
125 rate,
126 next: Mutex::new(Arc::new(Frame::Input(Mutex::new(Some(input))))),
127 }))
128}
129
130impl<I> Buffered<I>
131where
132 I: Source,
133 I::Item: Sample,
134{
135 fn next_frame(&mut self) {
137 let next_frame = {
138 let mut next_frame_ptr = match &*self.current_frame {
139 Frame::Data(FrameData { next, .. }) => next.lock().unwrap(),
140 _ => unreachable!(),
141 };
142
143 let next_frame = match &**next_frame_ptr {
144 Frame::Data(_) => next_frame_ptr.clone(),
145 Frame::End => next_frame_ptr.clone(),
146 Frame::Input(input) => {
147 let input = input.lock().unwrap().take().unwrap();
148 extract(input)
149 }
150 };
151
152 *next_frame_ptr = next_frame.clone();
153 next_frame
154 };
155
156 self.current_frame = next_frame;
157 self.position_in_frame = 0;
158 }
159}
160
161impl<I> Iterator for Buffered<I>
162where
163 I: Source,
164 I::Item: Sample,
165{
166 type Item = I::Item;
167
168 #[inline]
169 fn next(&mut self) -> Option<I::Item> {
170 let current_sample;
171 let advance_frame;
172
173 match &*self.current_frame {
174 Frame::Data(FrameData { data, .. }) => {
175 current_sample = Some(data[self.position_in_frame]);
176 self.position_in_frame += 1;
177 advance_frame = self.position_in_frame >= data.len();
178 }
179
180 Frame::End => {
181 current_sample = None;
182 advance_frame = false;
183 }
184
185 Frame::Input(_) => unreachable!(),
186 };
187
188 if advance_frame {
189 self.next_frame();
190 }
191
192 current_sample
193 }
194
195 #[inline]
196 fn size_hint(&self) -> (usize, Option<usize>) {
197 (0, None)
199 }
200}
201
202impl<I> Source for Buffered<I>
207where
208 I: Source,
209 I::Item: Sample,
210{
211 #[inline]
212 fn current_frame_len(&self) -> Option<usize> {
213 match &*self.current_frame {
214 Frame::Data(FrameData { data, .. }) => Some(data.len() - self.position_in_frame),
215 Frame::End => Some(0),
216 Frame::Input(_) => unreachable!(),
217 }
218 }
219
220 #[inline]
221 fn channels(&self) -> u16 {
222 match *self.current_frame {
223 Frame::Data(FrameData { channels, .. }) => channels,
224 Frame::End => 1,
225 Frame::Input(_) => unreachable!(),
226 }
227 }
228
229 #[inline]
230 fn sample_rate(&self) -> u32 {
231 match *self.current_frame {
232 Frame::Data(FrameData { rate, .. }) => rate,
233 Frame::End => 44100,
234 Frame::Input(_) => unreachable!(),
235 }
236 }
237
238 #[inline]
239 fn total_duration(&self) -> Option<Duration> {
240 self.total_duration
241 }
242}
243
244impl<I> Clone for Buffered<I>
245where
246 I: Source,
247 I::Item: Sample,
248{
249 #[inline]
250 fn clone(&self) -> Buffered<I> {
251 Buffered {
252 current_frame: self.current_frame.clone(),
253 position_in_frame: self.position_in_frame,
254 total_duration: self.total_duration,
255 }
256 }
257}