use ff_format::{AudioFrame, VideoFrame};
use crate::error::StreamError;
use crate::output::StreamOutput;
pub struct FanoutOutput {
targets: Vec<Box<dyn StreamOutput>>,
}
impl FanoutOutput {
#[must_use]
pub fn new(targets: Vec<Box<dyn StreamOutput>>) -> Self {
Self { targets }
}
}
impl StreamOutput for FanoutOutput {
fn push_video(&mut self, frame: &VideoFrame) -> Result<(), StreamError> {
let total = self.targets.len();
let mut errors: Vec<(usize, StreamError)> = Vec::new();
for (i, target) in self.targets.iter_mut().enumerate() {
if let Err(e) = target.push_video(frame) {
errors.push((i, e));
}
}
if errors.is_empty() {
Ok(())
} else {
Err(StreamError::FanoutFailure {
failed: errors.len(),
total,
messages: errors
.into_iter()
.map(|(i, e)| format!("target[{i}]: {e}"))
.collect(),
})
}
}
fn push_audio(&mut self, frame: &AudioFrame) -> Result<(), StreamError> {
let total = self.targets.len();
let mut errors: Vec<(usize, StreamError)> = Vec::new();
for (i, target) in self.targets.iter_mut().enumerate() {
if let Err(e) = target.push_audio(frame) {
errors.push((i, e));
}
}
if errors.is_empty() {
Ok(())
} else {
Err(StreamError::FanoutFailure {
failed: errors.len(),
total,
messages: errors
.into_iter()
.map(|(i, e)| format!("target[{i}]: {e}"))
.collect(),
})
}
}
fn finish(self: Box<Self>) -> Result<(), StreamError> {
let total = self.targets.len();
let mut errors: Vec<(usize, StreamError)> = Vec::new();
for (i, target) in self.targets.into_iter().enumerate() {
if let Err(e) = target.finish() {
errors.push((i, e));
}
}
if errors.is_empty() {
Ok(())
} else {
Err(StreamError::FanoutFailure {
failed: errors.len(),
total,
messages: errors
.into_iter()
.map(|(i, e)| format!("target[{i}]: {e}"))
.collect(),
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ff_format::{PixelFormat, SampleFormat, VideoFrame};
struct OkOutput;
impl StreamOutput for OkOutput {
fn push_video(&mut self, _frame: &VideoFrame) -> Result<(), StreamError> {
Ok(())
}
fn push_audio(&mut self, _frame: &ff_format::AudioFrame) -> Result<(), StreamError> {
Ok(())
}
fn finish(self: Box<Self>) -> Result<(), StreamError> {
Ok(())
}
}
struct FailOutput;
impl StreamOutput for FailOutput {
fn push_video(&mut self, _frame: &VideoFrame) -> Result<(), StreamError> {
Err(StreamError::InvalidConfig {
reason: "forced failure".into(),
})
}
fn push_audio(&mut self, _frame: &ff_format::AudioFrame) -> Result<(), StreamError> {
Err(StreamError::InvalidConfig {
reason: "forced failure".into(),
})
}
fn finish(self: Box<Self>) -> Result<(), StreamError> {
Err(StreamError::InvalidConfig {
reason: "forced failure".into(),
})
}
}
fn dummy_video_frame() -> VideoFrame {
VideoFrame::empty(4, 4, PixelFormat::Yuv420p).expect("dummy frame allocation failed")
}
fn dummy_audio_frame() -> ff_format::AudioFrame {
use ff_format::AudioFrame;
AudioFrame::empty(1024, 2, 44100, SampleFormat::F32p)
.expect("dummy audio frame allocation failed")
}
#[test]
fn push_video_all_succeed_should_return_ok() {
let mut fanout = FanoutOutput::new(vec![Box::new(OkOutput), Box::new(OkOutput)]);
let frame = dummy_video_frame();
assert!(fanout.push_video(&frame).is_ok());
}
#[test]
fn push_video_one_fails_should_return_fanout_failure() {
let mut fanout = FanoutOutput::new(vec![Box::new(OkOutput), Box::new(FailOutput)]);
let frame = dummy_video_frame();
let result = fanout.push_video(&frame);
match result {
Err(StreamError::FanoutFailure {
failed,
total,
messages,
}) => {
assert_eq!(failed, 1, "expected 1 failure");
assert_eq!(total, 2, "expected total=2");
assert_eq!(messages.len(), 1);
assert!(messages[0].contains("target[1]"), "got: {:?}", messages);
}
other => panic!("expected FanoutFailure, got: {other:?}"),
}
}
#[test]
fn push_audio_all_succeed_should_return_ok() {
let mut fanout = FanoutOutput::new(vec![Box::new(OkOutput), Box::new(OkOutput)]);
let frame = dummy_audio_frame();
assert!(fanout.push_audio(&frame).is_ok());
}
#[test]
fn push_audio_one_fails_should_return_fanout_failure() {
let mut fanout = FanoutOutput::new(vec![Box::new(OkOutput), Box::new(FailOutput)]);
let frame = dummy_audio_frame();
let result = fanout.push_audio(&frame);
assert!(
matches!(
result,
Err(StreamError::FanoutFailure {
failed: 1,
total: 2,
..
})
),
"got: {result:?}"
);
}
#[test]
fn finish_all_succeed_should_return_ok() {
let fanout = FanoutOutput::new(vec![Box::new(OkOutput), Box::new(OkOutput)]);
assert!(Box::new(fanout).finish().is_ok());
}
#[test]
fn finish_one_fails_should_return_fanout_failure() {
let fanout = FanoutOutput::new(vec![Box::new(OkOutput), Box::new(FailOutput)]);
let result = Box::new(fanout).finish();
assert!(
matches!(
result,
Err(StreamError::FanoutFailure {
failed: 1,
total: 2,
..
})
),
"got: {result:?}"
);
}
#[test]
fn new_with_empty_targets_push_video_should_return_ok() {
let mut fanout = FanoutOutput::new(vec![]);
let frame = dummy_video_frame();
assert!(fanout.push_video(&frame).is_ok());
}
}