1use ff_format::{AudioFrame, VideoFrame};
34
35use crate::error::StreamError;
36use crate::output::StreamOutput;
37
38pub struct FanoutOutput {
54 targets: Vec<Box<dyn StreamOutput>>,
55}
56
57impl FanoutOutput {
58 #[must_use]
68 pub fn new(targets: Vec<Box<dyn StreamOutput>>) -> Self {
69 Self { targets }
70 }
71}
72
73impl StreamOutput for FanoutOutput {
78 fn push_video(&mut self, frame: &VideoFrame) -> Result<(), StreamError> {
79 let total = self.targets.len();
80 let mut errors: Vec<(usize, StreamError)> = Vec::new();
81
82 for (i, target) in self.targets.iter_mut().enumerate() {
83 if let Err(e) = target.push_video(frame) {
84 errors.push((i, e));
85 }
86 }
87
88 if errors.is_empty() {
89 Ok(())
90 } else {
91 Err(StreamError::FanoutFailure {
92 failed: errors.len(),
93 total,
94 messages: errors
95 .into_iter()
96 .map(|(i, e)| format!("target[{i}]: {e}"))
97 .collect(),
98 })
99 }
100 }
101
102 fn push_audio(&mut self, frame: &AudioFrame) -> Result<(), StreamError> {
103 let total = self.targets.len();
104 let mut errors: Vec<(usize, StreamError)> = Vec::new();
105
106 for (i, target) in self.targets.iter_mut().enumerate() {
107 if let Err(e) = target.push_audio(frame) {
108 errors.push((i, e));
109 }
110 }
111
112 if errors.is_empty() {
113 Ok(())
114 } else {
115 Err(StreamError::FanoutFailure {
116 failed: errors.len(),
117 total,
118 messages: errors
119 .into_iter()
120 .map(|(i, e)| format!("target[{i}]: {e}"))
121 .collect(),
122 })
123 }
124 }
125
126 fn finish(self: Box<Self>) -> Result<(), StreamError> {
127 let total = self.targets.len();
128 let mut errors: Vec<(usize, StreamError)> = Vec::new();
129
130 for (i, target) in self.targets.into_iter().enumerate() {
131 if let Err(e) = target.finish() {
132 errors.push((i, e));
133 }
134 }
135
136 if errors.is_empty() {
137 Ok(())
138 } else {
139 Err(StreamError::FanoutFailure {
140 failed: errors.len(),
141 total,
142 messages: errors
143 .into_iter()
144 .map(|(i, e)| format!("target[{i}]: {e}"))
145 .collect(),
146 })
147 }
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use ff_format::{PixelFormat, SampleFormat, VideoFrame};

    /// Test double whose every operation returns `Ok(())`.
    struct OkOutput;

    impl StreamOutput for OkOutput {
        fn push_video(&mut self, _frame: &VideoFrame) -> Result<(), StreamError> {
            Ok(())
        }

        fn push_audio(&mut self, _frame: &ff_format::AudioFrame) -> Result<(), StreamError> {
            Ok(())
        }

        fn finish(self: Box<Self>) -> Result<(), StreamError> {
            Ok(())
        }
    }

    /// Test double whose every operation returns `StreamError::InvalidConfig`.
    struct FailOutput;

    /// The error result shared by all `FailOutput` operations.
    fn forced_failure() -> Result<(), StreamError> {
        Err(StreamError::InvalidConfig {
            reason: "forced failure".into(),
        })
    }

    impl StreamOutput for FailOutput {
        fn push_video(&mut self, _frame: &VideoFrame) -> Result<(), StreamError> {
            forced_failure()
        }

        fn push_audio(&mut self, _frame: &ff_format::AudioFrame) -> Result<(), StreamError> {
            forced_failure()
        }

        fn finish(self: Box<Self>) -> Result<(), StreamError> {
            forced_failure()
        }
    }

    /// Small 4x4 YUV420p frame used as video input.
    fn dummy_video_frame() -> VideoFrame {
        let frame = VideoFrame::empty(4, 4, PixelFormat::Yuv420p);
        frame.expect("dummy frame allocation failed")
    }

    /// Audio frame built with `AudioFrame::empty(1024, 2, 44100, F32p)`.
    fn dummy_audio_frame() -> ff_format::AudioFrame {
        let frame = ff_format::AudioFrame::empty(1024, 2, 44100, SampleFormat::F32p);
        frame.expect("dummy audio frame allocation failed")
    }

    /// Fan-out over two targets that both succeed.
    fn all_ok_fanout() -> FanoutOutput {
        FanoutOutput::new(vec![Box::new(OkOutput), Box::new(OkOutput)])
    }

    /// Fan-out whose target 0 succeeds and target 1 fails.
    fn one_failing_fanout() -> FanoutOutput {
        FanoutOutput::new(vec![Box::new(OkOutput), Box::new(FailOutput)])
    }

    #[test]
    fn push_video_all_succeed_should_return_ok() {
        let frame = dummy_video_frame();
        let mut fanout = all_ok_fanout();
        assert!(fanout.push_video(&frame).is_ok());
    }

    #[test]
    fn push_video_one_fails_should_return_fanout_failure() {
        let frame = dummy_video_frame();
        let mut fanout = one_failing_fanout();

        match fanout.push_video(&frame) {
            Err(StreamError::FanoutFailure {
                failed,
                total,
                messages,
            }) => {
                assert_eq!(failed, 1, "expected 1 failure");
                assert_eq!(total, 2, "expected total=2");
                assert_eq!(messages.len(), 1);
                // The failing target sits at index 1 of the fan-out.
                assert!(messages[0].contains("target[1]"), "got: {:?}", messages);
            }
            other => panic!("expected FanoutFailure, got: {other:?}"),
        }
    }

    #[test]
    fn push_audio_all_succeed_should_return_ok() {
        let frame = dummy_audio_frame();
        let mut fanout = all_ok_fanout();
        assert!(fanout.push_audio(&frame).is_ok());
    }

    #[test]
    fn push_audio_one_fails_should_return_fanout_failure() {
        let frame = dummy_audio_frame();
        let mut fanout = one_failing_fanout();

        let result = fanout.push_audio(&frame);
        let is_expected_failure = matches!(
            result,
            Err(StreamError::FanoutFailure {
                failed: 1,
                total: 2,
                ..
            })
        );
        assert!(is_expected_failure, "got: {result:?}");
    }

    #[test]
    fn finish_all_succeed_should_return_ok() {
        let fanout = Box::new(all_ok_fanout());
        assert!(fanout.finish().is_ok());
    }

    #[test]
    fn finish_one_fails_should_return_fanout_failure() {
        let fanout = Box::new(one_failing_fanout());

        let result = fanout.finish();
        let is_expected_failure = matches!(
            result,
            Err(StreamError::FanoutFailure {
                failed: 1,
                total: 2,
                ..
            })
        );
        assert!(is_expected_failure, "got: {result:?}");
    }

    #[test]
    fn new_with_empty_targets_push_video_should_return_ok() {
        let frame = dummy_video_frame();
        let mut fanout = FanoutOutput::new(Vec::new());
        assert!(fanout.push_video(&frame).is_ok());
    }
}