cog_task/resource/stream/
mod.rs

1use crate::server::Config;
2use eframe::egui::mutex::RwLock;
3use eframe::egui::{TextureId, Vec2};
4use eframe::epaint::TextureManager;
5use eyre::{eyre, Context, Result};
6use serde::{Deserialize, Serialize};
7use std::fmt::{Debug, Formatter};
8use std::fs::File;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13#[cfg(feature = "ffmpeg")]
14mod ffmpeg;
15#[cfg(feature = "gstreamer")]
16mod gst;
17
18pub type FrameBuffer = Arc<Vec<(TextureId, Vec2)>>;
19
20#[derive(Clone)]
21pub enum Stream {
22    None,
23    #[cfg(feature = "gstreamer")]
24    Gst(gst::Stream),
25    #[cfg(feature = "ffmpeg")]
26    Ffmpeg(ffmpeg::Stream),
27}
28
29pub fn stream_from_file(
30    tex_manager: Arc<RwLock<TextureManager>>,
31    path: &Path,
32    config: &Config,
33) -> Result<Stream> {
34    Stream::new(tex_manager, path, config)
35}
36
37pub fn video_from_file(
38    tex_manager: Arc<RwLock<TextureManager>>,
39    path: &Path,
40    config: &Config,
41) -> Result<(FrameBuffer, f64)> {
42    Stream::new(tex_manager, path, config)?.pull_samples()
43}
44
45pub trait MediaStream
46where
47    Self: Sized,
48{
49    fn new(tex_manager: Arc<RwLock<TextureManager>>, path: &Path, config: &Config) -> Result<Self>;
50    fn cloned(
51        &self,
52        frame: Arc<Mutex<Option<(TextureId, Vec2)>>>,
53        media_mode: StreamMode,
54        volume: f32,
55    ) -> Result<Self>;
56
57    fn eos(&self) -> bool;
58    fn size(&self) -> [u32; 2];
59    fn framerate(&self) -> f64;
60    fn channels(&self) -> u16;
61    fn duration(&self) -> Duration;
62    fn has_video(&self) -> bool {
63        self.size().iter().sum::<u32>() > 0
64    }
65    fn has_audio(&self) -> bool {
66        self.channels() > 0
67    }
68
69    fn start(&mut self) -> Result<()>;
70    fn restart(&mut self) -> Result<()>;
71    fn pause(&mut self) -> Result<()>;
72    fn pull_samples(&self) -> Result<(FrameBuffer, f64)>;
73    fn process_bus(&mut self, looping: bool) -> Result<bool>;
74}
75
76#[derive(Debug, Clone)]
77pub enum StreamMode {
78    Query,
79    Normal,
80    Muted,
81    SansIntTrigger,
82    WithExtTrigger(PathBuf),
83}
84
85#[derive(Debug, Copy, Clone, Deserialize, Serialize)]
86#[serde(rename_all = "snake_case")]
87pub enum StreamBackend {
88    None,
89    Inherit,
90    #[cfg(feature = "gstreamer")]
91    Gst,
92    #[cfg(feature = "ffmpeg")]
93    Ffmpeg,
94}
95
96impl Default for StreamBackend {
97    #[inline(always)]
98    fn default() -> Self {
99        StreamBackend::Inherit
100    }
101}
102
103impl StreamBackend {
104    pub fn or(&self, other: &Self) -> Self {
105        if let Self::Inherit = self {
106            *other
107        } else {
108            *self
109        }
110    }
111}
112
113impl Debug for Stream {
114    #[inline(always)]
115    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
116        write!(f, "[video::Handle]")
117    }
118}
119
120impl Stream {
121    /// Create a new video object from a given video file.
122    #[allow(unused_variables)]
123    pub fn new(
124        tex_manager: Arc<RwLock<TextureManager>>,
125        path: &Path,
126        config: &Config,
127    ) -> Result<Self> {
128        {
129            File::open(path).wrap_err_with(|| format!("Failed to open stream file ({path:?})."))?;
130        }
131
132        let media_backend = config.stream_backend();
133        match media_backend {
134            StreamBackend::None => Err(eyre!("Cannot init a stream with backend=None.")),
135            StreamBackend::Inherit => Err(eyre!("Cannot init a stream with backend=Inherit.")),
136            #[cfg(feature = "ffmpeg")]
137            StreamBackend::Ffmpeg => {
138                ffmpeg::Stream::new(tex_manager, path, config).map(Stream::Ffmpeg)
139            }
140            #[cfg(feature = "gstreamer")]
141            StreamBackend::Gst => gst::Stream::new(tex_manager, path, config).map(Stream::Gst),
142        }
143    }
144
145    /// Check if stream has reached its end.
146    #[inline(always)]
147    pub fn eos(&self) -> bool {
148        match self {
149            Stream::None => true,
150            #[cfg(feature = "gstreamer")]
151            Stream::Gst(stream) => stream.eos(),
152            #[cfg(feature = "ffmpeg")]
153            Stream::Ffmpeg(stream) => stream.eos(),
154        }
155    }
156
157    /// Get the size/resolution of the video as `[width, height]`.
158    #[inline(always)]
159    pub fn size(&self) -> [u32; 2] {
160        match self {
161            Stream::None => [0, 0],
162            #[cfg(feature = "gstreamer")]
163            Stream::Gst(stream) => stream.size(),
164            #[cfg(feature = "ffmpeg")]
165            Stream::Ffmpeg(stream) => stream.size(),
166        }
167    }
168
169    /// Get the framerate of the video as frames per second.
170    #[inline(always)]
171    pub fn framerate(&self) -> f64 {
172        match self {
173            Stream::None => 0.0,
174            #[cfg(feature = "gstreamer")]
175            Stream::Gst(stream) => stream.framerate(),
176            #[cfg(feature = "ffmpeg")]
177            Stream::Ffmpeg(stream) => stream.framerate(),
178        }
179    }
180
181    /// Get the number of audio channels.
182    #[inline(always)]
183    pub fn channels(&self) -> u16 {
184        match self {
185            Stream::None => 0,
186            #[cfg(feature = "gstreamer")]
187            Stream::Gst(stream) => stream.channels(),
188            #[cfg(feature = "ffmpeg")]
189            Stream::Ffmpeg(stream) => stream.channels(),
190        }
191    }
192
193    /// Get the media duration.
194    #[inline(always)]
195    pub fn duration(&self) -> Duration {
196        match self {
197            Stream::None => Duration::default(),
198            #[cfg(feature = "gstreamer")]
199            Stream::Gst(stream) => stream.duration(),
200            #[cfg(feature = "ffmpeg")]
201            Stream::Ffmpeg(stream) => stream.duration(),
202        }
203    }
204
205    /// Check if stream has a video channel.
206    #[inline(always)]
207    pub fn has_video(&self) -> bool {
208        match self {
209            Stream::None => false,
210            #[cfg(feature = "gstreamer")]
211            Stream::Gst(stream) => stream.has_video(),
212            #[cfg(feature = "ffmpeg")]
213            Stream::Ffmpeg(stream) => stream.has_video(),
214        }
215    }
216
217    /// Check if stream has an audio channel.
218    #[inline(always)]
219    pub fn has_audio(&self) -> bool {
220        match self {
221            Stream::None => false,
222            #[cfg(feature = "gstreamer")]
223            Stream::Gst(stream) => stream.has_audio(),
224            #[cfg(feature = "ffmpeg")]
225            Stream::Ffmpeg(stream) => stream.has_audio(),
226        }
227    }
228
229    // /// Set the volume multiplier of the audio.
230    // /// `0.0` = 0% volume, `1.0` = 100% volume.
231    // pub fn set_volume(&mut self, volume: f64) {
232    //     self.source.set_property("volume", &volume);
233    // }
234
235    // /// Set if the audio is muted or not, without changing the volume.
236    // pub fn set_muted(&mut self, muted: bool) {
237    //     self.source.set_property("mute", &muted);
238    // }
239
240    // /// Get if the stream ended or not.
241    // #[inline]
242    // pub fn eos(&self) -> bool {
243    //     self.is_eos
244    // }
245
246    // /// Get if the stream is paused.
247    // #[inline]
248    // pub fn paused(&self) -> bool {
249    //     self.paused
250    // }
251
252    // /// Set if the media is paused or not.
253    // pub fn set_paused(&mut self, paused: bool) -> Result<(), error::Error> {
254    //     self.source
255    //         .set_state(if paused {
256    //             gst::State::Paused
257    //         } else {
258    //             gst::State::Playing
259    //         })
260    //         .map_err(|e| VideoDecodingError(format!("Failed to change video state:\n{e:#?}")))?;
261    //     self.paused = paused;
262    //     Ok(())
263    // }
264
265    /// Starts a stream; assumes it is at first frame and unpauses.
266    pub fn start(&mut self) -> Result<()> {
267        match self {
268            Stream::None => Err(eyre!("Cannot start stream with backend=None.")),
269            #[cfg(feature = "gstreamer")]
270            Stream::Gst(stream) => stream.start(),
271            #[cfg(feature = "ffmpeg")]
272            Stream::Ffmpeg(stream) => stream.start(),
273        }
274    }
275
276    /// Restarts a stream; seeks to the first frame and unpauses, sets the `eos` flag to false.
277    pub fn restart(&mut self) -> Result<()> {
278        match self {
279            Stream::None => Err(eyre!("Cannot restart stream with backend=None.")),
280            #[cfg(feature = "gstreamer")]
281            Stream::Gst(stream) => stream.restart(),
282            #[cfg(feature = "ffmpeg")]
283            Stream::Ffmpeg(stream) => stream.restart(),
284        }
285    }
286
287    /// Pauses a stream
288    pub fn pause(&mut self) -> Result<()> {
289        match self {
290            Stream::None => Err(eyre!("Cannot pause stream with backend=None.")),
291            #[cfg(feature = "gstreamer")]
292            Stream::Gst(stream) => stream.pause(),
293            #[cfg(feature = "ffmpeg")]
294            Stream::Ffmpeg(stream) => stream.pause(),
295        }
296    }
297
298    #[allow(unused_variables)]
299    pub fn process_bus(&mut self, looping: bool) -> Result<bool> {
300        match self {
301            Stream::None => Err(eyre!("Cannot process bus for stream with backend=None.")),
302            #[cfg(feature = "gstreamer")]
303            Stream::Gst(stream) => stream.process_bus(looping),
304            #[cfg(feature = "ffmpeg")]
305            Stream::Ffmpeg(stream) => stream.process_bus(looping),
306        }
307    }
308
309    #[allow(unused_variables)]
310    pub fn cloned(
311        &self,
312        frame: Arc<Mutex<Option<(TextureId, Vec2)>>>,
313        mode: StreamMode,
314        volume: f32,
315    ) -> Result<Self> {
316        match self {
317            Stream::None => Err(eyre!("Cloning stream with backend=None is pointless.")),
318            #[cfg(feature = "gstreamer")]
319            Stream::Gst(stream) => stream.cloned(frame, mode, volume).map(Stream::Gst),
320            #[cfg(feature = "ffmpeg")]
321            Stream::Ffmpeg(stream) => stream.cloned(frame, mode, volume).map(Stream::Ffmpeg),
322        }
323    }
324
325    pub fn pull_samples(&self) -> Result<(FrameBuffer, f64)> {
326        match self {
327            Stream::None => Err(eyre!("Cannot pull samples from stream with backend=None.")),
328            #[cfg(feature = "gstreamer")]
329            Stream::Gst(stream) => stream.pull_samples(),
330            #[cfg(feature = "ffmpeg")]
331            Stream::Ffmpeg(stream) => stream.pull_samples(),
332        }
333    }
334}