web_audio_api/media_streams/
mod.rs

1//! Primitives of the Media Capture and Streams API
2//!
3//! The Media Capture and Streams API, often called the Media Streams API or MediaStream API, is an
4//! API related to WebRTC which provides support for streaming audio and video data.
5//!
6//! It provides the interfaces and methods for working with the streams and their constituent
7//! tracks, the constraints associated with data formats, the success and error callbacks when
8//! using the data asynchronously, and the events that are fired during the process.
9//!
10//! <https://developer.mozilla.org/en-US/docs/Web/API/Media_Capture_and_Streams_API>
11
12use crate::{AudioBuffer, FallibleBuffer};
13use arc_swap::ArcSwap;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16
17/// Ready-state of a [`MediaStreamTrack`]
18#[derive(Copy, Clone, PartialEq, Eq, Debug)]
19pub enum MediaStreamTrackState {
20    /// The track is active (the track's underlying media source is making a best-effort attempt to
21    /// provide data in real time).
22    Live,
23    /// The track has ended (the track's underlying media source is no longer providing data, and
24    /// will never provide more data for this track). Once a track enters this state, it never
25    /// exits it.
26    Ended,
27}
28
29/// Single media track within a [`MediaStream`]
30#[derive(Clone)]
31pub struct MediaStreamTrack {
32    inner: Arc<MediaStreamTrackInner>,
33}
34
35impl std::fmt::Debug for MediaStreamTrack {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("MediaStreamTrack")
38            .field("ended", &self.inner.ended)
39            .finish_non_exhaustive()
40    }
41}
42
43struct MediaStreamTrackInner {
44    data: ArcSwap<FallibleBuffer>,
45    position: AtomicU64,
46    ended: AtomicBool,
47    provider: Mutex<Box<dyn Iterator<Item = FallibleBuffer> + Send + Sync + 'static>>,
48}
49
50impl MediaStreamTrack {
51    #[allow(clippy::should_implement_trait)]
52    pub fn from_iter<T: IntoIterator<Item = FallibleBuffer>>(iter: T) -> Self
53    where
54        <T as IntoIterator>::IntoIter: Send + Sync + 'static,
55    {
56        let initial = Ok(AudioBuffer::from(vec![vec![0.]], 48000.));
57        let inner = MediaStreamTrackInner {
58            data: ArcSwap::from_pointee(initial),
59            position: AtomicU64::new(0),
60            ended: AtomicBool::new(false),
61            provider: Mutex::new(Box::new(iter.into_iter())),
62        };
63        MediaStreamTrack {
64            inner: Arc::new(inner),
65        }
66    }
67
68    pub fn ready_state(&self) -> MediaStreamTrackState {
69        if self.inner.ended.load(Ordering::Relaxed) {
70            MediaStreamTrackState::Ended
71        } else {
72            MediaStreamTrackState::Live
73        }
74    }
75
76    pub fn iter(&self) -> impl Iterator<Item = FallibleBuffer> {
77        MediaStreamTrackIter {
78            track: Arc::clone(&self.inner),
79            position: 0,
80        }
81    }
82
83    #[allow(clippy::missing_panics_doc)]
84    pub fn close(&self) {
85        // TODO, close should only close this instance but should leave clones unaltered.
86        *self.inner.provider.lock().unwrap() = Box::new(std::iter::empty());
87    }
88}
89
90struct MediaStreamTrackIter {
91    track: Arc<MediaStreamTrackInner>,
92    position: u64,
93}
94
95impl Iterator for MediaStreamTrackIter {
96    type Item = FallibleBuffer;
97
98    fn next(&mut self) -> Option<Self::Item> {
99        if self.track.ended.load(Ordering::Relaxed) {
100            return None;
101        }
102
103        let mut stream_position = self.track.position.load(Ordering::Relaxed);
104        if stream_position == self.position {
105            match self.track.provider.lock().unwrap().next() {
106                Some(buf) => {
107                    let _ = self.track.data.swap(Arc::new(buf));
108                }
109                None => {
110                    self.track.ended.store(true, Ordering::Relaxed);
111                    return None;
112                }
113            }
114            stream_position += 1;
115            self.track.position.fetch_add(1, Ordering::Relaxed);
116        }
117
118        self.position = stream_position;
119        Some(match &self.track.data.load().as_ref() {
120            Ok(buf) => Ok(buf.clone()),
121            Err(e) => Err(e.to_string().into()),
122        })
123    }
124}
125
126/// Stream of media content.
127///
128/// A stream consists of several tracks, such as video or audio tracks. Each track is specified as
129/// an instance of [`MediaStreamTrack`].
130#[derive(Clone, Debug)]
131pub struct MediaStream {
132    tracks: Vec<MediaStreamTrack>,
133}
134
135impl MediaStream {
136    pub fn from_tracks(tracks: Vec<MediaStreamTrack>) -> Self {
137        Self { tracks }
138    }
139
140    pub fn get_tracks(&self) -> &[MediaStreamTrack] {
141        &self.tracks
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use float_eq::assert_float_eq;
148
149    use super::*;
150
151    #[test]
152    fn test_lazy() {
153        let buffers = vec![
154            Ok(AudioBuffer::from(vec![vec![1.]], 48000.)),
155            Ok(AudioBuffer::from(vec![vec![2.]], 48000.)),
156            Ok(AudioBuffer::from(vec![vec![3.]], 48000.)),
157        ];
158        let track = MediaStreamTrack::from_iter(buffers);
159
160        assert_eq!(track.ready_state(), MediaStreamTrackState::Live);
161
162        let mut iter = track.iter();
163        assert_float_eq!(
164            iter.next().unwrap().unwrap().get_channel_data(0)[..],
165            [1.][..],
166            abs_all <= 0.
167        );
168        assert_float_eq!(
169            iter.next().unwrap().unwrap().get_channel_data(0)[..],
170            &[2.][..],
171            abs_all <= 0.
172        );
173        assert_float_eq!(
174            iter.next().unwrap().unwrap().get_channel_data(0)[..],
175            &[3.][..],
176            abs_all <= 0.
177        );
178        assert!(iter.next().is_none());
179        assert!(iter.next().is_none());
180
181        assert_eq!(track.ready_state(), MediaStreamTrackState::Ended);
182    }
183
184    #[test]
185    fn test_lazy_multiple_consumers() {
186        let buffers = vec![
187            Ok(AudioBuffer::from(vec![vec![1.]], 48000.)),
188            Ok(AudioBuffer::from(vec![vec![2.]], 48000.)),
189            Ok(AudioBuffer::from(vec![vec![3.]], 48000.)),
190        ];
191        let track = MediaStreamTrack::from_iter(buffers);
192
193        let mut iter1 = track.iter();
194        let mut iter2 = track.iter();
195
196        // first poll iter1 once, then iter2 once
197        assert_float_eq!(
198            iter1.next().unwrap().unwrap().get_channel_data(0)[..],
199            [1.][..],
200            abs_all <= 0.
201        );
202        assert_float_eq!(
203            iter2.next().unwrap().unwrap().get_channel_data(0)[..],
204            &[1.][..],
205            abs_all <= 0.
206        );
207
208        // then poll iter1 twice
209        assert_float_eq!(
210            iter1.next().unwrap().unwrap().get_channel_data(0)[..],
211            &[2.][..],
212            abs_all <= 0.
213        );
214        assert_float_eq!(
215            iter1.next().unwrap().unwrap().get_channel_data(0)[..],
216            &[3.][..],
217            abs_all <= 0.
218        );
219
220        // polling iter2 will now yield the latest buffer
221        assert_float_eq!(
222            iter2.next().unwrap().unwrap().get_channel_data(0)[..],
223            &[3.][..],
224            abs_all <= 0.
225        );
226
227        assert!(iter1.next().is_none());
228        assert!(iter2.next().is_none());
229        assert_eq!(track.ready_state(), MediaStreamTrackState::Ended);
230    }
231
232    #[test]
233    fn test_close() {
234        let buffers = vec![
235            Ok(AudioBuffer::from(vec![vec![1.]], 48000.)),
236            Ok(AudioBuffer::from(vec![vec![2.]], 48000.)),
237            Ok(AudioBuffer::from(vec![vec![3.]], 48000.)),
238        ];
239        let track = MediaStreamTrack::from_iter(buffers);
240        let mut iter = track.iter();
241
242        assert_float_eq!(
243            iter.next().unwrap().unwrap().get_channel_data(0)[..],
244            [1.][..],
245            abs_all <= 0.
246        );
247
248        track.close();
249        assert!(iter.next().is_none());
250    }
251}