Skip to main content

oximedia_graph/filters/video/
split.rs

1//! Video split (tee/fanout) filter.
2//!
3//! The [`SplitFilter`] duplicates an incoming video frame to multiple output
4//! ports, enabling fan-out topologies where one source feeds several
5//! downstream processing branches simultaneously.
6
7#![forbid(unsafe_code)]
8#![allow(dead_code)]
9
10use crate::error::{GraphError, GraphResult};
11use crate::frame::FilterFrame;
12use crate::node::{Node, NodeId, NodeState, NodeType};
13use crate::port::{InputPort, OutputPort, PortFormat, PortId, PortType, VideoPortFormat};
14
15/// Configuration for the [`SplitFilter`].
16#[derive(Debug, Clone)]
17pub struct SplitConfig {
18    /// Number of output ports (fan-out factor).  Must be at least 1.
19    pub outputs: usize,
20}
21
22impl Default for SplitConfig {
23    fn default() -> Self {
24        Self { outputs: 2 }
25    }
26}
27
28impl SplitConfig {
29    /// Create a split configuration with `n` output ports.
30    ///
31    /// Clamps `n` to a minimum of 1.
32    #[must_use]
33    pub fn new(n: usize) -> Self {
34        Self { outputs: n.max(1) }
35    }
36}
37
38/// A video filter that duplicates an incoming frame to multiple output ports
39/// (tee / fan-out).
40///
41/// # Ports
42///
43/// - **Input 0** (`"input"`) – the source video stream.
44/// - **Output 0 … N-1** (`"output_0"` … `"output_N-1"`) – one port per fan-out
45///   branch.  All outputs receive a clone of the same frame.
46///
47/// # Example
48///
49/// ```
50/// use oximedia_graph::filters::video::SplitFilter;
51/// use oximedia_graph::node::{Node, NodeId};
52///
53/// let split = SplitFilter::new(NodeId(0), "tee", 3);
54/// assert_eq!(split.outputs().len(), 3);
55/// ```
56pub struct SplitFilter {
57    id: NodeId,
58    name: String,
59    state: NodeState,
60    inputs: Vec<InputPort>,
61    outputs: Vec<OutputPort>,
62    /// Cloned frames waiting to be pulled on each output port.
63    ///
64    /// The outer `Vec` is indexed by output-port index.  Each inner `Vec` is a
65    /// small queue (usually 0 or 1 entries) of frames ready on that port.
66    pending: Vec<Vec<FilterFrame>>,
67}
68
69impl SplitFilter {
70    /// Create a new [`SplitFilter`] with `n` output ports.
71    #[must_use]
72    pub fn new(id: NodeId, name: impl Into<String>, n: usize) -> Self {
73        let n = n.max(1);
74        let video_format = PortFormat::Video(VideoPortFormat::any());
75
76        let inputs =
77            vec![InputPort::new(PortId(0), "input", PortType::Video)
78                .with_format(video_format.clone())];
79
80        let outputs: Vec<OutputPort> = (0..n)
81            .map(|i| {
82                OutputPort::new(PortId(i as u32), format!("output_{i}"), PortType::Video)
83                    .with_format(video_format.clone())
84            })
85            .collect();
86
87        let pending = vec![Vec::new(); n];
88
89        Self {
90            id,
91            name: name.into(),
92            state: NodeState::Idle,
93            inputs,
94            outputs,
95            pending,
96        }
97    }
98
99    /// Create a [`SplitFilter`] from a [`SplitConfig`].
100    #[must_use]
101    pub fn from_config(id: NodeId, name: impl Into<String>, config: SplitConfig) -> Self {
102        Self::new(id, name, config.outputs)
103    }
104
105    /// Number of configured output ports.
106    #[must_use]
107    pub fn output_count(&self) -> usize {
108        self.outputs.len()
109    }
110
111    /// Drain one pending frame from the given output port index.
112    ///
113    /// Returns `None` if no frame is pending on that port.
114    pub fn pop_output(&mut self, port_index: usize) -> Option<FilterFrame> {
115        self.pending.get_mut(port_index).and_then(|q| {
116            if q.is_empty() {
117                None
118            } else {
119                Some(q.remove(0))
120            }
121        })
122    }
123
124    /// Push a frame to all output port queues.
125    fn fan_out(&mut self, frame: FilterFrame) {
126        let n = self.pending.len();
127        for i in 0..n {
128            // The last port can take ownership; all prior ports get a clone.
129            if i + 1 < n {
130                self.pending[i].push(frame.clone());
131            } else {
132                self.pending[i].push(frame.clone());
133            }
134        }
135    }
136}
137
138impl Node for SplitFilter {
139    fn id(&self) -> NodeId {
140        self.id
141    }
142
143    fn name(&self) -> &str {
144        &self.name
145    }
146
147    fn node_type(&self) -> NodeType {
148        NodeType::Filter
149    }
150
151    fn state(&self) -> NodeState {
152        self.state
153    }
154
155    fn set_state(&mut self, state: NodeState) -> GraphResult<()> {
156        if !self.state.can_transition_to(state) {
157            return Err(GraphError::InvalidStateTransition {
158                node: self.id,
159                from: self.state.to_string(),
160                to: state.to_string(),
161            });
162        }
163        self.state = state;
164        Ok(())
165    }
166
167    fn inputs(&self) -> &[InputPort] {
168        &self.inputs
169    }
170
171    fn outputs(&self) -> &[OutputPort] {
172        &self.outputs
173    }
174
175    /// Process an incoming video frame by fanning it out to all output port
176    /// queues.
177    ///
178    /// Returns the frame on **output port 0** immediately so that the single
179    /// `process` return value is usable by the first downstream node.  Frames
180    /// for output ports 1..N are stored in the internal pending queues and can
181    /// be retrieved via [`SplitFilter::pop_output`].
182    fn process(&mut self, input: Option<FilterFrame>) -> GraphResult<Option<FilterFrame>> {
183        match input {
184            None => Ok(None),
185            Some(frame) => {
186                if !frame.is_video() {
187                    return Err(GraphError::PortTypeMismatch {
188                        expected: "Video".to_string(),
189                        actual: "Audio".to_string(),
190                    });
191                }
192                // Fan out to all queues.
193                self.fan_out(frame);
194
195                // Return port-0 frame immediately; others stay in queue.
196                Ok(self.pending[0].pop())
197            }
198        }
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use oximedia_codec::VideoFrame;
206    use oximedia_core::PixelFormat;
207
208    fn make_video_frame() -> FilterFrame {
209        FilterFrame::Video(VideoFrame::new(PixelFormat::Yuv420p, 1920, 1080))
210    }
211
212    #[test]
213    fn test_split_creation_default_two_outputs() {
214        let split = SplitFilter::new(NodeId(0), "tee", 2);
215        assert_eq!(split.output_count(), 2);
216        assert_eq!(split.outputs().len(), 2);
217        assert_eq!(split.inputs().len(), 1);
218    }
219
220    #[test]
221    fn test_split_creation_n_outputs() {
222        let split = SplitFilter::new(NodeId(1), "tee4", 4);
223        assert_eq!(split.output_count(), 4);
224    }
225
226    #[test]
227    fn test_split_clamps_to_minimum_one() {
228        let split = SplitFilter::new(NodeId(2), "tee_min", 0);
229        assert_eq!(split.output_count(), 1);
230    }
231
232    #[test]
233    fn test_split_from_config() {
234        let config = SplitConfig::new(3);
235        let split = SplitFilter::from_config(NodeId(0), "cfg_tee", config);
236        assert_eq!(split.output_count(), 3);
237    }
238
239    #[test]
240    fn test_split_process_returns_port0_frame() {
241        let mut split = SplitFilter::new(NodeId(0), "tee", 2);
242        let frame = make_video_frame();
243        let result = split.process(Some(frame)).expect("process should succeed");
244        assert!(result.is_some());
245        assert!(result.expect("value should exist").is_video());
246    }
247
248    #[test]
249    fn test_split_process_none_returns_none() {
250        let mut split = SplitFilter::new(NodeId(0), "tee", 2);
251        let result = split.process(None).expect("process should succeed");
252        assert!(result.is_none());
253    }
254
255    #[test]
256    fn test_split_pending_on_additional_outputs() {
257        let mut split = SplitFilter::new(NodeId(0), "tee", 3);
258        let frame = make_video_frame();
259        split.process(Some(frame)).expect("process should succeed");
260        // Port 1 and port 2 should have a pending frame each.
261        assert!(split.pop_output(1).is_some());
262        assert!(split.pop_output(2).is_some());
263        // No more frames pending.
264        assert!(split.pop_output(1).is_none());
265    }
266
267    #[test]
268    fn test_split_port_names() {
269        let split = SplitFilter::new(NodeId(0), "tee", 3);
270        assert_eq!(split.outputs()[0].name, "output_0");
271        assert_eq!(split.outputs()[1].name, "output_1");
272        assert_eq!(split.outputs()[2].name, "output_2");
273    }
274
275    #[test]
276    fn test_split_node_type_is_filter() {
277        let split = SplitFilter::new(NodeId(0), "tee", 2);
278        assert_eq!(split.node_type(), NodeType::Filter);
279    }
280
281    #[test]
282    fn test_split_state_transitions() {
283        let mut split = SplitFilter::new(NodeId(0), "tee", 2);
284        assert_eq!(split.state(), NodeState::Idle);
285        split
286            .set_state(NodeState::Processing)
287            .expect("state transition should succeed");
288        assert_eq!(split.state(), NodeState::Processing);
289    }
290
291    #[test]
292    fn test_split_audio_frame_returns_error() {
293        use oximedia_audio::{AudioFrame, ChannelLayout};
294        use oximedia_core::SampleFormat;
295        let mut split = SplitFilter::new(NodeId(0), "tee", 2);
296        let audio_frame = AudioFrame::new(SampleFormat::F32, 48000, ChannelLayout::Stereo);
297        let result = split.process(Some(FilterFrame::Audio(audio_frame)));
298        assert!(result.is_err());
299    }
300
301    #[test]
302    fn test_split_config_default() {
303        let config = SplitConfig::default();
304        assert_eq!(config.outputs, 2);
305    }
306
307    #[test]
308    fn test_split_multiple_frames_queued() {
309        let mut split = SplitFilter::new(NodeId(0), "tee", 2);
310        for _ in 0..3 {
311            let frame = make_video_frame();
312            split.process(Some(frame)).expect("process should succeed");
313        }
314        // Port 1 should have 3 pending frames.
315        assert!(split.pop_output(1).is_some());
316        assert!(split.pop_output(1).is_some());
317        assert!(split.pop_output(1).is_some());
318        assert!(split.pop_output(1).is_none());
319    }
320}