#![forbid(unsafe_code)]
#![allow(dead_code)]
use crate::error::{GraphError, GraphResult};
use crate::frame::FilterFrame;
use crate::node::{Node, NodeId, NodeState, NodeType};
use crate::port::{InputPort, OutputPort, PortFormat, PortId, PortType, VideoPortFormat};
/// Configuration for a split filter: the number of output ports to create.
#[derive(Debug, Clone)]
pub struct SplitConfig {
    /// Number of output ports requested.
    pub outputs: usize,
}

impl Default for SplitConfig {
    /// Returns a configuration with two outputs.
    fn default() -> Self {
        Self::new(2)
    }
}

impl SplitConfig {
    /// Builds a configuration for `n` outputs.
    ///
    /// A request for zero outputs is raised to one.
    #[must_use]
    pub fn new(n: usize) -> Self {
        let outputs = if n == 0 { 1 } else { n };
        Self { outputs }
    }
}
/// Filter node with a single video input that copies every incoming frame to
/// each of its output ports.
pub struct SplitFilter {
/// Identifier of this node within the graph.
id: NodeId,
/// Name given to the node at construction.
name: String,
/// Current lifecycle state; starts as `NodeState::Idle`.
state: NodeState,
/// Input ports; the constructor creates exactly one video port named "input".
inputs: Vec<InputPort>,
/// Output ports, named "output_0", "output_1", ... by the constructor.
outputs: Vec<OutputPort>,
/// One frame queue per output port, indexed by output port position.
/// Frames are appended on fan-out and taken from the front by `pop_output`.
pending: Vec<Vec<FilterFrame>>,
}
impl SplitFilter {
    /// Creates a split filter with one video input and `n` video outputs.
    ///
    /// `n` is clamped to a minimum of one. The input port is named `"input"`
    /// and the outputs are named `"output_0"`, `"output_1"`, ...; every port
    /// accepts any video format. The node starts in [`NodeState::Idle`] with
    /// an empty pending queue for each output.
    #[must_use]
    pub fn new(id: NodeId, name: impl Into<String>, n: usize) -> Self {
        let n = n.max(1);
        let video_format = PortFormat::Video(VideoPortFormat::any());
        let inputs = vec![
            InputPort::new(PortId(0), "input", PortType::Video).with_format(video_format.clone()),
        ];
        let outputs: Vec<OutputPort> = (0..n)
            .map(|i| {
                // NOTE: `as u32` wraps for indices above `u32::MAX`. Such an
                // output count is not expected in practice, so the cast is
                // kept to preserve the infallible constructor signature.
                OutputPort::new(PortId(i as u32), format!("output_{i}"), PortType::Video)
                    .with_format(video_format.clone())
            })
            .collect();
        let pending = vec![Vec::new(); n];
        Self {
            id,
            name: name.into(),
            state: NodeState::Idle,
            inputs,
            outputs,
            pending,
        }
    }

    /// Creates a split filter whose output count is taken from `config`.
    #[must_use]
    pub fn from_config(id: NodeId, name: impl Into<String>, config: SplitConfig) -> Self {
        Self::new(id, name, config.outputs)
    }

    /// Returns the number of output ports.
    #[must_use]
    pub fn output_count(&self) -> usize {
        self.outputs.len()
    }

    /// Removes and returns the oldest pending frame for output `port_index`.
    ///
    /// Returns `None` when `port_index` is out of range or that port's queue
    /// is empty.
    pub fn pop_output(&mut self, port_index: usize) -> Option<FilterFrame> {
        let queue = self.pending.get_mut(port_index)?;
        if queue.is_empty() {
            None
        } else {
            // Front removal keeps FIFO order; it shifts the remaining frames,
            // which is acceptable as long as the queues stay short.
            Some(queue.remove(0))
        }
    }

    /// Appends one copy of `frame` to the pending queue of every output.
    ///
    /// All queues but the last receive a clone; the last queue takes
    /// ownership of `frame` itself, so a split into `n` outputs performs
    /// `n - 1` clones instead of `n`.
    fn fan_out(&mut self, frame: FilterFrame) {
        if let Some((last, rest)) = self.pending.split_last_mut() {
            for queue in rest {
                queue.push(frame.clone());
            }
            last.push(frame);
        }
    }
}
impl Node for SplitFilter {
    /// Returns this node's identifier.
    fn id(&self) -> NodeId {
        self.id
    }

    /// Returns the name given at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// A split is always reported as a filter node.
    fn node_type(&self) -> NodeType {
        NodeType::Filter
    }

    /// Returns the current lifecycle state.
    fn state(&self) -> NodeState {
        self.state
    }

    /// Moves the node to `state`.
    ///
    /// # Errors
    ///
    /// Returns [`GraphError::InvalidStateTransition`] when the current state
    /// does not permit a transition to `state`; the state is left unchanged.
    fn set_state(&mut self, state: NodeState) -> GraphResult<()> {
        let current = self.state;
        if current.can_transition_to(state) {
            self.state = state;
            return Ok(());
        }
        Err(GraphError::InvalidStateTransition {
            node: self.id,
            from: current.to_string(),
            to: state.to_string(),
        })
    }

    /// Returns the input ports.
    fn inputs(&self) -> &[InputPort] {
        self.inputs.as_slice()
    }

    /// Returns the output ports.
    fn outputs(&self) -> &[OutputPort] {
        self.outputs.as_slice()
    }

    /// Fans a video frame out to every output and returns the copy for port 0.
    ///
    /// `None` input yields `Ok(None)`. Copies destined for the other outputs
    /// stay queued until they are fetched with `pop_output`.
    ///
    /// # Errors
    ///
    /// Returns [`GraphError::PortTypeMismatch`] (expected `"Video"`, actual
    /// `"Audio"`) when the frame is not a video frame.
    fn process(&mut self, input: Option<FilterFrame>) -> GraphResult<Option<FilterFrame>> {
        let frame = match input {
            Some(frame) => frame,
            None => return Ok(None),
        };
        if frame.is_video() {
            self.fan_out(frame);
            // Port 0's copy is handed straight back to the caller.
            let port_zero = &mut self.pending[0];
            Ok(port_zero.pop())
        } else {
            Err(GraphError::PortTypeMismatch {
                expected: String::from("Video"),
                actual: String::from("Audio"),
            })
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use oximedia_codec::VideoFrame;
    use oximedia_core::PixelFormat;

    /// Builds a `Yuv420p` video frame wrapped in a [`FilterFrame`].
    fn make_video_frame() -> FilterFrame {
        let video = VideoFrame::new(PixelFormat::Yuv420p, 1920, 1080);
        FilterFrame::Video(video)
    }

    /// Two requested outputs give two output ports and one input port.
    #[test]
    fn test_split_creation_default_two_outputs() {
        let tee = SplitFilter::new(NodeId(0), "tee", 2);
        assert_eq!(tee.output_count(), 2);
        assert_eq!(tee.outputs().len(), 2);
        assert_eq!(tee.inputs().len(), 1);
    }

    /// The output count follows the requested number.
    #[test]
    fn test_split_creation_n_outputs() {
        let tee = SplitFilter::new(NodeId(1), "tee4", 4);
        assert_eq!(tee.output_count(), 4);
    }

    /// Zero requested outputs are raised to one.
    #[test]
    fn test_split_clamps_to_minimum_one() {
        let tee = SplitFilter::new(NodeId(2), "tee_min", 0);
        assert_eq!(tee.output_count(), 1);
    }

    /// `from_config` honours the configured output count.
    #[test]
    fn test_split_from_config() {
        let tee = SplitFilter::from_config(NodeId(0), "cfg_tee", SplitConfig::new(3));
        assert_eq!(tee.output_count(), 3);
    }

    /// Processing a video frame returns a video frame for port 0.
    #[test]
    fn test_split_process_returns_port0_frame() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        let outcome = tee
            .process(Some(make_video_frame()))
            .expect("process should succeed");
        assert!(outcome.is_some());
        let returned = outcome.expect("value should exist");
        assert!(returned.is_video());
    }

    /// Processing `None` yields `None`.
    #[test]
    fn test_split_process_none_returns_none() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        let outcome = tee.process(None).expect("process should succeed");
        assert!(outcome.is_none());
    }

    /// Each additional output holds exactly one pending frame after one call.
    #[test]
    fn test_split_pending_on_additional_outputs() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 3);
        tee.process(Some(make_video_frame()))
            .expect("process should succeed");
        for port in [1usize, 2] {
            assert!(tee.pop_output(port).is_some());
        }
        assert!(tee.pop_output(1).is_none());
    }

    /// Output ports are named `output_<index>`.
    #[test]
    fn test_split_port_names() {
        let tee = SplitFilter::new(NodeId(0), "tee", 3);
        let expected = ["output_0", "output_1", "output_2"];
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(tee.outputs()[idx].name, *want);
        }
    }

    /// The node reports itself as a filter.
    #[test]
    fn test_split_node_type_is_filter() {
        let tee = SplitFilter::new(NodeId(0), "tee", 2);
        assert_eq!(tee.node_type(), NodeType::Filter);
    }

    /// A fresh node is idle and can move to processing.
    #[test]
    fn test_split_state_transitions() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        assert_eq!(tee.state(), NodeState::Idle);
        let transition = tee.set_state(NodeState::Processing);
        transition.expect("state transition should succeed");
        assert_eq!(tee.state(), NodeState::Processing);
    }

    /// Audio frames are rejected.
    #[test]
    fn test_split_audio_frame_returns_error() {
        use oximedia_audio::{AudioFrame, ChannelLayout};
        use oximedia_core::SampleFormat;

        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        let audio = FilterFrame::Audio(AudioFrame::new(
            SampleFormat::F32,
            48000,
            ChannelLayout::Stereo,
        ));
        let outcome = tee.process(Some(audio));
        assert!(outcome.is_err());
    }

    /// The default configuration asks for two outputs.
    #[test]
    fn test_split_config_default() {
        let SplitConfig { outputs } = SplitConfig::default();
        assert_eq!(outputs, 2);
    }

    /// Frames accumulate on a secondary output until they are popped.
    #[test]
    fn test_split_multiple_frames_queued() {
        let mut tee = SplitFilter::new(NodeId(0), "tee", 2);
        (0..3).for_each(|_| {
            tee.process(Some(make_video_frame()))
                .expect("process should succeed");
        });
        for _ in 0..3 {
            assert!(tee.pop_output(1).is_some());
        }
        assert!(tee.pop_output(1).is_none());
    }
}