Skip to main content

ff_stream/
fanout.rs

1//! Multi-target fan-out wrapper.
2//!
3//! [`FanoutOutput`] delivers each frame to multiple [`StreamOutput`] targets
4//! simultaneously. When one or more targets fail, all remaining targets in the
5//! list still receive the frame before any error is returned.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use ff_stream::{FanoutOutput, LiveHlsOutput, RtmpOutput, StreamOutput};
11//!
12//! let hls = LiveHlsOutput::new("/var/www/live")
13//!     .video(1280, 720, 30.0)
14//!     .build()?;
15//!
16//! let rtmp = RtmpOutput::new("rtmp://ingest.example.com/live/key")
17//!     .video(1280, 720, 30.0)
18//!     .build()?;
19//!
20//! let mut fanout = FanoutOutput::new(vec![
21//!     Box::new(hls),
22//!     Box::new(rtmp),
23//! ]);
24//!
25//! // for each decoded frame:
26//! fanout.push_video(&video_frame)?;
27//! fanout.push_audio(&audio_frame)?;
28//!
29//! // when done:
30//! Box::new(fanout).finish()?;
31//! ```
32
33use ff_format::{AudioFrame, VideoFrame};
34
35use crate::error::StreamError;
36use crate::output::StreamOutput;
37
38// ============================================================================
39// FanoutOutput — safe multi-target wrapper
40// ============================================================================
41
42/// A [`StreamOutput`] wrapper that fans frames out to multiple targets.
43///
44/// Create with [`FanoutOutput::new`], passing a `Vec<Box<dyn StreamOutput>>`.
45/// Every call to [`push_video`](Self::push_video), [`push_audio`](Self::push_audio),
46/// or [`finish`](StreamOutput::finish) is forwarded to **all** targets in order.
47///
48/// ## Failure behaviour
49///
50/// When one or more targets return an error, the remaining targets still
51/// receive the frame. All errors are collected and returned as a single
52/// [`StreamError::FanoutFailure`].
53pub struct FanoutOutput {
54    targets: Vec<Box<dyn StreamOutput>>,
55}
56
57impl FanoutOutput {
58    /// Create a new fanout output that delivers frames to all `targets`.
59    ///
60    /// # Example
61    ///
62    /// ```ignore
63    /// use ff_stream::{FanoutOutput, LiveHlsOutput, StreamOutput};
64    ///
65    /// let out = FanoutOutput::new(vec![Box::new(hls_output)]);
66    /// ```
67    #[must_use]
68    pub fn new(targets: Vec<Box<dyn StreamOutput>>) -> Self {
69        Self { targets }
70    }
71}
72
73// ============================================================================
74// StreamOutput impl
75// ============================================================================
76
77impl StreamOutput for FanoutOutput {
78    fn push_video(&mut self, frame: &VideoFrame) -> Result<(), StreamError> {
79        let total = self.targets.len();
80        let mut errors: Vec<(usize, StreamError)> = Vec::new();
81
82        for (i, target) in self.targets.iter_mut().enumerate() {
83            if let Err(e) = target.push_video(frame) {
84                errors.push((i, e));
85            }
86        }
87
88        if errors.is_empty() {
89            Ok(())
90        } else {
91            Err(StreamError::FanoutFailure {
92                failed: errors.len(),
93                total,
94                messages: errors
95                    .into_iter()
96                    .map(|(i, e)| format!("target[{i}]: {e}"))
97                    .collect(),
98            })
99        }
100    }
101
102    fn push_audio(&mut self, frame: &AudioFrame) -> Result<(), StreamError> {
103        let total = self.targets.len();
104        let mut errors: Vec<(usize, StreamError)> = Vec::new();
105
106        for (i, target) in self.targets.iter_mut().enumerate() {
107            if let Err(e) = target.push_audio(frame) {
108                errors.push((i, e));
109            }
110        }
111
112        if errors.is_empty() {
113            Ok(())
114        } else {
115            Err(StreamError::FanoutFailure {
116                failed: errors.len(),
117                total,
118                messages: errors
119                    .into_iter()
120                    .map(|(i, e)| format!("target[{i}]: {e}"))
121                    .collect(),
122            })
123        }
124    }
125
126    fn finish(self: Box<Self>) -> Result<(), StreamError> {
127        let total = self.targets.len();
128        let mut errors: Vec<(usize, StreamError)> = Vec::new();
129
130        for (i, target) in self.targets.into_iter().enumerate() {
131            if let Err(e) = target.finish() {
132                errors.push((i, e));
133            }
134        }
135
136        if errors.is_empty() {
137            Ok(())
138        } else {
139            Err(StreamError::FanoutFailure {
140                failed: errors.len(),
141                total,
142                messages: errors
143                    .into_iter()
144                    .map(|(i, e)| format!("target[{i}]: {e}"))
145                    .collect(),
146            })
147        }
148    }
149}
150
151// ============================================================================
152// Unit tests
153// ============================================================================
154
#[cfg(test)]
mod tests {
    use super::*;
    use ff_format::{AudioFrame, PixelFormat, SampleFormat, VideoFrame};

    // ── Test doubles ────────────────────────────────────────────────────────

    /// Target that accepts every call.
    struct OkOutput;

    impl StreamOutput for OkOutput {
        fn push_video(&mut self, _frame: &VideoFrame) -> Result<(), StreamError> {
            Ok(())
        }

        fn push_audio(&mut self, _frame: &AudioFrame) -> Result<(), StreamError> {
            Ok(())
        }

        fn finish(self: Box<Self>) -> Result<(), StreamError> {
            Ok(())
        }
    }

    /// Target that rejects every call.
    struct FailOutput;

    /// The error result every `FailOutput` method returns.
    fn forced_failure() -> Result<(), StreamError> {
        Err(StreamError::InvalidConfig {
            reason: "forced failure".into(),
        })
    }

    impl StreamOutput for FailOutput {
        fn push_video(&mut self, _frame: &VideoFrame) -> Result<(), StreamError> {
            forced_failure()
        }

        fn push_audio(&mut self, _frame: &AudioFrame) -> Result<(), StreamError> {
            forced_failure()
        }

        fn finish(self: Box<Self>) -> Result<(), StreamError> {
            forced_failure()
        }
    }

    // ── Fixtures ────────────────────────────────────────────────────────────

    fn dummy_video_frame() -> VideoFrame {
        VideoFrame::empty(4, 4, PixelFormat::Yuv420p).expect("dummy frame allocation failed")
    }

    fn dummy_audio_frame() -> AudioFrame {
        AudioFrame::empty(1024, 2, 44100, SampleFormat::F32p)
            .expect("dummy audio frame allocation failed")
    }

    /// Fanout over two targets that both succeed.
    fn all_ok_fanout() -> FanoutOutput {
        FanoutOutput::new(vec![Box::new(OkOutput), Box::new(OkOutput)])
    }

    /// Fanout whose target 0 succeeds and whose target 1 fails.
    fn ok_then_fail_fanout() -> FanoutOutput {
        FanoutOutput::new(vec![Box::new(OkOutput), Box::new(FailOutput)])
    }

    /// True when `result` is a `FanoutFailure` reporting 1 failure out of 2.
    fn is_one_of_two_failure(result: &Result<(), StreamError>) -> bool {
        matches!(
            result,
            Err(StreamError::FanoutFailure {
                failed: 1,
                total: 2,
                ..
            })
        )
    }

    // ── Tests ───────────────────────────────────────────────────────────────

    #[test]
    fn push_video_all_succeed_should_return_ok() {
        let frame = dummy_video_frame();
        let mut fanout = all_ok_fanout();
        let outcome = fanout.push_video(&frame);
        assert!(outcome.is_ok());
    }

    #[test]
    fn push_video_one_fails_should_return_fanout_failure() {
        let frame = dummy_video_frame();
        let mut fanout = ok_then_fail_fanout();
        let result = fanout.push_video(&frame);

        // Match by reference so the whole result stays printable in the
        // fallback arm.
        match &result {
            Err(StreamError::FanoutFailure {
                failed,
                total,
                messages,
            }) => {
                assert_eq!(*failed, 1, "expected 1 failure");
                assert_eq!(*total, 2, "expected total=2");
                assert_eq!(messages.len(), 1);
                assert!(messages[0].contains("target[1]"), "got: {:?}", messages);
            }
            other => panic!("expected FanoutFailure, got: {other:?}"),
        }
    }

    #[test]
    fn push_audio_all_succeed_should_return_ok() {
        let frame = dummy_audio_frame();
        let mut fanout = all_ok_fanout();
        let outcome = fanout.push_audio(&frame);
        assert!(outcome.is_ok());
    }

    #[test]
    fn push_audio_one_fails_should_return_fanout_failure() {
        let frame = dummy_audio_frame();
        let mut fanout = ok_then_fail_fanout();
        let result = fanout.push_audio(&frame);
        assert!(is_one_of_two_failure(&result), "got: {result:?}");
    }

    #[test]
    fn finish_all_succeed_should_return_ok() {
        let outcome = Box::new(all_ok_fanout()).finish();
        assert!(outcome.is_ok());
    }

    #[test]
    fn finish_one_fails_should_return_fanout_failure() {
        let result = Box::new(ok_then_fail_fanout()).finish();
        assert!(is_one_of_two_failure(&result), "got: {result:?}");
    }

    #[test]
    fn new_with_empty_targets_push_video_should_return_ok() {
        let frame = dummy_video_frame();
        let mut fanout = FanoutOutput::new(Vec::new());
        let outcome = fanout.push_video(&frame);
        assert!(outcome.is_ok());
    }
}