Skip to main content

oxideav_core/registry/
source.rs

1//! Generic source registry.
2//!
3//! `SourceRegistry` maps URI schemes (`file`, `http`, `rtmp`, `generate`,
4//! …) to opener functions and dispatches `open(uri)` to the right driver.
5//! A driver opens a URI as one of three shapes:
6//!
7//! * [`BytesSource`] — a `Read + Seek` byte stream that downstream code
8//!   then passes to a container demuxer (the historical shape, used by
9//!   `file://` and `http(s)://`).
10//! * [`PacketSource`] — a producer of already-demuxed [`Packet`]s. Used
11//!   by transport-layer protocols that do their own demux (RTMP, future
12//!   SRT / WebRTC). Skips the container layer entirely.
13//! * [`FrameSource`] — a producer of already-decoded [`Frame`]s. Used by
14//!   synthetic generators that emit frames natively, skipping both the
15//!   container and decoder stages.
16//!
17//! The driver picks the variant when it registers; [`SourceRegistry::open`]
18//! returns the corresponding [`SourceOutput`] enum so the pipeline
19//! executor can branch on the source shape.
20
21use std::collections::HashMap;
22use std::io::{Read, Seek};
23
24use crate::{CodecParameters, Error, Frame, Packet, Result, StreamInfo};
25
26// ───────────────────────── traits ─────────────────────────
27
28/// A seekable byte stream (`Read + Seek + Send`). Replaces the historical
29/// `Box<dyn ReadSeek>` opener-return type with a name that mirrors the
30/// other source-shape traits in this module. Blanket-implemented for
31/// every type that satisfies the bounds, so existing readers (files,
32/// `Cursor<Vec<u8>>`, HTTP-over-Range adapters) work unchanged.
33pub trait BytesSource: Read + Seek + Send {}
34impl<T: Read + Seek + Send> BytesSource for T {}
35
36/// A producer of already-demuxed [`Packet`]s.
37///
38/// Used by transport-layer protocols that perform demux themselves
39/// (RTMP, RTSP, …). The pipeline executor consumes packets directly,
40/// skipping the container-demux stage that bytes-shape sources go
41/// through.
42pub trait PacketSource: Send {
43    /// Streams advertised by this source. Stable across the lifetime of
44    /// the source.
45    fn streams(&self) -> &[StreamInfo];
46
47    /// Read the next packet from any stream. Returns [`Error::Eof`] at
48    /// end of stream.
49    fn next_packet(&mut self) -> Result<Packet>;
50
51    /// Source-level metadata as ordered (key, value) pairs. Default is
52    /// empty.
53    fn metadata(&self) -> &[(String, String)] {
54        &[]
55    }
56
57    /// Source-level duration in microseconds, if known. Default is
58    /// `None`. Live sources (RTMP push, etc.) typically return `None`.
59    fn duration_micros(&self) -> Option<i64> {
60        None
61    }
62}
63
64/// A producer of already-decoded [`Frame`]s.
65///
66/// Used by synthetic generators (testsrc, sine sweep, gradient image,
67/// …) that emit decoded frames natively. The pipeline executor consumes
68/// frames directly, skipping both the container-demux and decode stages.
69pub trait FrameSource: Send {
70    /// Codec parameters describing the frames this source emits. Stable
71    /// across the lifetime of the source. Even though the frames are
72    /// already decoded, downstream filters and encoders need the
73    /// parameter shape (sample rate / pixel format / channel layout /
74    /// frame rate / …) to configure themselves.
75    fn params(&self) -> &CodecParameters;
76
77    /// Produce the next frame. Returns [`Error::Eof`] at end of stream.
78    fn next_frame(&mut self) -> Result<Frame>;
79
80    /// Source-level metadata as ordered (key, value) pairs. Default is
81    /// empty.
82    fn metadata(&self) -> &[(String, String)] {
83        &[]
84    }
85
86    /// Source-level duration in microseconds, if known. Default is
87    /// `None`.
88    fn duration_micros(&self) -> Option<i64> {
89        None
90    }
91}
92
93/// What a [`SourceRegistry::open`] call returns. The variant is decided
94/// at driver-registration time, so callers can match on the shape and
95/// branch the pipeline accordingly.
96pub enum SourceOutput {
97    Bytes(Box<dyn BytesSource>),
98    Packets(Box<dyn PacketSource>),
99    Frames(Box<dyn FrameSource>),
100}
101
102// ───────────────────────── opener function aliases ─────────────────────────
103
104/// Opener for a [`BytesSource`] driver.
105pub type OpenBytesFn = fn(uri: &str) -> Result<Box<dyn BytesSource>>;
106
107/// Opener for a [`PacketSource`] driver.
108pub type OpenPacketsFn = fn(uri: &str) -> Result<Box<dyn PacketSource>>;
109
110/// Opener for a [`FrameSource`] driver.
111pub type OpenFramesFn = fn(uri: &str) -> Result<Box<dyn FrameSource>>;
112
113/// Internal per-scheme entry: which opener kind is registered for this
114/// scheme. Stored in a single map so [`SourceRegistry::open`] can
115/// dispatch with a single lookup, then match the variant to wrap in the
116/// returned [`SourceOutput`].
117enum OpenerEntry {
118    Bytes(OpenBytesFn),
119    Packets(OpenPacketsFn),
120    Frames(OpenFramesFn),
121}
122
123// ───────────────────────── SourceRegistry ─────────────────────────
124
125/// Registry mapping URI schemes to opener functions. Each scheme picks
126/// one of three opener kinds (bytes / packets / frames) at registration
127/// time; callers see the choice via the [`SourceOutput`] variant
128/// returned from [`open`](Self::open).
129#[derive(Default)]
130pub struct SourceRegistry {
131    schemes: HashMap<String, OpenerEntry>,
132}
133
134impl SourceRegistry {
135    /// Empty registry. Callers must register at least one driver before
136    /// calling [`open`](Self::open). The conventional minimum is the
137    /// `file` driver (provided by the `oxideav-source` crate).
138    pub fn new() -> Self {
139        Self::default()
140    }
141
142    /// Register a [`BytesSource`] opener for a scheme. Schemes are
143    /// normalised to ASCII lowercase. Replaces any prior registration
144    /// (including registrations of other opener kinds).
145    pub fn register_bytes(&mut self, scheme: &str, opener: OpenBytesFn) {
146        self.schemes
147            .insert(scheme.to_ascii_lowercase(), OpenerEntry::Bytes(opener));
148    }
149
150    /// Register a [`PacketSource`] opener for a scheme. Schemes are
151    /// normalised to ASCII lowercase. Replaces any prior registration
152    /// (including registrations of other opener kinds).
153    pub fn register_packets(&mut self, scheme: &str, opener: OpenPacketsFn) {
154        self.schemes
155            .insert(scheme.to_ascii_lowercase(), OpenerEntry::Packets(opener));
156    }
157
158    /// Register a [`FrameSource`] opener for a scheme. Schemes are
159    /// normalised to ASCII lowercase. Replaces any prior registration
160    /// (including registrations of other opener kinds).
161    pub fn register_frames(&mut self, scheme: &str, opener: OpenFramesFn) {
162        self.schemes
163            .insert(scheme.to_ascii_lowercase(), OpenerEntry::Frames(opener));
164    }
165
166    /// Open a URI. The URI's scheme determines which opener runs; bare
167    /// paths (no scheme) and unrecognised schemes both fall back to the
168    /// `file` driver if it is registered.
169    ///
170    /// Returns a [`SourceOutput`] whose variant matches the registered
171    /// opener kind: bytes-shape drivers return `SourceOutput::Bytes`,
172    /// packet-shape drivers return `SourceOutput::Packets`, and so on.
173    pub fn open(&self, uri_str: &str) -> Result<SourceOutput> {
174        let (scheme, _) = split_scheme(uri_str);
175        let scheme = scheme.to_ascii_lowercase();
176        if let Some(entry) = self.schemes.get(&scheme) {
177            return dispatch(entry, uri_str);
178        }
179        // Fall back to file driver for unknown schemes.
180        if let Some(entry) = self.schemes.get("file") {
181            return dispatch(entry, uri_str);
182        }
183        Err(Error::Unsupported(format!(
184            "no source driver for scheme '{scheme}' (URI: {uri_str})"
185        )))
186    }
187
188    /// Iterate the registered schemes (for diagnostics).
189    pub fn schemes(&self) -> impl Iterator<Item = &str> {
190        self.schemes.keys().map(|s| s.as_str())
191    }
192}
193
194fn dispatch(entry: &OpenerEntry, uri_str: &str) -> Result<SourceOutput> {
195    match entry {
196        OpenerEntry::Bytes(open) => open(uri_str).map(SourceOutput::Bytes),
197        OpenerEntry::Packets(open) => open(uri_str).map(SourceOutput::Packets),
198        OpenerEntry::Frames(open) => open(uri_str).map(SourceOutput::Frames),
199    }
200}
201
202/// Split a URI into `(scheme, rest)`. Bare paths (no scheme) report scheme
203/// `"file"` and `rest = uri`. Path-like inputs that happen to start with
204/// `c:` on Windows are treated as bare paths.
205pub(crate) fn split_scheme(uri: &str) -> (&str, &str) {
206    if let Some(idx) = uri.find(':') {
207        let (scheme, rest) = uri.split_at(idx);
208        let rest = &rest[1..]; // skip ':'
209
210        // Reject single-letter scheme that looks like a Windows drive letter.
211        if scheme.len() == 1 && scheme.chars().next().unwrap().is_ascii_alphabetic() {
212            return ("file", uri);
213        }
214
215        // Scheme must be ASCII alphanumeric / `+` / `-` / `.`, starting with a letter.
216        let valid = !scheme.is_empty()
217            && scheme.chars().next().unwrap().is_ascii_alphabetic()
218            && scheme
219                .chars()
220                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'));
221
222        if !valid {
223            return ("file", uri);
224        }
225
226        // Strip leading `//` from rest if present.
227        let rest = rest.strip_prefix("//").unwrap_or(rest);
228        return (scheme, rest);
229    }
230    ("file", uri)
231}
232
233// ───────────────────────── tests ─────────────────────────
234
235#[cfg(test)]
236mod tests {
237    use super::*;
238    use crate::frame::{AudioFrame, Frame};
239    use crate::packet::Packet;
240    use crate::stream::{CodecId, CodecParameters, StreamInfo};
241    use crate::time::TimeBase;
242    use std::io::{Cursor, Read};
243
244    // ---- mock BytesSource ----
245    fn open_bytes_mock(_uri: &str) -> Result<Box<dyn BytesSource>> {
246        Ok(Box::new(Cursor::new(b"hello world".to_vec())))
247    }
248
249    #[test]
250    fn register_bytes_and_open_returns_bytes_variant() {
251        let mut reg = SourceRegistry::new();
252        reg.register_bytes("mockb", open_bytes_mock);
253        let out = reg.open("mockb://anything").expect("open");
254        match out {
255            SourceOutput::Bytes(mut r) => {
256                let mut buf = String::new();
257                r.read_to_string(&mut buf).unwrap();
258                assert_eq!(buf, "hello world");
259            }
260            _ => panic!("expected SourceOutput::Bytes"),
261        }
262    }
263
264    // ---- mock PacketSource ----
265    struct MockPacketSource {
266        streams: Vec<StreamInfo>,
267        emitted: bool,
268    }
269
270    impl MockPacketSource {
271        fn new() -> Self {
272            let params = CodecParameters::audio(CodecId::new("pcm_s16le"));
273            let s = StreamInfo {
274                index: 0,
275                time_base: TimeBase::new(1, 1000),
276                duration: None,
277                start_time: None,
278                params,
279            };
280            Self {
281                streams: vec![s],
282                emitted: false,
283            }
284        }
285    }
286
287    impl PacketSource for MockPacketSource {
288        fn streams(&self) -> &[StreamInfo] {
289            &self.streams
290        }
291        fn next_packet(&mut self) -> Result<Packet> {
292            if self.emitted {
293                return Err(Error::Eof);
294            }
295            self.emitted = true;
296            Ok(Packet::new(0, TimeBase::new(1, 1000), vec![1, 2, 3, 4]))
297        }
298    }
299
300    fn open_packets_mock(_uri: &str) -> Result<Box<dyn PacketSource>> {
301        Ok(Box::new(MockPacketSource::new()))
302    }
303
304    #[test]
305    fn register_packets_and_open_returns_packets_variant() {
306        let mut reg = SourceRegistry::new();
307        reg.register_packets("mockp", open_packets_mock);
308        let out = reg.open("mockp://anything").expect("open");
309        match out {
310            SourceOutput::Packets(mut p) => {
311                assert_eq!(p.streams().len(), 1);
312                let pkt = p.next_packet().expect("first packet");
313                assert_eq!(pkt.data, vec![1, 2, 3, 4]);
314                assert!(matches!(p.next_packet(), Err(Error::Eof)));
315            }
316            _ => panic!("expected SourceOutput::Packets"),
317        }
318    }
319
320    // ---- mock FrameSource ----
321    struct MockFrameSource {
322        params: CodecParameters,
323        emitted: bool,
324    }
325
326    impl MockFrameSource {
327        fn new() -> Self {
328            Self {
329                params: CodecParameters::audio(CodecId::new("pcm_s16le")),
330                emitted: false,
331            }
332        }
333    }
334
335    impl FrameSource for MockFrameSource {
336        fn params(&self) -> &CodecParameters {
337            &self.params
338        }
339        fn next_frame(&mut self) -> Result<Frame> {
340            if self.emitted {
341                return Err(Error::Eof);
342            }
343            self.emitted = true;
344            Ok(Frame::Audio(AudioFrame {
345                samples: 1,
346                pts: Some(0),
347                data: vec![vec![0u8, 0u8]],
348            }))
349        }
350    }
351
352    fn open_frames_mock(_uri: &str) -> Result<Box<dyn FrameSource>> {
353        Ok(Box::new(MockFrameSource::new()))
354    }
355
356    #[test]
357    fn register_frames_and_open_returns_frames_variant() {
358        let mut reg = SourceRegistry::new();
359        reg.register_frames("mockf", open_frames_mock);
360        let out = reg.open("mockf://anything").expect("open");
361        match out {
362            SourceOutput::Frames(mut f) => {
363                assert_eq!(f.params().codec_id.as_str(), "pcm_s16le");
364                let frame = f.next_frame().expect("first frame");
365                match frame {
366                    Frame::Audio(a) => assert_eq!(a.samples, 1),
367                    _ => panic!("expected audio frame"),
368                }
369                assert!(matches!(f.next_frame(), Err(Error::Eof)));
370            }
371            _ => panic!("expected SourceOutput::Frames"),
372        }
373    }
374
375    #[test]
376    fn unknown_scheme_falls_back_to_file_when_registered() {
377        let mut reg = SourceRegistry::new();
378        reg.register_bytes("file", open_bytes_mock);
379        // No `foo` driver — falls through to the `file` driver.
380        let out = reg.open("foo://x").expect("fallback open");
381        assert!(matches!(out, SourceOutput::Bytes(_)));
382    }
383
384    #[test]
385    fn unknown_scheme_with_no_file_driver_errors() {
386        let reg = SourceRegistry::new();
387        let r = reg.open("nope://x");
388        assert!(matches!(r, Err(Error::Unsupported(_))));
389    }
390
391    #[test]
392    fn register_overrides_prior_kind() {
393        // Registering `mock` first as bytes then as frames should leave
394        // only the frames opener active (last write wins).
395        let mut reg = SourceRegistry::new();
396        reg.register_bytes("mock", open_bytes_mock);
397        reg.register_frames("mock", open_frames_mock);
398        let out = reg.open("mock://x").expect("open");
399        assert!(matches!(out, SourceOutput::Frames(_)));
400    }
401
402    #[test]
403    fn schemes_iterator_lists_registered() {
404        let mut reg = SourceRegistry::new();
405        reg.register_bytes("mockb", open_bytes_mock);
406        reg.register_packets("mockp", open_packets_mock);
407        reg.register_frames("mockf", open_frames_mock);
408        let mut names: Vec<&str> = reg.schemes().collect();
409        names.sort();
410        assert_eq!(names, vec!["mockb", "mockf", "mockp"]);
411    }
412}