oximedia_graph/filters/video/
split.rs1#![forbid(unsafe_code)]
8#![allow(dead_code)]
9
10use crate::error::{GraphError, GraphResult};
11use crate::frame::FilterFrame;
12use crate::node::{Node, NodeId, NodeState, NodeType};
13use crate::port::{InputPort, OutputPort, PortFormat, PortId, PortType, VideoPortFormat};
14
/// Configuration describing how many outputs a `SplitFilter` should have.
#[derive(Debug, Clone)]
pub struct SplitConfig {
    /// Requested number of output ports.
    pub outputs: usize,
}

impl Default for SplitConfig {
    /// Returns a configuration with two outputs.
    fn default() -> Self {
        Self::new(2)
    }
}

impl SplitConfig {
    /// Builds a configuration requesting `n` outputs.
    ///
    /// A request for zero outputs is raised to one, so the resulting
    /// `outputs` value is always at least 1.
    #[must_use]
    pub fn new(n: usize) -> Self {
        let outputs = if n == 0 { 1 } else { n };
        Self { outputs }
    }
}
37
/// Video filter node with a single input and one or more outputs; every
/// accepted frame is queued once per output.
pub struct SplitFilter {
    /// Identifier returned by `Node::id`.
    id: NodeId,
    /// Name returned by `Node::name`.
    name: String,
    /// Current node state; changed through `Node::set_state`.
    state: NodeState,
    /// Input ports (the constructor creates exactly one, named `"input"`).
    inputs: Vec<InputPort>,
    /// Output ports, named `output_0`, `output_1`, ... by the constructor.
    outputs: Vec<OutputPort>,
    /// One frame queue per output; `pending[i]` holds the frames that have
    /// been fanned out to output `i` and not yet taken.
    pending: Vec<Vec<FilterFrame>>,
}
68
69impl SplitFilter {
70 #[must_use]
72 pub fn new(id: NodeId, name: impl Into<String>, n: usize) -> Self {
73 let n = n.max(1);
74 let video_format = PortFormat::Video(VideoPortFormat::any());
75
76 let inputs =
77 vec![InputPort::new(PortId(0), "input", PortType::Video)
78 .with_format(video_format.clone())];
79
80 let outputs: Vec<OutputPort> = (0..n)
81 .map(|i| {
82 OutputPort::new(PortId(i as u32), format!("output_{i}"), PortType::Video)
83 .with_format(video_format.clone())
84 })
85 .collect();
86
87 let pending = vec![Vec::new(); n];
88
89 Self {
90 id,
91 name: name.into(),
92 state: NodeState::Idle,
93 inputs,
94 outputs,
95 pending,
96 }
97 }
98
99 #[must_use]
101 pub fn from_config(id: NodeId, name: impl Into<String>, config: SplitConfig) -> Self {
102 Self::new(id, name, config.outputs)
103 }
104
105 #[must_use]
107 pub fn output_count(&self) -> usize {
108 self.outputs.len()
109 }
110
111 pub fn pop_output(&mut self, port_index: usize) -> Option<FilterFrame> {
115 self.pending.get_mut(port_index).and_then(|q| {
116 if q.is_empty() {
117 None
118 } else {
119 Some(q.remove(0))
120 }
121 })
122 }
123
124 fn fan_out(&mut self, frame: FilterFrame) {
126 let n = self.pending.len();
127 for i in 0..n {
128 if i + 1 < n {
130 self.pending[i].push(frame.clone());
131 } else {
132 self.pending[i].push(frame.clone());
133 }
134 }
135 }
136}
137
138impl Node for SplitFilter {
139 fn id(&self) -> NodeId {
140 self.id
141 }
142
143 fn name(&self) -> &str {
144 &self.name
145 }
146
147 fn node_type(&self) -> NodeType {
148 NodeType::Filter
149 }
150
151 fn state(&self) -> NodeState {
152 self.state
153 }
154
155 fn set_state(&mut self, state: NodeState) -> GraphResult<()> {
156 if !self.state.can_transition_to(state) {
157 return Err(GraphError::InvalidStateTransition {
158 node: self.id,
159 from: self.state.to_string(),
160 to: state.to_string(),
161 });
162 }
163 self.state = state;
164 Ok(())
165 }
166
167 fn inputs(&self) -> &[InputPort] {
168 &self.inputs
169 }
170
171 fn outputs(&self) -> &[OutputPort] {
172 &self.outputs
173 }
174
175 fn process(&mut self, input: Option<FilterFrame>) -> GraphResult<Option<FilterFrame>> {
183 match input {
184 None => Ok(None),
185 Some(frame) => {
186 if !frame.is_video() {
187 return Err(GraphError::PortTypeMismatch {
188 expected: "Video".to_string(),
189 actual: "Audio".to_string(),
190 });
191 }
192 self.fan_out(frame);
194
195 Ok(self.pending[0].pop())
197 }
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use oximedia_codec::VideoFrame;
    use oximedia_core::PixelFormat;

    /// Builds a 1920x1080 `PixelFormat::Yuv420p` frame wrapped as a video
    /// `FilterFrame`.
    fn sample_video_frame() -> FilterFrame {
        let video = VideoFrame::new(PixelFormat::Yuv420p, 1920, 1080);
        FilterFrame::Video(video)
    }

    // Construction with two outputs exposes two output ports and one input.
    #[test]
    fn test_split_creation_default_two_outputs() {
        let tee = SplitFilter::new(NodeId(0), "tee", 2);
        assert_eq!(tee.output_count(), 2);
        assert_eq!(tee.outputs().len(), 2);
        assert_eq!(tee.inputs().len(), 1);
    }

    // The requested output count is honoured.
    #[test]
    fn test_split_creation_n_outputs() {
        let tee = SplitFilter::new(NodeId(1), "tee4", 4);
        assert_eq!(tee.output_count(), 4);
    }

    // Zero outputs is raised to one.
    #[test]
    fn test_split_clamps_to_minimum_one() {
        let tee = SplitFilter::new(NodeId(2), "tee_min", 0);
        assert_eq!(tee.output_count(), 1);
    }

    // `from_config` takes the output count from the configuration.
    #[test]
    fn test_split_from_config() {
        let tee = SplitFilter::from_config(NodeId(0), "cfg_tee", SplitConfig::new(3));
        assert_eq!(tee.output_count(), 3);
    }

    // Processing a video frame returns a video frame for port 0.
    #[test]
    fn test_split_process_returns_port0_frame() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        let output = tee
            .process(Some(sample_video_frame()))
            .expect("process should succeed");
        assert!(output.is_some());
        assert!(output.expect("value should exist").is_video());
    }

    // `None` input passes through as `None`.
    #[test]
    fn test_split_process_none_returns_none() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        let output = tee.process(None).expect("process should succeed");
        assert!(output.is_none());
    }

    // Each additional output receives exactly one queued frame per input.
    #[test]
    fn test_split_pending_on_additional_outputs() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 3);
        tee.process(Some(sample_video_frame()))
            .expect("process should succeed");
        for port in [1usize, 2] {
            assert!(tee.pop_output(port).is_some());
        }
        assert!(tee.pop_output(1).is_none());
    }

    // Output ports are named `output_<index>`.
    #[test]
    fn test_split_port_names() {
        let tee = SplitFilter::new(NodeId(0), "tee", 3);
        let expected_names = ["output_0", "output_1", "output_2"];
        for (idx, expected) in expected_names.iter().enumerate() {
            assert_eq!(tee.outputs()[idx].name, *expected);
        }
    }

    // The node reports itself as a filter.
    #[test]
    fn test_split_node_type_is_filter() {
        let tee = SplitFilter::new(NodeId(0), "tee", 2);
        assert_eq!(tee.node_type(), NodeType::Filter);
    }

    // A fresh node starts idle and can move to processing.
    #[test]
    fn test_split_state_transitions() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        assert_eq!(tee.state(), NodeState::Idle);
        tee.set_state(NodeState::Processing)
            .expect("state transition should succeed");
        assert_eq!(tee.state(), NodeState::Processing);
    }

    // Audio frames are rejected.
    #[test]
    fn test_split_audio_frame_returns_error() {
        use oximedia_audio::{AudioFrame, ChannelLayout};
        use oximedia_core::SampleFormat;

        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        let audio = AudioFrame::new(SampleFormat::F32, 48000, ChannelLayout::Stereo);
        let outcome = tee.process(Some(FilterFrame::Audio(audio)));
        assert!(outcome.is_err());
    }

    // The default configuration asks for two outputs.
    #[test]
    fn test_split_config_default() {
        let cfg = SplitConfig::default();
        assert_eq!(cfg.outputs, 2);
    }

    // Frames accumulate on a secondary output until popped.
    #[test]
    fn test_split_multiple_frames_queued() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        for _ in 0..3 {
            tee.process(Some(sample_video_frame()))
                .expect("process should succeed");
        }
        for _ in 0..3 {
            assert!(tee.pop_output(1).is_some());
        }
        assert!(tee.pop_output(1).is_none());
    }
}