Skip to main content

ff_stream/
rtmp.rs

1//! Frame-push RTMP output.
2//!
3//! [`RtmpOutput`] receives pre-decoded [`VideoFrame`] / [`AudioFrame`] values
4//! from the caller, encodes them with H.264/AAC, and pushes the stream to an
5//! RTMP ingest endpoint using `FFmpeg`'s built-in RTMP support.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use ff_stream::{RtmpOutput, StreamOutput};
11//!
12//! let mut out = RtmpOutput::new("rtmp://ingest.example.com/live/stream_key")
13//!     .video(1920, 1080, 30.0)
14//!     .audio(44100, 2)
15//!     .video_bitrate(4_000_000)
16//!     .audio_bitrate(128_000)
17//!     .build()?;
18//!
19//! // for each decoded frame:
20//! out.push_video(&video_frame)?;
21//! out.push_audio(&audio_frame)?;
22//!
23//! // when done:
24//! Box::new(out).finish()?;
25//! ```
26
27use ff_format::{AudioCodec, AudioFrame, VideoCodec, VideoFrame};
28
29use crate::error::StreamError;
30use crate::output::StreamOutput;
31use crate::rtmp_inner::RtmpInner;
32
33// ============================================================================
34// RtmpOutput — safe builder + StreamOutput impl
35// ============================================================================
36
37/// Live RTMP output: encodes frames and pushes them to an RTMP ingest endpoint.
38///
39/// Build with [`RtmpOutput::new`], chain setter methods, then call
40/// [`build`](Self::build) to open the `FFmpeg` context and establish the
41/// RTMP connection. After `build()`:
42///
43/// - [`push_video`](Self::push_video) and [`push_audio`](Self::push_audio) encode and
44///   transmit frames in real time.
45/// - [`StreamOutput::finish`] flushes all encoders, sends the FLV end-of-stream
46///   marker, and closes the RTMP connection.
47///
48/// RTMP/FLV requires H.264 video and AAC audio; [`build`](Self::build) returns
49/// [`StreamError::UnsupportedCodec`] for any other codec selection.
50pub struct RtmpOutput {
51    url: String,
52    video_width: Option<u32>,
53    video_height: Option<u32>,
54    fps: Option<f64>,
55    sample_rate: u32,
56    channels: u32,
57    video_codec: VideoCodec,
58    audio_codec: AudioCodec,
59    video_bitrate: u64,
60    audio_bitrate: u64,
61    inner: Option<RtmpInner>,
62    finished: bool,
63}
64
65impl RtmpOutput {
66    /// Create a new builder that streams to the given RTMP URL.
67    ///
68    /// The URL must begin with `rtmp://`; [`build`](Self::build) returns
69    /// [`StreamError::InvalidConfig`] otherwise.
70    ///
71    /// # Example
72    ///
73    /// ```ignore
74    /// use ff_stream::RtmpOutput;
75    ///
76    /// let out = RtmpOutput::new("rtmp://ingest.example.com/live/key");
77    /// ```
78    #[must_use]
79    pub fn new(url: &str) -> Self {
80        Self {
81            url: url.to_owned(),
82            video_width: None,
83            video_height: None,
84            fps: None,
85            sample_rate: 44100,
86            channels: 2,
87            video_codec: VideoCodec::H264,
88            audio_codec: AudioCodec::Aac,
89            video_bitrate: 4_000_000,
90            audio_bitrate: 128_000,
91            inner: None,
92            finished: false,
93        }
94    }
95
96    /// Set the video encoding parameters.
97    ///
98    /// This method **must** be called before [`build`](Self::build).
99    #[must_use]
100    pub fn video(mut self, width: u32, height: u32, fps: f64) -> Self {
101        self.video_width = Some(width);
102        self.video_height = Some(height);
103        self.fps = Some(fps);
104        self
105    }
106
107    /// Set the audio sample rate and channel count.
108    ///
109    /// Defaults: 44 100 Hz, 2 channels (stereo).
110    #[must_use]
111    pub fn audio(mut self, sample_rate: u32, channels: u32) -> Self {
112        self.sample_rate = sample_rate;
113        self.channels = channels;
114        self
115    }
116
117    /// Set the video codec.
118    ///
119    /// Default: [`VideoCodec::H264`]. Only `H264` is accepted by
120    /// [`build`](Self::build); any other value returns
121    /// [`StreamError::UnsupportedCodec`].
122    #[must_use]
123    pub fn video_codec(mut self, codec: VideoCodec) -> Self {
124        self.video_codec = codec;
125        self
126    }
127
128    /// Set the audio codec.
129    ///
130    /// Default: [`AudioCodec::Aac`]. Only `Aac` is accepted by
131    /// [`build`](Self::build); any other value returns
132    /// [`StreamError::UnsupportedCodec`].
133    #[must_use]
134    pub fn audio_codec(mut self, codec: AudioCodec) -> Self {
135        self.audio_codec = codec;
136        self
137    }
138
139    /// Set the video encoder target bit rate in bits/s.
140    ///
141    /// Default: 4 000 000 (4 Mbit/s).
142    #[must_use]
143    pub fn video_bitrate(mut self, bitrate: u64) -> Self {
144        self.video_bitrate = bitrate;
145        self
146    }
147
148    /// Set the audio encoder target bit rate in bits/s.
149    ///
150    /// Default: 128 000 (128 kbit/s).
151    #[must_use]
152    pub fn audio_bitrate(mut self, bitrate: u64) -> Self {
153        self.audio_bitrate = bitrate;
154        self
155    }
156
157    /// Open the `FFmpeg` context and establish the RTMP connection.
158    ///
159    /// # Errors
160    ///
161    /// Returns [`StreamError::InvalidConfig`] when:
162    /// - The URL does not start with `rtmp://`.
163    /// - [`video`](Self::video) was not called before `build`.
164    ///
165    /// Returns [`StreamError::UnsupportedCodec`] when:
166    /// - The video codec is not [`VideoCodec::H264`].
167    /// - The audio codec is not [`AudioCodec::Aac`].
168    ///
169    /// Returns [`StreamError::Ffmpeg`] when any `FFmpeg` operation fails
170    /// (including network connection errors).
171    pub fn build(mut self) -> Result<Self, StreamError> {
172        if !self.url.starts_with("rtmp://") {
173            return Err(StreamError::InvalidConfig {
174                reason: "RtmpOutput URL must start with rtmp://".into(),
175            });
176        }
177
178        let (Some(width), Some(height), Some(fps)) =
179            (self.video_width, self.video_height, self.fps)
180        else {
181            return Err(StreamError::InvalidConfig {
182                reason: "video parameters not set; call .video(width, height, fps) before .build()"
183                    .into(),
184            });
185        };
186
187        if self.video_codec != VideoCodec::H264 {
188            return Err(StreamError::UnsupportedCodec {
189                codec: format!("{:?}", self.video_codec),
190                reason: "RTMP/FLV requires H.264 video".into(),
191            });
192        }
193
194        if self.audio_codec != AudioCodec::Aac {
195            return Err(StreamError::UnsupportedCodec {
196                codec: format!("{:?}", self.audio_codec),
197                reason: "RTMP/FLV requires AAC audio".into(),
198            });
199        }
200
201        #[allow(clippy::cast_possible_truncation)]
202        let fps_int = fps.round().max(1.0) as i32;
203
204        let inner = RtmpInner::open(
205            &self.url,
206            width.cast_signed(),
207            height.cast_signed(),
208            fps_int,
209            self.video_bitrate,
210            self.sample_rate.cast_signed(),
211            self.channels.cast_signed(),
212            self.audio_bitrate.cast_signed(),
213        )?;
214
215        self.inner = Some(inner);
216        Ok(self)
217    }
218}
219
220// ============================================================================
221// StreamOutput impl
222// ============================================================================
223
224impl StreamOutput for RtmpOutput {
225    fn push_video(&mut self, frame: &VideoFrame) -> Result<(), StreamError> {
226        if self.finished {
227            return Err(StreamError::InvalidConfig {
228                reason: "push_video called after finish()".into(),
229            });
230        }
231        let inner = self
232            .inner
233            .as_mut()
234            .ok_or_else(|| StreamError::InvalidConfig {
235                reason: "push_video called before build()".into(),
236            })?;
237        inner.push_video(frame)
238    }
239
240    fn push_audio(&mut self, frame: &AudioFrame) -> Result<(), StreamError> {
241        if self.finished {
242            return Err(StreamError::InvalidConfig {
243                reason: "push_audio called after finish()".into(),
244            });
245        }
246        let inner = self
247            .inner
248            .as_mut()
249            .ok_or_else(|| StreamError::InvalidConfig {
250                reason: "push_audio called before build()".into(),
251            })?;
252        inner.push_audio(frame);
253        Ok(())
254    }
255
256    fn finish(mut self: Box<Self>) -> Result<(), StreamError> {
257        if self.finished {
258            return Ok(());
259        }
260        self.finished = true;
261        let inner = self
262            .inner
263            .take()
264            .ok_or_else(|| StreamError::InvalidConfig {
265                reason: "finish() called before build()".into(),
266            })?;
267        inner.flush_and_close();
268        Ok(())
269    }
270}
271
272// ============================================================================
273// Unit tests
274// ============================================================================
275
#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed URL for tests that must get past the scheme check.
    /// Every `build()` call below fails validation before any connection
    /// attempt is made.
    const RTMP_URL: &str = "rtmp://localhost/live/key";

    #[test]
    fn build_without_rtmp_scheme_should_return_invalid_config() {
        let builder = RtmpOutput::new("http://example.com/live").video(1280, 720, 30.0);

        let outcome = builder.build();

        assert!(matches!(outcome, Err(StreamError::InvalidConfig { .. })));
    }

    #[test]
    fn build_without_video_should_return_invalid_config() {
        // No `.video(..)` call: width, height and fps stay unset.
        let builder = RtmpOutput::new(RTMP_URL);

        let outcome = builder.build();

        assert!(matches!(outcome, Err(StreamError::InvalidConfig { .. })));
    }

    #[test]
    fn build_with_non_h264_video_codec_should_return_unsupported_codec() {
        let builder = RtmpOutput::new(RTMP_URL)
            .video(1280, 720, 30.0)
            .video_codec(VideoCodec::Vp9);

        let outcome = builder.build();

        assert!(matches!(outcome, Err(StreamError::UnsupportedCodec { .. })));
    }

    #[test]
    fn build_with_non_aac_audio_codec_should_return_unsupported_codec() {
        let builder = RtmpOutput::new(RTMP_URL)
            .video(1280, 720, 30.0)
            .audio_codec(AudioCodec::Mp3);

        let outcome = builder.build();

        assert!(matches!(outcome, Err(StreamError::UnsupportedCodec { .. })));
    }

    #[test]
    fn video_bitrate_default_should_be_four_megabits() {
        let fresh = RtmpOutput::new(RTMP_URL);

        assert_eq!(fresh.video_bitrate, 4_000_000);
    }

    #[test]
    fn audio_defaults_should_be_44100hz_stereo() {
        let fresh = RtmpOutput::new(RTMP_URL);

        assert_eq!(fresh.sample_rate, 44100);
        assert_eq!(fresh.channels, 2);
    }
}