ezk/
source.rs

1use crate::{Frame, MediaType, Result};
2use downcast_rs::Downcast;
3use reusable_box::{ReusableBox, ReusedBoxFuture};
4use std::future::Future;
5
/// Marker for [`Source`] implementations whose [`next_event`](Source::next_event) future may be
/// dropped before it has completed.
///
/// Constructs such as tokio's [`select!`] macro can cancel a pending call to `next_event`.
/// Such a cancellation is only "safe" when the `Source` carries this marker trait.
///
/// If this trait is required for a [`BoxedSource`], try using [`BoxedSourceCancelSafe`] instead.
/// See [`Source::boxed_cancel_safe`].
pub trait NextEventIsCancelSafe {}
13
14#[derive(Debug)]
15pub enum SourceEvent<M: MediaType> {
16    /// Source produced a frame
17    Frame(Frame<M>),
18
19    /// This source will provide no more data
20    // TODO: should this be a permanent, or should a source be able to restart?
21    EndOfData,
22
23    /// After this event is received the source must be renegotiated using [`Source::negotiate_config`]. This may
24    /// occur when upstream sources (or their dependencies) change.
25    RenegotiationNeeded,
26}
27
28pub trait Source: Send + Sized + 'static {
29    type MediaType: MediaType;
30
31    fn capabilities(
32        &mut self,
33    ) -> impl Future<Output = Result<Vec<<Self::MediaType as MediaType>::ConfigRange>>> + Send;
34
35    /// Provide a list of available configurations ranges. Upstream sources then filter out invalid/incompatible
36    /// configs until the "root" source then decides on the streaming config, which is then returned down the source
37    /// stack. This must always be called before trying to call [`Source::next_event`].
38    ///
39    /// # Cancel safety
40    ///
41    /// This method should __never__ be considered cancel safe, as it may leave some sources in the stack configured and others not.
42    fn negotiate_config(
43        &mut self,
44        available: Vec<<Self::MediaType as MediaType>::ConfigRange>,
45    ) -> impl Future<Output = Result<<Self::MediaType as MediaType>::Config>> + Send;
46
47    /// Fetch the next event from the source. This method should be called as much as possible to
48    /// allow source to drive their internal logic without having to rely on extra tasks.
49    ///
50    /// Should return [`SourceEvent::RenegotiationNeeded`] when called before [`negotiate_config`](Source::negotiate_config).
51    ///
52    /// # Cancel safety
53    ///
54    /// The cancel safety of this method is marked using the [`NextEventIsCancelSafe`] trait
55    fn next_event(&mut self) -> impl Future<Output = Result<SourceEvent<Self::MediaType>>> + Send;
56
57    /// Erase the type of this source
58    fn boxed(self) -> BoxedSource<Self::MediaType> {
59        BoxedSource {
60            source: Box::new(self),
61            reusable_box: ReusableBox::new(),
62        }
63    }
64
65    /// Erase the type of this source, keeping the information that the `next_event` future is cancel safe
66    fn boxed_cancel_safe(self) -> BoxedSourceCancelSafe<Self::MediaType>
67    where
68        Self: NextEventIsCancelSafe,
69    {
70        BoxedSourceCancelSafe::new(self)
71    }
72}
73
74/// Type erased source
75///
76/// Used when generics are not possible since one wants to store different source or to avoid deep generic nestings
77/// which inevitably leads to long compile times.
78pub struct BoxedSource<M: MediaType> {
79    source: Box<dyn DynSource<MediaType = M>>,
80
81    // to avoid frequent reallocations of futures
82    // every future is allocated using this reusable box
83    reusable_box: ReusableBox,
84}
85
86impl<M: MediaType> BoxedSource<M> {
87    #[inline]
88    pub fn new(source: impl Source<MediaType = M>) -> Self {
89        source.boxed()
90    }
91
92    #[inline]
93    pub fn downcast<T: Source<MediaType = M>>(self) -> Result<Box<T>, Self> {
94        self.source.downcast::<T>().map_err(|source| Self {
95            source,
96            reusable_box: self.reusable_box,
97        })
98    }
99
100    #[inline]
101    pub fn downcast_ref<T: Source<MediaType = M>>(&self) -> Option<&T> {
102        self.source.downcast_ref::<T>()
103    }
104
105    #[inline]
106    pub fn downcast_mut<T: Source<MediaType = M>>(&mut self) -> Option<&mut T> {
107        self.source.downcast_mut::<T>()
108    }
109}
110
111impl<M: MediaType> Source for BoxedSource<M> {
112    type MediaType = M;
113
114    async fn capabilities(&mut self) -> Result<Vec<<Self::MediaType as MediaType>::ConfigRange>> {
115        self.source.capabilities(&mut self.reusable_box).await
116    }
117
118    async fn negotiate_config(
119        &mut self,
120        available: Vec<<Self::MediaType as MediaType>::ConfigRange>,
121    ) -> Result<<Self::MediaType as MediaType>::Config> {
122        self.source
123            .negotiate_config(available, &mut self.reusable_box)
124            .await
125    }
126
127    async fn next_event(&mut self) -> Result<SourceEvent<Self::MediaType>> {
128        self.source.next_event(&mut self.reusable_box).await
129    }
130
131    fn boxed(self) -> BoxedSource<Self::MediaType> {
132        self
133    }
134
135    fn boxed_cancel_safe(self) -> BoxedSourceCancelSafe<Self::MediaType>
136    where
137        Self: NextEventIsCancelSafe,
138    {
139        BoxedSourceCancelSafe(self)
140    }
141}
142
143// Type erased object safe source, only used in BoxedSource
144trait DynSource: Downcast + Send + 'static {
145    type MediaType: MediaType;
146
147    fn capabilities<'a>(
148        &'a mut self,
149        bx: &'a mut ReusableBox,
150    ) -> ReusedBoxFuture<'a, Result<Vec<<Self::MediaType as MediaType>::ConfigRange>>>;
151
152    fn negotiate_config<'a>(
153        &'a mut self,
154        available: Vec<<Self::MediaType as MediaType>::ConfigRange>,
155        bx: &'a mut ReusableBox,
156    ) -> ReusedBoxFuture<'a, Result<<Self::MediaType as MediaType>::Config>>;
157
158    fn next_event<'a>(
159        &'a mut self,
160        bx: &'a mut ReusableBox,
161    ) -> ReusedBoxFuture<'a, Result<SourceEvent<Self::MediaType>>>;
162}
163
164downcast_rs::impl_downcast!(DynSource assoc MediaType);
165
166impl<S: Source> DynSource for S {
167    type MediaType = S::MediaType;
168
169    fn capabilities<'a>(
170        &'a mut self,
171        bx: &'a mut ReusableBox,
172    ) -> ReusedBoxFuture<'a, Result<Vec<<Self::MediaType as MediaType>::ConfigRange>>> {
173        bx.store_future(Source::capabilities(self))
174    }
175
176    fn negotiate_config<'a>(
177        &'a mut self,
178        available: Vec<<Self::MediaType as MediaType>::ConfigRange>,
179        bx: &'a mut ReusableBox,
180    ) -> ReusedBoxFuture<'a, Result<<Self::MediaType as MediaType>::Config>> {
181        bx.store_future(Source::negotiate_config(self, available))
182    }
183
184    fn next_event<'a>(
185        &'a mut self,
186        bx: &'a mut ReusableBox,
187    ) -> ReusedBoxFuture<'a, Result<SourceEvent<Self::MediaType>>> {
188        bx.store_future(Source::next_event(self))
189    }
190}
191
192/// [`BoxedSource`] with NextEventIsCancelSafe implemented
193pub struct BoxedSourceCancelSafe<M: MediaType>(BoxedSource<M>);
194
195impl<M: MediaType> NextEventIsCancelSafe for BoxedSourceCancelSafe<M> {}
196
197impl<M: MediaType> BoxedSourceCancelSafe<M> {
198    #[inline]
199    pub fn new(source: impl Source<MediaType = M> + NextEventIsCancelSafe) -> Self {
200        Self(source.boxed())
201    }
202
203    #[inline]
204    pub fn downcast<T: Source<MediaType = M>>(self) -> Result<Box<T>, Self> {
205        self.0.downcast().map_err(Self)
206    }
207
208    #[inline]
209    pub fn downcast_ref<T: Source<MediaType = M>>(&self) -> Option<&T> {
210        self.0.downcast_ref()
211    }
212
213    #[inline]
214    pub fn downcast_mut<T: Source<MediaType = M>>(&mut self) -> Option<&mut T> {
215        self.0.downcast_mut()
216    }
217}
218
219impl<M: MediaType> Source for BoxedSourceCancelSafe<M> {
220    type MediaType = M;
221
222    async fn capabilities(&mut self) -> Result<Vec<<Self::MediaType as MediaType>::ConfigRange>> {
223        Source::capabilities(&mut self.0).await
224    }
225
226    async fn negotiate_config(
227        &mut self,
228        available: Vec<<Self::MediaType as MediaType>::ConfigRange>,
229    ) -> Result<<Self::MediaType as MediaType>::Config> {
230        Source::negotiate_config(&mut self.0, available).await
231    }
232
233    async fn next_event(&mut self) -> Result<SourceEvent<Self::MediaType>> {
234        Source::next_event(&mut self.0).await
235    }
236
237    fn boxed(self) -> BoxedSource<Self::MediaType> {
238        self.0
239    }
240
241    fn boxed_cancel_safe(self) -> BoxedSourceCancelSafe<Self::MediaType>
242    where
243        Self: NextEventIsCancelSafe,
244    {
245        self
246    }
247}