1use ff_format::{AudioCodec, AudioFrame, VideoCodec, VideoFrame};
28
29use crate::error::StreamError;
30use crate::output::StreamOutput;
31use crate::rtmp_inner::RtmpInner;
32
33pub struct RtmpOutput {
51 url: String,
52 video_width: Option<u32>,
53 video_height: Option<u32>,
54 fps: Option<f64>,
55 sample_rate: u32,
56 channels: u32,
57 video_codec: VideoCodec,
58 audio_codec: AudioCodec,
59 video_bitrate: u64,
60 audio_bitrate: u64,
61 inner: Option<RtmpInner>,
62 finished: bool,
63}
64
65impl RtmpOutput {
66 #[must_use]
79 pub fn new(url: &str) -> Self {
80 Self {
81 url: url.to_owned(),
82 video_width: None,
83 video_height: None,
84 fps: None,
85 sample_rate: 44100,
86 channels: 2,
87 video_codec: VideoCodec::H264,
88 audio_codec: AudioCodec::Aac,
89 video_bitrate: 4_000_000,
90 audio_bitrate: 128_000,
91 inner: None,
92 finished: false,
93 }
94 }
95
96 #[must_use]
100 pub fn video(mut self, width: u32, height: u32, fps: f64) -> Self {
101 self.video_width = Some(width);
102 self.video_height = Some(height);
103 self.fps = Some(fps);
104 self
105 }
106
107 #[must_use]
111 pub fn audio(mut self, sample_rate: u32, channels: u32) -> Self {
112 self.sample_rate = sample_rate;
113 self.channels = channels;
114 self
115 }
116
117 #[must_use]
123 pub fn video_codec(mut self, codec: VideoCodec) -> Self {
124 self.video_codec = codec;
125 self
126 }
127
128 #[must_use]
134 pub fn audio_codec(mut self, codec: AudioCodec) -> Self {
135 self.audio_codec = codec;
136 self
137 }
138
139 #[must_use]
143 pub fn video_bitrate(mut self, bitrate: u64) -> Self {
144 self.video_bitrate = bitrate;
145 self
146 }
147
148 #[must_use]
152 pub fn audio_bitrate(mut self, bitrate: u64) -> Self {
153 self.audio_bitrate = bitrate;
154 self
155 }
156
157 pub fn build(mut self) -> Result<Self, StreamError> {
172 if !self.url.starts_with("rtmp://") {
173 return Err(StreamError::InvalidConfig {
174 reason: "RtmpOutput URL must start with rtmp://".into(),
175 });
176 }
177
178 let (Some(width), Some(height), Some(fps)) =
179 (self.video_width, self.video_height, self.fps)
180 else {
181 return Err(StreamError::InvalidConfig {
182 reason: "video parameters not set; call .video(width, height, fps) before .build()"
183 .into(),
184 });
185 };
186
187 if self.video_codec != VideoCodec::H264 {
188 return Err(StreamError::UnsupportedCodec {
189 codec: format!("{:?}", self.video_codec),
190 reason: "RTMP/FLV requires H.264 video".into(),
191 });
192 }
193
194 if self.audio_codec != AudioCodec::Aac {
195 return Err(StreamError::UnsupportedCodec {
196 codec: format!("{:?}", self.audio_codec),
197 reason: "RTMP/FLV requires AAC audio".into(),
198 });
199 }
200
201 #[allow(clippy::cast_possible_truncation)]
202 let fps_int = fps.round().max(1.0) as i32;
203
204 let inner = RtmpInner::open(
205 &self.url,
206 width.cast_signed(),
207 height.cast_signed(),
208 fps_int,
209 self.video_bitrate,
210 self.sample_rate.cast_signed(),
211 self.channels.cast_signed(),
212 self.audio_bitrate.cast_signed(),
213 )?;
214
215 self.inner = Some(inner);
216 Ok(self)
217 }
218}
219
220impl StreamOutput for RtmpOutput {
225 fn push_video(&mut self, frame: &VideoFrame) -> Result<(), StreamError> {
226 if self.finished {
227 return Err(StreamError::InvalidConfig {
228 reason: "push_video called after finish()".into(),
229 });
230 }
231 let inner = self
232 .inner
233 .as_mut()
234 .ok_or_else(|| StreamError::InvalidConfig {
235 reason: "push_video called before build()".into(),
236 })?;
237 inner.push_video(frame)
238 }
239
240 fn push_audio(&mut self, frame: &AudioFrame) -> Result<(), StreamError> {
241 if self.finished {
242 return Err(StreamError::InvalidConfig {
243 reason: "push_audio called after finish()".into(),
244 });
245 }
246 let inner = self
247 .inner
248 .as_mut()
249 .ok_or_else(|| StreamError::InvalidConfig {
250 reason: "push_audio called before build()".into(),
251 })?;
252 inner.push_audio(frame);
253 Ok(())
254 }
255
256 fn finish(mut self: Box<Self>) -> Result<(), StreamError> {
257 if self.finished {
258 return Ok(());
259 }
260 self.finished = true;
261 let inner = self
262 .inner
263 .take()
264 .ok_or_else(|| StreamError::InvalidConfig {
265 reason: "finish() called before build()".into(),
266 })?;
267 inner.flush_and_close();
268 Ok(())
269 }
270}
271
272#[cfg(test)]
277mod tests {
278 use super::*;
279
280 #[test]
281 fn build_without_rtmp_scheme_should_return_invalid_config() {
282 let result = RtmpOutput::new("http://example.com/live")
283 .video(1280, 720, 30.0)
284 .build();
285 assert!(matches!(result, Err(StreamError::InvalidConfig { .. })));
286 }
287
288 #[test]
289 fn build_without_video_should_return_invalid_config() {
290 let result = RtmpOutput::new("rtmp://localhost/live/key").build();
291 assert!(matches!(result, Err(StreamError::InvalidConfig { .. })));
292 }
293
294 #[test]
295 fn build_with_non_h264_video_codec_should_return_unsupported_codec() {
296 let result = RtmpOutput::new("rtmp://localhost/live/key")
297 .video(1280, 720, 30.0)
298 .video_codec(VideoCodec::Vp9)
299 .build();
300 assert!(matches!(result, Err(StreamError::UnsupportedCodec { .. })));
301 }
302
303 #[test]
304 fn build_with_non_aac_audio_codec_should_return_unsupported_codec() {
305 let result = RtmpOutput::new("rtmp://localhost/live/key")
306 .video(1280, 720, 30.0)
307 .audio_codec(AudioCodec::Mp3)
308 .build();
309 assert!(matches!(result, Err(StreamError::UnsupportedCodec { .. })));
310 }
311
312 #[test]
313 fn video_bitrate_default_should_be_four_megabits() {
314 let out = RtmpOutput::new("rtmp://localhost/live/key");
315 assert_eq!(out.video_bitrate, 4_000_000);
316 }
317
318 #[test]
319 fn audio_defaults_should_be_44100hz_stereo() {
320 let out = RtmpOutput::new("rtmp://localhost/live/key");
321 assert_eq!(out.sample_rate, 44100);
322 assert_eq!(out.channels, 2);
323 }
324}