web_audio_api/media_streams/
mod.rs1use crate::{AudioBuffer, FallibleBuffer};
13use arc_swap::ArcSwap;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16
/// Lifecycle state of a [`MediaStreamTrack`], as reported by
/// [`MediaStreamTrack::ready_state`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MediaStreamTrackState {
    /// The track has not been flagged as ended.
    Live,
    /// The track has been flagged as ended; its iterators yield nothing more.
    Ended,
}
28
29#[derive(Clone)]
31pub struct MediaStreamTrack {
32 inner: Arc<MediaStreamTrackInner>,
33}
34
35impl std::fmt::Debug for MediaStreamTrack {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("MediaStreamTrack")
38 .field("ended", &self.inner.ended)
39 .finish_non_exhaustive()
40 }
41}
42
43struct MediaStreamTrackInner {
44 data: ArcSwap<FallibleBuffer>,
45 position: AtomicU64,
46 ended: AtomicBool,
47 provider: Mutex<Box<dyn Iterator<Item = FallibleBuffer> + Send + Sync + 'static>>,
48}
49
50impl MediaStreamTrack {
51 #[allow(clippy::should_implement_trait)]
52 pub fn from_iter<T: IntoIterator<Item = FallibleBuffer>>(iter: T) -> Self
53 where
54 <T as IntoIterator>::IntoIter: Send + Sync + 'static,
55 {
56 let initial = Ok(AudioBuffer::from(vec![vec![0.]], 48000.));
57 let inner = MediaStreamTrackInner {
58 data: ArcSwap::from_pointee(initial),
59 position: AtomicU64::new(0),
60 ended: AtomicBool::new(false),
61 provider: Mutex::new(Box::new(iter.into_iter())),
62 };
63 MediaStreamTrack {
64 inner: Arc::new(inner),
65 }
66 }
67
68 pub fn ready_state(&self) -> MediaStreamTrackState {
69 if self.inner.ended.load(Ordering::Relaxed) {
70 MediaStreamTrackState::Ended
71 } else {
72 MediaStreamTrackState::Live
73 }
74 }
75
76 pub fn iter(&self) -> impl Iterator<Item = FallibleBuffer> {
77 MediaStreamTrackIter {
78 track: Arc::clone(&self.inner),
79 position: 0,
80 }
81 }
82
83 #[allow(clippy::missing_panics_doc)]
84 pub fn close(&self) {
85 *self.inner.provider.lock().unwrap() = Box::new(std::iter::empty());
87 }
88}
89
90struct MediaStreamTrackIter {
91 track: Arc<MediaStreamTrackInner>,
92 position: u64,
93}
94
95impl Iterator for MediaStreamTrackIter {
96 type Item = FallibleBuffer;
97
98 fn next(&mut self) -> Option<Self::Item> {
99 if self.track.ended.load(Ordering::Relaxed) {
100 return None;
101 }
102
103 let mut stream_position = self.track.position.load(Ordering::Relaxed);
104 if stream_position == self.position {
105 match self.track.provider.lock().unwrap().next() {
106 Some(buf) => {
107 let _ = self.track.data.swap(Arc::new(buf));
108 }
109 None => {
110 self.track.ended.store(true, Ordering::Relaxed);
111 return None;
112 }
113 }
114 stream_position += 1;
115 self.track.position.fetch_add(1, Ordering::Relaxed);
116 }
117
118 self.position = stream_position;
119 Some(match &self.track.data.load().as_ref() {
120 Ok(buf) => Ok(buf.clone()),
121 Err(e) => Err(e.to_string().into()),
122 })
123 }
124}
125
126#[derive(Clone, Debug)]
131pub struct MediaStream {
132 tracks: Vec<MediaStreamTrack>,
133}
134
135impl MediaStream {
136 pub fn from_tracks(tracks: Vec<MediaStreamTrack>) -> Self {
137 Self { tracks }
138 }
139
140 pub fn get_tracks(&self) -> &[MediaStreamTrack] {
141 &self.tracks
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    use float_eq::assert_float_eq;

    /// Fixture: three mono, single-sample buffers holding 1., 2. and 3.
    fn numbered_buffers() -> Vec<FallibleBuffer> {
        vec![
            Ok(AudioBuffer::from(vec![vec![1.]], 48000.)),
            Ok(AudioBuffer::from(vec![vec![2.]], 48000.)),
            Ok(AudioBuffer::from(vec![vec![3.]], 48000.)),
        ]
    }

    /// Asserts that `item` is a successful buffer whose first channel is
    /// exactly `[expected]`.
    fn assert_yields(item: Option<FallibleBuffer>, expected: f32) {
        let buffer = item.unwrap().unwrap();
        assert_float_eq!(
            buffer.get_channel_data(0)[..],
            [expected][..],
            abs_all <= 0.
        );
    }

    #[test]
    fn test_lazy() {
        let track = MediaStreamTrack::from_iter(numbered_buffers());

        assert_eq!(track.ready_state(), MediaStreamTrackState::Live);

        let mut iter = track.iter();
        assert_yields(iter.next(), 1.);
        assert_yields(iter.next(), 2.);
        assert_yields(iter.next(), 3.);

        // Exhausted, and it stays exhausted.
        assert!(iter.next().is_none());
        assert!(iter.next().is_none());

        assert_eq!(track.ready_state(), MediaStreamTrackState::Ended);
    }

    #[test]
    fn test_lazy_multiple_consumers() {
        let track = MediaStreamTrack::from_iter(numbered_buffers());

        let mut iter1 = track.iter();
        let mut iter2 = track.iter();

        // Both consumers observe the first buffer.
        assert_yields(iter1.next(), 1.);
        assert_yields(iter2.next(), 1.);

        // The first consumer runs ahead on its own.
        assert_yields(iter1.next(), 2.);
        assert_yields(iter1.next(), 3.);

        // The second consumer catches up to the latest buffer, skipping 2.
        assert_yields(iter2.next(), 3.);

        assert!(iter1.next().is_none());
        assert!(iter2.next().is_none());
        assert_eq!(track.ready_state(), MediaStreamTrackState::Ended);
    }

    #[test]
    fn test_close() {
        let track = MediaStreamTrack::from_iter(numbered_buffers());
        let mut iter = track.iter();

        assert_yields(iter.next(), 1.);

        track.close();
        assert!(iter.next().is_none());
    }
}