Skip to main content

ez_ffmpeg/rtmp/
embed_rtmp_server.rs

1use crate::core::context::output::Output;
2use crate::error::Error::{RtmpCreateStream, RtmpStreamAlreadyExists};
3use crate::flv::flv_buffer::FlvBuffer;
4use crate::flv::flv_tag::FlvTag;
5use crate::rtmp::reactor::{effective_max_connections, Reactor, CHANNEL_HEADROOM};
6use bytes::{BufMut, Bytes};
7use log::{debug, error, info, warn};
8use rml_rtmp::chunk_io::ChunkSerializer;
9use rml_rtmp::messages::{MessagePayload, RtmpMessage};
10use rml_rtmp::rml_amf0::Amf0Value;
11use rml_rtmp::time::RtmpTimestamp;
12use std::collections::HashMap;
13use std::marker::PhantomData;
14use std::net::{Shutdown, TcpListener, TcpStream};
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::Arc;
17
18#[derive(Clone)]
19pub struct Initialization;
20#[derive(Clone)]
21pub struct Running;
22#[derive(Clone)]
23pub struct Ended;
24
25#[derive(Clone)]
26pub struct EmbedRtmpServer<S> {
27    address: String,
28    bound_addr: Option<std::net::SocketAddr>,
29    status: Arc<AtomicUsize>,
30    stream_keys: dashmap::DashSet<String>,
31    // stream_key bytes_receiver
32    publisher_sender: Option<crossbeam_channel::Sender<(String, crossbeam_channel::Receiver<Vec<u8>>)>>,
33    gop_limit: usize,
34    max_connections: Option<usize>,
35    state: PhantomData<S>,
36}
37
38const STATUS_INIT: usize = 0;
39const STATUS_RUN: usize = 1;
40const STATUS_END: usize = 2;
41
42impl<S: 'static> EmbedRtmpServer<S> {
43    fn into_state<T>(self) -> EmbedRtmpServer<T> {
44        EmbedRtmpServer {
45            address: self.address,
46            bound_addr: self.bound_addr,
47            status: self.status,
48            stream_keys: self.stream_keys,
49            publisher_sender: self.publisher_sender,
50            gop_limit: self.gop_limit,
51            max_connections: self.max_connections,
52            state: Default::default(),
53        }
54    }
55
56    /// Checks whether the RTMP server has been stopped. This returns `true` after
57    /// [`stop`](EmbedRtmpServer<Running>::stop) has been called and the server has exited its main loop, otherwise `false`.
58    ///
59    /// # Returns
60    ///
61    /// * `true` if the server has been signaled to stop (and is no longer listening/accepting).
62    /// * `false` if the server is still running.
63    pub fn is_stopped(&self) -> bool {
64        self.status.load(Ordering::Acquire) == STATUS_END
65    }
66}
67
68impl EmbedRtmpServer<Initialization> {
69    /// Creates a new RTMP server instance that will listen on the specified address
70    /// when [`start`](EmbedRtmpServer<Initialization>::start) is called.
71    ///
72    /// # Parameters
73    ///
74    /// * `address` - A string slice representing the address (host:port) to bind the
75    ///   RTMP server socket.
76    ///
77    /// # Returns
78    ///
79    /// An [`EmbedRtmpServer`] configured to listen on the given address.
80    pub fn new(address: impl Into<String>) -> EmbedRtmpServer<Initialization> {
81        Self::new_with_gop_limit(address, 1)
82    }
83
84    /// Creates a new RTMP server instance that will listen on the specified address,
85    /// with a custom GOP limit.
86    ///
87    /// This method allows specifying the maximum number of GOPs to be cached.
88    /// A GOP (Group of Pictures) represents a sequence of video frames (I, P, B frames)
89    /// used for efficient video decoding and random access. The GOP limit defines
90    /// how many such groups are stored in the cache.
91    ///
92    /// # Parameters
93    ///
94    /// * `address` - A string slice representing the address (host:port) to bind the
95    ///   RTMP server socket.
96    /// * `gop_limit` - The maximum number of GOPs to cache.
97    ///
98    /// # Returns
99    ///
100    /// An [`EmbedRtmpServer`] instance configured to listen on the given address and
101    /// using the specified GOP limit.
102    pub fn new_with_gop_limit(address: impl Into<String>, gop_limit: usize) -> EmbedRtmpServer<Initialization> {
103        Self {
104            address: address.into(),
105            bound_addr: None,
106            status: Arc::new(AtomicUsize::new(STATUS_INIT)),
107            stream_keys: Default::default(),
108            publisher_sender: None,
109            gop_limit,
110            max_connections: None,
111            state: Default::default(),
112        }
113    }
114
115    /// Sets the maximum number of concurrent connections allowed.
116    ///
117    /// If not set, the limit is auto-detected based on system file descriptor limits
118    /// (default: 10000, capped at 80% of system FD limit).
119    ///
120    /// # Parameters
121    ///
122    /// * `max_connections` - Maximum number of concurrent connections
123    ///
124    /// # Returns
125    ///
126    /// Self for method chaining.
127    pub fn set_max_connections(mut self, max_connections: usize) -> Self {
128        self.max_connections = Some(max_connections);
129        self
130    }
131
132    /// Starts the RTMP server on the configured address, entering a loop that
133    /// accepts incoming client connections. This method spawns background threads
134    /// to handle the connections and publish events.
135    ///
136    /// # Returns
137    ///
138    /// * `Ok(())` if the server successfully starts listening.
139    /// * An error variant if the socket could not be bound or other I/O errors occur.
140    pub fn start(mut self) -> crate::error::Result<EmbedRtmpServer<Running>> {
141        let listener = TcpListener::bind(self.address.clone())
142            .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
143
144        // Get actual bound address (important for port 0)
145        let actual_addr = listener.local_addr()
146            .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
147        self.bound_addr = Some(actual_addr);
148
149        listener
150            .set_nonblocking(true)
151            .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
152
153        self.status.store(STATUS_RUN, Ordering::Release);
154
155        // Calculate effective max and create bounded channel with headroom
156        // This prevents unbounded queue growth when reactor is at capacity
157        let effective_max = effective_max_connections(self.max_connections);
158        let channel_capacity = effective_max.saturating_add(CHANNEL_HEADROOM);
159        let (stream_sender, stream_receiver) = crossbeam_channel::bounded(channel_capacity);
160        let (publisher_sender, publisher_receiver) = crossbeam_channel::bounded(1024);
161        self.publisher_sender = Some(publisher_sender);
162        let stream_keys = self.stream_keys.clone();
163        let status = self.status.clone();
164        let max_connections = self.max_connections;
165        let result = std::thread::Builder::new()
166            .name("rtmp-server-worker".to_string())
167            .spawn(move || handle_connections(stream_receiver, publisher_receiver, stream_keys, self.gop_limit, max_connections, status));
168        if let Err(e) = result {
169            error!("Thread[rtmp-server-worker] exited with error: {e}");
170            return Err(crate::error::Error::RtmpThreadExited);
171        }
172
173        info!(
174            "Embed rtmp server listening for connections on {} (actual: {}, max_connections: {}).",
175            &self.address, actual_addr, effective_max
176        );
177
178        let status = self.status.clone();
179        let result = std::thread::Builder::new()
180            .name("rtmp-server-io".to_string())
181            .spawn(move || {
182            for stream in listener.incoming() {
183                match stream {
184                    Ok(stream) => {
185                        // Use try_send to apply backpressure when channel is full
186                        match stream_sender.try_send(stream) {
187                            Ok(_) => {
188                                debug!("New rtmp connection accepted.");
189                            }
190                            Err(crossbeam_channel::TrySendError::Full(s)) => {
191                                // Channel full - server at capacity, reject connection immediately
192                                let _ = s.shutdown(Shutdown::Both);
193                                debug!("Connection rejected: server at capacity (channel full)");
194                            }
195                            Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
196                                error!("Connection channel disconnected");
197                                status.store(STATUS_END, Ordering::Release);
198                                return;
199                            }
200                        }
201                    }
202                    Err(e) => {
203                        if e.kind() == std::io::ErrorKind::WouldBlock {
204                            if status.load(Ordering::Acquire) == STATUS_END {
205                                info!("Embed rtmp server stopped.");
206                                break;
207                            }
208                            std::thread::sleep(std::time::Duration::from_millis(100));
209                        } else {
210                            debug!("Rtmp connection error: {:?}", e);
211                        }
212                    }
213                }
214            }
215        });
216        if let Err(e) = result {
217            error!("Thread[rtmp-server-io] exited with error: {e}");
218            return Err(crate::error::Error::RtmpThreadExited);
219        }
220
221        Ok(self.into_state())
222    }
223}
224
225impl EmbedRtmpServer<Running> {
226    /// Returns the actual bound socket address of the RTMP server.
227    ///
228    /// This is particularly useful when binding to port 0 (random port allocation),
229    /// as it allows you to discover which port the OS assigned.
230    ///
231    /// # Returns
232    ///
233    /// * `Option<std::net::SocketAddr>` - The actual bound address, or `None` if not available.
234    ///
235    /// # Example
236    ///
237    /// ```rust,ignore
238    /// let server = EmbedRtmpServer::new("127.0.0.1:0").start().unwrap();
239    /// let actual_port = server.local_addr().unwrap().port();
240    /// println!("Server listening on port: {}", actual_port);
241    /// ```
242    pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
243        self.bound_addr
244    }
245
246    /// Creates an RTMP "input" endpoint for this server (from the server's perspective),
247    /// returning an [`Output`] that can be used by FFmpeg to push media data.
248    ///
249    /// From the FFmpeg standpoint, the returned [`Output`] is where media content is
250    /// sent (i.e., FFmpeg "outputs" to this RTMP server). After obtaining this [`Output`],
251    /// you can pass it to your FFmpeg job or scheduler to start streaming data into the server.
252    ///
253    /// # Parameters
254    ///
255    /// * `app_name` - The RTMP application name, typically corresponding to the `app` part
256    ///   of an RTMP URL (e.g., `rtmp://host:port/app/stream_key`).
257    /// * `stream_key` - The stream key (or "stream name"). If a stream with the same key
258    ///   already exists, an error will be returned.
259    ///
260    /// # Returns
261    ///
262    /// * [`Output`] - An output object preconfigured for streaming to this RTMP server.
263    ///   This can be passed to the FFmpeg SDK for actual data push.
264    /// * [`crate::error::Error`] - If a stream with the same key already exists, the server
265    ///   is not ready, or an internal error occurs, the corresponding error is returned.
266    ///
267    /// # Example
268    ///
269    /// ```rust,ignore
270    /// # // Assume there are definitions and initializations for FfmpegContext, FfmpegScheduler, etc.
271    ///
272    /// // 1. Create and start the RTMP server
273    /// let mut rtmp_server = EmbedRtmpServer::new("localhost:1935");
274    /// rtmp_server.start().expect("Failed to start RTMP server");
275    ///
276    /// // 2. Create an RTMP "input" with app_name="my-app" and stream_key="my-stream"
277    /// let output = rtmp_server
278    ///     .create_rtmp_input("my-app", "my-stream")
279    ///     .expect("Failed to create RTMP input");
280    ///
281    /// // 3. Prepare the FFmpeg context to push a local file to the newly created `Output`
282    /// let context = FfmpegContext::builder()
283    ///     .input("test.mp4")
284    ///     .output(output)
285    ///     .build()
286    ///     .expect("Failed to build Ffmpeg context");
287    ///
288    /// // 4. Start FFmpeg to push "test.mp4" to the local RTMP server on "my-app/my-stream"
289    /// FfmpegScheduler::new(context)
290    ///     .start()
291    ///     .expect("Failed to start Ffmpeg job");
292    /// ```
293    pub fn create_rtmp_input(
294        &self,
295        app_name: impl Into<String>,
296        stream_key: impl Into<String>,
297    ) -> crate::error::Result<Output> {
298        let message_sender = self.create_stream_sender(app_name, stream_key)?;
299
300        let mut flv_buffer = FlvBuffer::new();
301        let mut serializer = ChunkSerializer::new();
302        let write_callback: Box<dyn FnMut(&[u8]) -> i32> = Box::new(move |buf: &[u8]| -> i32 {
303            flv_buffer.write_data(buf);
304            if let Some(mut flv_tag) = flv_buffer.get_flv_tag() {
305                flv_tag.header.stream_id = 1;
306                match serializer.serialize(&flv_tag_to_message_payload(flv_tag), false, true) {
307                    Ok(packet) => {
308                        if let Err(e) = message_sender.send(packet.bytes) {
309                            error!("Failed to send RTMP packet: {:?}", e);
310                            return -1;
311                        }
312                    }
313                    Err(e) => {
314                        error!("Failed to serialize RTMP message: {:?}", e);
315                        return -1;
316                    }
317                }
318            }
319            buf.len() as i32
320        });
321
322        let output: Output = write_callback.into();
323
324        Ok(output
325            .set_format("flv")
326            .set_video_codec("h264")
327            .set_audio_codec("aac")
328            .set_format_opt("flvflags", "no_duration_filesize"))
329    }
330
331    /// Creates a sender channel for an RTMP stream, identified by `app_name` and `stream_key`.
332    /// This method is used internally by [`create_rtmp_input`](EmbedRtmpServer<Running>::create_rtmp_input) but can also be called directly
333    /// if you need more control over how the stream is handled.
334    ///
335    /// # Parameters
336    ///
337    /// * `app_name` - The RTMP application name.
338    /// * `stream_key` - The unique name (or key) for this stream. Must not already be in use.
339    ///
340    /// # Returns
341    ///
342    /// * `crossbeam_channel::Sender<Vec<u8>>` - A sender that allows you to send raw RTMP bytes
343    ///   into the server's handling pipeline.
344    /// * [`crate::error::Error`] - If a stream with the same key already exists or other
345    ///   internal issues occur, an error is returned.
346    ///
347    /// # Notes
348    ///
349    /// * This function sets up the initial RTMP "connect" and "publish" commands automatically.
350    /// * If you manually send bytes to the resulting channel, they should already be properly
351    ///   packaged as RTMP chunks. Otherwise, the server might fail to parse them.
352    pub fn create_stream_sender(
353        &self,
354        app_name: impl Into<String>,
355        stream_key: impl Into<String>,
356    ) -> crate::error::Result<crossbeam_channel::Sender<Vec<u8>>> {
357        let stream_key = stream_key.into();
358        if self.stream_keys.contains(&stream_key) {
359            return Err(RtmpStreamAlreadyExists(stream_key));
360        }
361
362        let (sender, receiver) = crossbeam_channel::bounded(1024);
363
364        let publisher_sender = match self.publisher_sender.as_ref() {
365            Some(sender) => sender,
366            None => {
367                error!("Publisher sender not initialized");
368                return Err(RtmpCreateStream.into());
369            }
370        };
371
372        if let Err(_) = publisher_sender.send((stream_key.clone(), receiver)) {
373            if self.status.load(Ordering::Acquire) != STATUS_END {
374                warn!("Rtmp server worker already exited. Can't create stream sender.");
375            } else {
376                error!("Rtmp Server aborted. Can't create stream sender.");
377            }
378            return Err(RtmpCreateStream.into());
379        }
380
381        let mut serializer = ChunkSerializer::new();
382
383        // send connect
384        let mut properties: HashMap<String, Amf0Value> = HashMap::new();
385        properties.insert("app".to_string(), Amf0Value::Utf8String(app_name.into()));
386        let connect_cmd = RtmpMessage::Amf0Command {
387            command_name: "connect".to_string(),
388            transaction_id: 1.0,
389            command_object: Amf0Value::Object(properties),
390            additional_arguments: Vec::new(),
391        }
392        .into_message_payload(RtmpTimestamp { value: 0 }, 0);
393
394        let connect_cmd = match connect_cmd {
395            Ok(cmd) => cmd,
396            Err(e) => {
397                error!("Failed to create connect command: {:?}", e);
398                return Err(RtmpCreateStream.into());
399            }
400        };
401
402        let connect_packet = match serializer.serialize(&connect_cmd, false, true) {
403            Ok(packet) => packet,
404            Err(e) => {
405                error!("Failed to serialize connect command: {:?}", e);
406                return Err(RtmpCreateStream.into());
407            }
408        };
409
410        if let Err(_) = sender.send(connect_packet.bytes) {
411            error!("Can't send connect command to rtmp server.");
412            return Err(RtmpCreateStream.into());
413        }
414
415        // send createStream
416        let create_stream_cmd = RtmpMessage::Amf0Command {
417            command_name: "createStream".to_string(),
418            transaction_id: 2.0,
419            command_object: Amf0Value::Null,
420            additional_arguments: Vec::new(),
421        }
422        .into_message_payload(RtmpTimestamp { value: 0 }, 1);
423
424        let create_stream_cmd = match create_stream_cmd {
425            Ok(cmd) => cmd,
426            Err(e) => {
427                error!("Failed to create createStream command: {:?}", e);
428                return Err(RtmpCreateStream.into());
429            }
430        };
431
432        let create_stream_packet = match serializer.serialize(&create_stream_cmd, false, true) {
433            Ok(packet) => packet,
434            Err(e) => {
435                error!("Failed to serialize createStream command: {:?}", e);
436                return Err(RtmpCreateStream.into());
437            }
438        };
439
440        if let Err(_) = sender.send(create_stream_packet.bytes) {
441            error!("Can't send createStream command to rtmp server.");
442            return Err(RtmpCreateStream.into());
443        }
444
445        // send publish
446        let mut arguments = Vec::new();
447        arguments.push(Amf0Value::Utf8String(stream_key));
448        arguments.push(Amf0Value::Utf8String("live".into()));
449        let publish_cmd = RtmpMessage::Amf0Command {
450            command_name: "publish".to_string(),
451            transaction_id: 3.0,
452            command_object: Amf0Value::Null,
453            additional_arguments: arguments,
454        }
455        .into_message_payload(RtmpTimestamp { value: 0 }, 1);
456
457        let publish_cmd = match publish_cmd {
458            Ok(cmd) => cmd,
459            Err(e) => {
460                error!("Failed to create publish command: {:?}", e);
461                return Err(RtmpCreateStream.into());
462            }
463        };
464
465        let publish_packet = match serializer.serialize(&publish_cmd, false, true) {
466            Ok(packet) => packet,
467            Err(e) => {
468                error!("Failed to serialize publish command: {:?}", e);
469                return Err(RtmpCreateStream.into());
470            }
471        };
472
473        if let Err(_) = sender.send(publish_packet.bytes) {
474            error!("Can't send publish command to rtmp server.");
475            return Err(RtmpCreateStream.into());
476        }
477        Ok(sender)
478    }
479
480    /// Stops the RTMP server by signaling the listening and connection-handling threads
481    /// to terminate. Once called, new incoming connections will be ignored, and existing
482    /// threads will exit gracefully.
483    ///
484    /// # Example
485    /// ```rust,ignore
486    /// let server = EmbedRtmpServer::new("localhost:1935");
487    /// // ... start and handle streaming
488    /// server.stop();
489    /// assert!(server.is_stopped());
490    /// ```
491    pub fn stop(self) -> EmbedRtmpServer<Ended> {
492        self.status.store(STATUS_END, Ordering::Release);
493        self.into_state()
494    }
495}
496
497/// Handle connections using optimized Reactor
498///
499/// Replaces old multi-threaded handle_connections with single-threaded event-driven model:
500/// - Uses epoll/kqueue/WSAPoll for IO multiplexing
501/// - Write queue with backpressure management
502/// - Strict drain until WouldBlock semantics
503fn handle_connections(
504    connection_receiver: crossbeam_channel::Receiver<TcpStream>,
505    publisher_receiver: crossbeam_channel::Receiver<(String, crossbeam_channel::Receiver<Vec<u8>>)>,
506    stream_keys: dashmap::DashSet<String>,
507    gop_limit: usize,
508    max_connections: Option<usize>,
509    status: Arc<AtomicUsize>,
510) {
511    // Create Reactor
512    let mut reactor = match Reactor::new(gop_limit, max_connections, stream_keys, status.clone()) {
513        Ok(r) => r,
514        Err(e) => {
515            error!("Failed to create Reactor: {:?}", e);
516            status.store(STATUS_END, Ordering::Release);
517            return;
518        }
519    };
520
521    // Run Reactor main loop
522    reactor.run(connection_receiver, publisher_receiver);
523
524    if status.load(Ordering::Acquire) != STATUS_END {
525        error!("Rtmp Server aborted.");
526    }
527}
528
529pub fn flv_tag_to_message_payload(flv_tag: FlvTag) -> MessagePayload {
530    let timestamp = flv_tag.header.timestamp | ((flv_tag.header.timestamp_ext as u32) << 24);
531
532    let type_id = flv_tag.header.tag_type;
533    let message_stream_id = flv_tag.header.stream_id;
534
535    let data = if type_id == 0x12 {
536        wrap_metadata(flv_tag.data)
537    } else {
538        flv_tag.data
539    };
540
541    MessagePayload {
542        timestamp: RtmpTimestamp { value: timestamp },
543        type_id,
544        message_stream_id,
545        data,
546    }
547}
548
549fn wrap_metadata(data: Bytes) -> Bytes {
550    let s = "@setDataFrame";
551
552    let insert_len = 16;
553
554    let mut bytes = bytes::BytesMut::with_capacity(insert_len + data.len());
555
556    bytes.put_u8(0x02);
557    bytes.put_u16(s.len() as u16);
558    bytes.put(s.as_bytes());
559
560    bytes.put(data);
561
562    bytes.freeze()
563}
564
565
566// ============================================================================
567// StreamBuilder API - Simplified RTMP streaming interface
568// ============================================================================
569
570use crate::core::context::ffmpeg_context::FfmpegContext;
571use crate::core::context::input::Input;
572use crate::core::scheduler::ffmpeg_scheduler::{FfmpegScheduler, Running as SchedulerRunning};
573use crate::error::StreamError;
574use std::path::{Path, PathBuf};
575
576/// A builder for creating RTMP streaming sessions with a simplified API.
577///
578/// This builder provides a fluent interface for configuring and starting
579/// RTMP streaming without needing to manually manage the server lifecycle.
580///
581/// # Example
582///
583/// ```rust,ignore
584/// use ez_ffmpeg::rtmp::embed_rtmp_server::EmbedRtmpServer;
585///
586/// let handle = EmbedRtmpServer::stream_builder()
587///     .address("localhost:1935")
588///     .app_name("live")
589///     .stream_key("stream1")
590///     .input_file("video.mp4")
591///     // readrate defaults to 1.0 (realtime)
592///     .start()?;
593///
594/// handle.wait()?;
595/// ```
596pub struct StreamBuilder {
597    address: Option<String>,
598    app_name: Option<String>,
599    stream_key: Option<String>,
600    input_file: Option<PathBuf>,
601    readrate: Option<f32>,
602    gop_limit: Option<usize>,
603    max_connections: Option<usize>,
604}
605
606impl Default for StreamBuilder {
607    fn default() -> Self {
608        Self::new()
609    }
610}
611
612impl StreamBuilder {
613    /// Creates a new `StreamBuilder` with default settings.
614    ///
615    /// By default, `readrate` is set to `1.0` (real-time playback speed),
616    /// which is equivalent to FFmpeg's `-re` flag. This is the recommended
617    /// setting for live RTMP streaming scenarios.
618    pub fn new() -> Self {
619        Self {
620            address: None,
621            app_name: None,
622            stream_key: None,
623            input_file: None,
624            readrate: Some(1.0), // Default to real-time speed for live streaming
625            gop_limit: None,
626            max_connections: None,
627        }
628    }
629
630    /// Sets the address for the RTMP server (e.g., "localhost:1935").
631    pub fn address(mut self, address: impl Into<String>) -> Self {
632        self.address = Some(address.into());
633        self
634    }
635
636    /// Sets the RTMP application name.
637    pub fn app_name(mut self, app_name: impl Into<String>) -> Self {
638        self.app_name = Some(app_name.into());
639        self
640    }
641
642    /// Sets the stream key (publishing name).
643    pub fn stream_key(mut self, stream_key: impl Into<String>) -> Self {
644        self.stream_key = Some(stream_key.into());
645        self
646    }
647
648    /// Sets the input file path to stream.
649    pub fn input_file(mut self, path: impl AsRef<Path>) -> Self {
650        self.input_file = Some(path.as_ref().to_path_buf());
651        self
652    }
653
654    /// Sets the read rate for the input file.
655    ///
656    /// A value of 1.0 means realtime playback speed.
657    /// This is useful for simulating live streaming from a file.
658    pub fn readrate(mut self, rate: f32) -> Self {
659        self.readrate = Some(rate);
660        self
661    }
662
663    /// Sets the GOP (Group of Pictures) limit for the RTMP server.
664    ///
665    /// This controls how many GOPs are buffered for new subscribers.
666    pub fn gop_limit(mut self, limit: usize) -> Self {
667        self.gop_limit = Some(limit);
668        self
669    }
670
671    /// Sets the maximum number of connections the server will accept.
672    pub fn max_connections(mut self, max: usize) -> Self {
673        self.max_connections = Some(max);
674        self
675    }
676
677    /// Starts the RTMP streaming session.
678    ///
679    /// This method validates all required parameters, starts the RTMP server,
680    /// and begins streaming the input file.
681    ///
682    /// # Required Parameters
683    ///
684    /// - `address`: The server address
685    /// - `app_name`: The RTMP application name
686    /// - `stream_key`: The stream key (publishing name)
687    /// - `input_file`: The file to stream
688    ///
689    /// # Returns
690    ///
691    /// A `StreamHandle` that can be used to wait for completion or manage the stream.
692    ///
693    /// # Errors
694    ///
695    /// Returns `StreamError` if:
696    /// - Any required parameter is missing
697    /// - The input file does not exist
698    /// - The server fails to start
699    /// - FFmpeg context creation fails
700    pub fn start(self) -> Result<StreamHandle, StreamError> {
701        // Validate required parameters
702        let address = self
703            .address
704            .ok_or(StreamError::MissingParameter("address"))?;
705        let app_name = self
706            .app_name
707            .ok_or(StreamError::MissingParameter("app_name"))?;
708        let stream_key = self
709            .stream_key
710            .ok_or(StreamError::MissingParameter("stream_key"))?;
711        let input_file = self
712            .input_file
713            .ok_or(StreamError::MissingParameter("input_file"))?;
714
715        // Validate input file exists and is a file (not a directory)
716        if !input_file.is_file() {
717            return Err(StreamError::InputNotFound { path: input_file });
718        }
719
720        // Create and configure the server
721        let mut server = if let Some(gop_limit) = self.gop_limit {
722            EmbedRtmpServer::new_with_gop_limit(&address, gop_limit)
723        } else {
724            EmbedRtmpServer::new(&address)
725        };
726
727        if let Some(max_conn) = self.max_connections {
728            server = server.set_max_connections(max_conn);
729        }
730
731        // Start the server
732        let server = server.start().map_err(StreamError::Ffmpeg)?;
733        let server = Arc::new(server);
734
735        // Create the RTMP output
736        let output = server
737            .create_rtmp_input(&app_name, &stream_key)
738            .map_err(StreamError::Ffmpeg)?;
739
740        // Create the input with optional readrate
741        let input_path = input_file.to_string_lossy().to_string();
742        let mut input = Input::from(input_path);
743        if let Some(rate) = self.readrate {
744            input = input.set_readrate(rate);
745        }
746
747        // Build and start the FFmpeg context
748        let scheduler = FfmpegContext::builder()
749            .input(input)
750            .output(output)
751            .build()
752            .map_err(StreamError::Ffmpeg)?
753            .start()
754            .map_err(StreamError::Ffmpeg)?;
755
756        Ok(StreamHandle {
757            _server: server,
758            scheduler: Some(scheduler),
759        })
760    }
761}
762
763/// A handle to a running RTMP streaming session.
764///
765/// This handle manages the lifecycle of both the RTMP server and the FFmpeg
766/// streaming context. When dropped, it will attempt to clean up resources.
767///
768/// # Example
769///
770/// ```rust,ignore
771/// let handle = EmbedRtmpServer::stream_builder()
772///     .address("localhost:1935")
773///     .app_name("live")
774///     .stream_key("stream1")
775///     .input_file("video.mp4")
776///     .start()?;
777///
778/// // Wait for streaming to complete
779/// handle.wait()?;
780/// ```
781pub struct StreamHandle {
782    _server: Arc<EmbedRtmpServer<Running>>,
783    scheduler: Option<FfmpegScheduler<SchedulerRunning>>,
784}
785
786impl StreamHandle {
787    /// Waits for the streaming session to complete.
788    ///
789    /// This method blocks until the FFmpeg context finishes processing
790    /// (e.g., when the input file ends or an error occurs).
791    ///
792    /// # Returns
793    ///
794    /// Returns `Ok(())` if streaming completed successfully, or an error
795    /// if something went wrong during streaming.
796    pub fn wait(mut self) -> Result<(), StreamError> {
797        if let Some(scheduler) = self.scheduler.take() {
798            scheduler.wait().map_err(StreamError::Ffmpeg)?;
799        }
800        Ok(())
801    }
802}
803
804impl Drop for StreamHandle {
805    fn drop(&mut self) {
806        // Best-effort cleanup: if scheduler wasn't consumed by wait(),
807        // we attempt to stop it gracefully here.
808        // The server will be stopped when the Arc is dropped.
809        if let Some(scheduler) = self.scheduler.take() {
810            // Attempt to wait for graceful shutdown, but don't block forever
811            let _ = scheduler.wait();
812        }
813    }
814}
815
816impl EmbedRtmpServer<Initialization> {
817    /// Creates a new `StreamBuilder` for simplified RTMP streaming.
818    ///
819    /// This is the recommended entry point for simple streaming scenarios
820    /// where you want to stream a file to an embedded RTMP server.
821    ///
822    /// # Example
823    ///
824    /// ```rust,ignore
825    /// use ez_ffmpeg::rtmp::embed_rtmp_server::EmbedRtmpServer;
826    ///
827    /// let handle = EmbedRtmpServer::stream_builder()
828    ///     .address("localhost:1935")
829    ///     .app_name("live")
830    ///     .stream_key("stream1")
831    ///     .input_file("video.mp4")
832    ///     .start()?;
833    ///
834    /// handle.wait()?;
835    /// ```
836    ///
837    /// For more complex scenarios requiring full control over the server
838    /// and FFmpeg context, use the traditional API:
839    ///
840    /// ```rust,ignore
841    /// let server = EmbedRtmpServer::new("localhost:1935").start()?;
842    /// let output = server.create_rtmp_input("app", "stream")?;
843    /// // ... configure Input and FfmpegContext manually
844    /// ```
845    pub fn stream_builder() -> StreamBuilder {
846        StreamBuilder::new()
847    }
848}
849
850#[cfg(test)]
851mod tests {
852    use super::*;
853    use crate::core::context::ffmpeg_context::FfmpegContext;
854    use crate::core::context::input::Input;
855    use crate::core::context::output::Output;
856    use crate::core::scheduler::ffmpeg_scheduler::FfmpegScheduler;
857    use ffmpeg_next::time::current;
858    use std::thread::sleep;
859    use std::time::Duration;
860
861    #[test]
862    #[ignore] // Integration test: requires exclusive port 1935 and test.mp4
863    fn test_concat_stream_loop() {
864        let _ = env_logger::builder()
865            .filter_level(log::LevelFilter::Trace)
866            .is_test(true)
867            .try_init();
868
869        let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
870        let embed_rtmp_server = embed_rtmp_server.start().unwrap();
871
872        let output = embed_rtmp_server
873            .create_rtmp_input("my-app", "my-stream")
874            .unwrap();
875
876        let start = current();
877
878        let result = FfmpegContext::builder()
879            .input(Input::from("test.mp4")
880                .set_readrate(1.0)
881                .set_stream_loop(3)
882            )
883            .input(
884                Input::from("test.mp4")
885                    .set_readrate(1.0)
886                    .set_stream_loop(3)
887            )
888            .input(
889                Input::from("test.mp4")
890                    .set_readrate(1.0)
891                    .set_stream_loop(3)
892            )
893            .filter_desc("[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1")
894            .output(output)
895            .build()
896            .unwrap()
897            .start()
898            .unwrap()
899            .wait();
900
901        assert!(result.is_ok());
902        info!("elapsed time: {}", current() - start);
903    }
904
905    #[test]
906    #[ignore] // Integration test: requires exclusive port 1935 and test.mp4
907    fn test_stream_loop() {
908        let _ = env_logger::builder()
909            .filter_level(log::LevelFilter::Trace)
910            .is_test(true)
911            .try_init();
912
913        let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
914        let embed_rtmp_server = embed_rtmp_server.start().unwrap();
915
916        let output = embed_rtmp_server
917            .create_rtmp_input("my-app", "my-stream")
918            .unwrap();
919
920        let start = current();
921
922        let result = FfmpegContext::builder()
923            .input(Input::from("test.mp4").set_readrate(1.0).set_stream_loop(-1))
924            // .filter_desc("hue=s=0")
925            .output(output.set_video_codec("h264_videotoolbox"))
926            .build()
927            .unwrap()
928            .start()
929            .unwrap()
930            .wait();
931
932        assert!(result.is_ok());
933
934        info!("elapsed time: {}", current() - start);
935    }
936
937    #[test]
938    #[ignore] // Integration test: requires exclusive port 1935 and test.mp4
939    fn test_concat_realtime() {
940        let _ = env_logger::builder()
941            .filter_level(log::LevelFilter::Trace)
942            .is_test(true)
943            .try_init();
944
945        let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
946        let embed_rtmp_server = embed_rtmp_server.start().unwrap();
947
948        let output = embed_rtmp_server
949            .create_rtmp_input("my-app", "my-stream")
950            .unwrap();
951
952        let start = current();
953
954        let result = FfmpegContext::builder()
955            .independent_readrate()
956            .input(Input::from("test.mp4").set_readrate(1.0))
957            .input(
958                Input::from("test.mp4")
959                    .set_readrate(1.0)
960            )
961            .input(
962                Input::from("test.mp4")
963                    .set_readrate(1.0)
964            )
965            .filter_desc("[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1")
966            .output(output)
967            .build()
968            .unwrap()
969            .start()
970            .unwrap()
971            .wait();
972
973        assert!(result.is_ok());
974
975        sleep(Duration::from_secs(1));
976        info!("elapsed time: {}", current() - start);
977    }
978
979    #[test]
980    #[ignore] // Integration test: requires exclusive port 1935 and test.mp4
981    fn test_realtime() {
982        let _ = env_logger::builder()
983            .filter_level(log::LevelFilter::Trace)
984            .is_test(true)
985            .try_init();
986
987        let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
988        let embed_rtmp_server = embed_rtmp_server.start().unwrap();
989
990        let output = embed_rtmp_server
991            .create_rtmp_input("my-app", "my-stream")
992            .unwrap();
993
994        let start = current();
995
996        let result = FfmpegContext::builder()
997            .input(Input::from("test.mp4").set_readrate(1.0))
998            .output(output)
999            .build()
1000            .unwrap()
1001            .start()
1002            .unwrap()
1003            .wait();
1004
1005        assert!(result.is_ok());
1006
1007        info!("elapsed time: {}", current() - start);
1008    }
1009
1010    #[test]
1011    #[ignore] // Integration test: requires test.mp4
1012    fn test_readrate() {
1013        let _ = env_logger::builder()
1014            .filter_level(log::LevelFilter::Trace)
1015            .is_test(true)
1016            .try_init();
1017
1018        let mut output: Output = "output.flv".into();
1019        output.audio_codec = Some("adpcm_swf".to_string());
1020
1021        let mut input: Input = "test.mp4".into();
1022        input.readrate = Some(1.0);
1023
1024        let context = FfmpegContext::builder()
1025            .input(input)
1026            .output(output)
1027            .build()
1028            .unwrap();
1029
1030        let result = FfmpegScheduler::new(context).start().unwrap().wait();
1031        if let Err(error) = result {
1032            println!("Error: {error}");
1033        }
1034    }
1035
1036    #[test]
1037    #[ignore] // Integration test: requires exclusive port 1935 and test.mp4
1038    fn test_embed_rtmp_server() {
1039        let _ = env_logger::builder()
1040            .filter_level(log::LevelFilter::Trace)
1041            .is_test(true)
1042            .try_init();
1043
1044        let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
1045        let embed_rtmp_server = embed_rtmp_server.start().unwrap();
1046
1047        let output = embed_rtmp_server
1048            .create_rtmp_input("my-app", "my-stream")
1049            .unwrap();
1050        let mut input: Input = "test.mp4".into();
1051        input.readrate = Some(1.0);
1052
1053        let context = FfmpegContext::builder()
1054            .input(input)
1055            .output(output)
1056            .build()
1057            .unwrap();
1058
1059        let result = FfmpegScheduler::new(context).start().unwrap().wait();
1060
1061        assert!(result.is_ok());
1062
1063        sleep(Duration::from_secs(3));
1064    }
1065}