Skip to main content

oxideav_scene/
source.rs

1//! Source / Sink plumbing.
2//!
3//! A scene is a media producer — drive it with a [`SceneRenderer`]
4//! and it yields a stream of [`RenderedFrame`]s at the scene's
5//! [`framerate`](crate::Scene::framerate). The [`SceneSource`] trait
6//! formalises that contract so scenes can slot into the same pipe
7//! topology that oxideav uses for live decoders, capture devices,
8//! and file readers.
9//!
10//! A [`SceneSink`] consumes `RenderedFrame`s and does something with
11//! them — encode + mux to a file, push to an RTMP endpoint, render
12//! to a window, emit PDF operators to a writer, etc. The sink trait
13//! is deliberately thin: `init` once, `push` per frame, `finalise`
14//! once. Format negotiation happens upfront via [`SourceFormat`].
15//!
//! [`drive`] is the glue: it runs the pull loop until the source is
//! exhausted or either side returns an error.
18
19use oxideav_core::{Error, Rational, Result, TimeBase};
20
21use crate::duration::{SceneDuration, TimeStamp};
22use crate::object::Canvas;
23use crate::render::{RenderedFrame, SceneRenderer};
24use crate::scene::Scene;
25
26/// Format contract between a [`SceneSource`] and a [`SceneSink`].
27///
28/// Everything a sink needs to set up encoders / muxers / windows
29/// before the first frame arrives.
30#[derive(Clone, Debug)]
31pub struct SourceFormat {
32    pub canvas: Canvas,
33    pub framerate: Rational,
34    pub time_base: TimeBase,
35    pub sample_rate: u32,
36    /// Whether the source has a known end. `Finite(n)` lets sinks
37    /// size output containers; `Indefinite` signals a streaming
38    /// source that runs until externally stopped.
39    pub duration: SceneDuration,
40}
41
42impl SourceFormat {
43    /// Build from a scene's current state. The renderer consumes
44    /// this at `init()` time so downstream encoder settings match
45    /// the scene's declarations.
46    pub fn from_scene(scene: &Scene) -> Self {
47        SourceFormat {
48            canvas: scene.canvas,
49            framerate: scene.framerate,
50            time_base: scene.time_base,
51            sample_rate: scene.sample_rate,
52            duration: scene.duration,
53        }
54    }
55}
56
57/// Pull-based source of rendered frames.
58///
59/// Implementors typically wrap a [`Scene`] + [`SceneRenderer`] and
60/// advance an internal frame counter per `pull()`. The first call
61/// after `prepare` emits frame 0 at timestamp 0; each subsequent
62/// call advances by `1 / framerate`.
63///
64/// Sources are **not** required to be seekable — a streaming
65/// compositor source is forward-only. Sources that can seek should
66/// expose it via an inherent method, not this trait.
67pub trait SceneSource {
68    /// Declared format. Constant across a session.
69    fn format(&self) -> SourceFormat;
70
71    /// Produce the next rendered tick. Returns `Ok(None)` when the
72    /// source is exhausted (finite scene reached its end). For
73    /// indefinite sources, this never returns `None`.
74    fn pull(&mut self) -> Result<Option<RenderedFrame>>;
75}
76
77/// Push-based sink for rendered frames.
78///
79/// Implementors set up encoders / muxers in [`init`], receive one
80/// frame per [`push`] call, then release any buffered state in
81/// [`finalise`]. A sink that fails mid-stream should return the
82/// error from `push` — [`drive`] will call `finalise` regardless.
83///
84/// [`init`]: SceneSink::init
85/// [`push`]: SceneSink::push
86/// [`finalise`]: SceneSink::finalise
87pub trait SceneSink {
88    /// Called once before the first `push`. The sink may return
89    /// `Error::Unsupported` if it can't handle the format.
90    fn init(&mut self, format: &SourceFormat) -> Result<()>;
91
92    /// Consume one rendered frame. Time is embedded in the frame's
93    /// video pts + audio sample count; the sink itself doesn't need
94    /// to track timestamps separately.
95    fn push(&mut self, frame: RenderedFrame) -> Result<()>;
96
97    /// Flush + close. No more `push` calls after this. Always
98    /// called by `drive`, even on error paths.
99    fn finalise(&mut self) -> Result<()>;
100}
101
102/// Pull-loop helper. Drives `source` → `sink` until the source is
103/// exhausted or either side errors out. `finalise` is always
104/// called on the sink; `init` happens before the first pull.
105pub fn drive(source: &mut dyn SceneSource, sink: &mut dyn SceneSink) -> Result<()> {
106    let fmt = source.format();
107    sink.init(&fmt)?;
108    let result = drive_loop(source, sink);
109    let fin = sink.finalise();
110    result.and(fin)
111}
112
113fn drive_loop(source: &mut dyn SceneSource, sink: &mut dyn SceneSink) -> Result<()> {
114    loop {
115        match source.pull()? {
116            Some(frame) => sink.push(frame)?,
117            None => return Ok(()),
118        }
119    }
120}
121
122/// Default [`SceneSource`] implementation wrapping a scene + a
123/// renderer. Advances one frame per `pull` at the scene's declared
124/// framerate; emits `None` when a finite scene's last frame has
125/// been yielded.
126pub struct RenderedSource<R: SceneRenderer> {
127    scene: Scene,
128    renderer: R,
129    next_frame: u64,
130    total_frames: Option<u64>,
131    prepared: bool,
132}
133
134impl<R: SceneRenderer> RenderedSource<R> {
135    /// Take ownership of `scene` + `renderer`. Does not call
136    /// `prepare` on the renderer — that happens lazily on the first
137    /// `pull`.
138    pub fn new(scene: Scene, renderer: R) -> Self {
139        let total_frames = scene.frame_count();
140        RenderedSource {
141            scene,
142            renderer,
143            next_frame: 0,
144            total_frames,
145            prepared: false,
146        }
147    }
148
149    /// Access the underlying scene (read-only). Useful for tests +
150    /// compositors that want to inspect state between pulls.
151    pub fn scene(&self) -> &Scene {
152        &self.scene
153    }
154
155    /// Mutate the scene between pulls. The streaming-compositor use
156    /// case uses this to apply `Operation`s pulled from a control
157    /// channel. Mid-stream mutations MUST NOT shift earlier
158    /// timestamps — append-only operations (new keyframes after
159    /// `next_timestamp()`, new objects, removed-in-future) are
160    /// safe; rewriting existing keyframes is not.
161    pub fn scene_mut(&mut self) -> &mut Scene {
162        &mut self.scene
163    }
164
165    /// Timestamp of the next frame to be pulled.
166    pub fn next_timestamp(&self) -> TimeStamp {
167        self.scene.frame_to_timestamp(self.next_frame)
168    }
169}
170
171impl<R: SceneRenderer> SceneSource for RenderedSource<R> {
172    fn format(&self) -> SourceFormat {
173        SourceFormat::from_scene(&self.scene)
174    }
175
176    fn pull(&mut self) -> Result<Option<RenderedFrame>> {
177        if let Some(total) = self.total_frames {
178            if self.next_frame >= total {
179                return Ok(None);
180            }
181        }
182        if !self.prepared {
183            self.renderer.prepare(&self.scene)?;
184            self.prepared = true;
185        }
186        let t = self.next_timestamp();
187        let frame = self.renderer.render_at(&self.scene, t)?;
188        self.next_frame += 1;
189        Ok(Some(frame))
190    }
191}
192
193/// Discarding sink — useful for correctness tests + dry runs that
194/// exercise the pull loop without wiring an encoder. Records a
195/// frame + byte counter so callers can assert progress.
196#[derive(Default)]
197pub struct NullSink {
198    pub frames_received: u64,
199    pub bytes_received: u64,
200    pub format_seen: Option<SourceFormat>,
201}
202
203impl SceneSink for NullSink {
204    fn init(&mut self, format: &SourceFormat) -> Result<()> {
205        self.format_seen = Some(format.clone());
206        Ok(())
207    }
208
209    fn push(&mut self, frame: RenderedFrame) -> Result<()> {
210        self.frames_received += 1;
211        if let Some(v) = frame.video.as_ref() {
212            self.bytes_received += v.planes.iter().map(|p| p.data.len() as u64).sum::<u64>();
213        }
214        self.bytes_received += (frame.audio.len() * std::mem::size_of::<f32>()) as u64;
215        Ok(())
216    }
217
218    fn finalise(&mut self) -> Result<()> {
219        Ok(())
220    }
221}
222
223/// Sink that forwards to a closure. Handy for tests + one-off
224/// integrations where a full trait impl is overkill.
225pub struct FnSink<F>
226where
227    F: FnMut(&SourceFormat, RenderedFrame) -> Result<()>,
228{
229    format: Option<SourceFormat>,
230    cb: F,
231}
232
233impl<F> FnSink<F>
234where
235    F: FnMut(&SourceFormat, RenderedFrame) -> Result<()>,
236{
237    pub fn new(cb: F) -> Self {
238        FnSink { format: None, cb }
239    }
240}
241
242impl<F> SceneSink for FnSink<F>
243where
244    F: FnMut(&SourceFormat, RenderedFrame) -> Result<()>,
245{
246    fn init(&mut self, format: &SourceFormat) -> Result<()> {
247        self.format = Some(format.clone());
248        Ok(())
249    }
250
251    fn push(&mut self, frame: RenderedFrame) -> Result<()> {
252        let fmt = self.format.as_ref().ok_or_else(|| {
253            Error::invalid("FnSink: push before init — call SceneSink::init first")
254        })?;
255        (self.cb)(fmt, frame)
256    }
257
258    fn finalise(&mut self) -> Result<()> {
259        Ok(())
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::render::StubRenderer;

    /// Bare-bones `SceneSource`: yields `left` default frames, then
    /// reports exhaustion.
    struct CountingSource {
        fmt: SourceFormat,
        left: u32,
    }

    impl SceneSource for CountingSource {
        fn format(&self) -> SourceFormat {
            self.fmt.clone()
        }

        fn pull(&mut self) -> Result<Option<RenderedFrame>> {
            // `checked_sub` yields `None` exactly when nothing is left.
            match self.left.checked_sub(1) {
                Some(remaining) => {
                    self.left = remaining;
                    Ok(Some(RenderedFrame::default()))
                }
                None => Ok(None),
            }
        }
    }

    #[test]
    fn drive_runs_until_source_empty() {
        let mut source = CountingSource {
            fmt: SourceFormat::from_scene(&Scene::default()),
            left: 3,
        };
        let mut sink = NullSink::default();

        drive(&mut source, &mut sink).unwrap();

        assert_eq!(sink.frames_received, 3);
        assert!(sink.format_seen.is_some());
    }

    #[test]
    fn rendered_source_stops_at_frame_count() {
        // 3 frames at 30 fps = 100 ms → Finite(100).
        let scene = Scene {
            duration: SceneDuration::Finite(100),
            ..Scene::default()
        };
        // Frames land at 0, 33 and 66 ms; 100 ms is already past the
        // end, so exactly three are expected.
        assert_eq!(scene.frame_count(), Some(3));

        // StubRenderer returns Unsupported, so a real pull cannot
        // succeed here. Only the bookkeeping is checked: a fresh
        // source must start at timestamp 0.
        let source = RenderedSource::new(scene, StubRenderer);
        assert_eq!(source.next_timestamp(), 0);
    }

    #[test]
    fn fn_sink_forwards_to_closure() {
        let mut calls = 0u32;
        let mut sink = FnSink::new(|_fmt, _frame| {
            calls += 1;
            Ok(())
        });
        let fmt = SourceFormat::from_scene(&Scene::default());

        sink.init(&fmt).unwrap();
        for _ in 0..2 {
            sink.push(RenderedFrame::default()).unwrap();
        }
        sink.finalise().unwrap();

        assert_eq!(calls, 2);
    }

    #[test]
    fn fn_sink_rejects_push_before_init() {
        let mut sink = FnSink::new(|_fmt, _frame| Ok(()));
        let outcome = sink.push(RenderedFrame::default());
        assert!(matches!(outcome.unwrap_err(), Error::InvalidData(_)));
    }
}