ez_ffmpeg/rtmp/
embed_rtmp_server.rs

1use crate::core::context::output::Output;
2use crate::error::Error::{RtmpCreateStream, RtmpStreamAlreadyExists};
3use crate::flv::flv_buffer::FlvBuffer;
4use crate::flv::flv_tag::FlvTag;
5use crate::rtmp::rtmp_connection::{ConnectionError, ReadResult, RtmpConnection};
6use crate::rtmp::rtmp_scheduler::{RtmpScheduler, ServerResult};
7use bytes::{BufMut, Bytes};
8use log::{debug, error, info, warn};
9use rml_rtmp::chunk_io::ChunkSerializer;
10use rml_rtmp::messages::{MessagePayload, RtmpMessage};
11use rml_rtmp::rml_amf0::Amf0Value;
12use rml_rtmp::time::RtmpTimestamp;
13use slab::Slab;
14use std::collections::HashMap;
15use std::marker::PhantomData;
16use std::net::{TcpListener, TcpStream};
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::Duration;
20use crate::core::scheduler::type_to_symbol;
21use crate::error::OpenDecoderOperationError;
22
/// Typestate marker for a server that has been configured but not started yet.
///
/// This is the state produced by `EmbedRtmpServer::new` and
/// `EmbedRtmpServer::new_with_gop_limit`.
#[derive(Clone, Debug)]
pub struct Initialization;
/// Typestate marker for a server whose background threads have been spawned.
///
/// This is the state produced by `EmbedRtmpServer::start`.
#[derive(Clone, Debug)]
pub struct Running;
/// Typestate marker for a server that has been asked to stop.
///
/// This is the state produced by `EmbedRtmpServer::stop`.
#[derive(Clone, Debug)]
pub struct Ended;
29
/// Handle to an embedded RTMP server.
///
/// The type parameter `S` is a zero-sized typestate marker (`Initialization`,
/// `Running` or `Ended`) that selects which methods are available on the handle.
#[derive(Clone)]
pub struct EmbedRtmpServer<S> {
    /// Address (`host:port`) the TCP listener is bound to by `start`.
    address: String,
    /// Lifecycle flag holding one of the `STATUS_*` constants; clones of this
    /// `Arc` are handed to the background threads.
    status: Arc<AtomicUsize>,
    /// Stream keys consulted by `create_stream_sender` to reject duplicates.
    // NOTE(review): nothing in this file inserts into this set, and `start`
    // gives the worker thread a `clone()` of it — confirm whether that clone
    // shares storage with this field and where keys are meant to be added.
    stream_keys: dashmap::DashSet<String>,
    /// Channel used to register a new publisher with the worker thread as a
    /// `(stream_key, bytes_receiver)` pair. `None` until `start` creates it.
    publisher_sender: Option<crossbeam_channel::Sender<(String, crossbeam_channel::Receiver<Vec<u8>>)>>,
    /// Maximum number of GOPs to cache; forwarded to `RtmpScheduler::new`.
    gop_limit: usize,
    /// Zero-sized marker carrying the typestate `S`.
    state: PhantomData<S>,
}
40
/// Status value stored when the server is constructed and not yet started.
const STATUS_INIT: usize = 0;
/// Status value stored by `start` once the listener has been bound.
const STATUS_RUN: usize = 1;
/// Status value stored by `stop` (or by a background thread that has to give up);
/// the background loops exit when they observe it.
const STATUS_END: usize = 2;
44
45impl<S: 'static> EmbedRtmpServer<S> {
46    fn is_state<T: 'static>(&self) -> bool {
47        std::any::TypeId::of::<S>() == std::any::TypeId::of::<T>()
48    }
49
50    fn into_state<T>(self) -> EmbedRtmpServer<T> {
51        EmbedRtmpServer {
52            address: self.address,
53            status: self.status,
54            stream_keys: self.stream_keys,
55            publisher_sender: self.publisher_sender,
56            gop_limit: self.gop_limit,
57            state: Default::default(),
58        }
59    }
60
61    /// Checks whether the RTMP server has been stopped. This returns `true` after
62    /// [`stop`](EmbedRtmpServer<Running>::stop) has been called and the server has exited its main loop, otherwise `false`.
63    ///
64    /// # Returns
65    ///
66    /// * `true` if the server has been signaled to stop (and is no longer listening/accepting).
67    /// * `false` if the server is still running.
68    pub fn is_stopped(&self) -> bool {
69        self.status.load(Ordering::Acquire) == STATUS_END
70    }
71}
72
73impl EmbedRtmpServer<Initialization> {
74    /// Creates a new RTMP server instance that will listen on the specified address
75    /// when [`start`](EmbedRtmpServer<Initialization>::start) is called.
76    ///
77    /// # Parameters
78    ///
79    /// * `address` - A string slice representing the address (host:port) to bind the
80    ///   RTMP server socket.
81    ///
82    /// # Returns
83    ///
84    /// An [`EmbedRtmpServer`] configured to listen on the given address.
85    pub fn new(address: impl Into<String>) -> EmbedRtmpServer<Initialization> {
86        Self::new_with_gop_limit(address, 1)
87    }
88
89    /// Creates a new RTMP server instance that will listen on the specified address,
90    /// with a custom GOP limit.
91    ///
92    /// This method allows specifying the maximum number of GOPs to be cached.
93    /// A GOP (Group of Pictures) represents a sequence of video frames (I, P, B frames)
94    /// used for efficient video decoding and random access. The GOP limit defines
95    /// how many such groups are stored in the cache.
96    ///
97    /// # Parameters
98    ///
99    /// * `address` - A string slice representing the address (host:port) to bind the
100    ///   RTMP server socket.
101    /// * `gop_limit` - The maximum number of GOPs to cache.
102    ///
103    /// # Returns
104    ///
105    /// An [`EmbedRtmpServer`] instance configured to listen on the given address and
106    /// using the specified GOP limit.
107    pub fn new_with_gop_limit(address: impl Into<String>, gop_limit: usize) -> EmbedRtmpServer<Initialization> {
108        Self {
109            address: address.into(),
110            status: Arc::new(AtomicUsize::new(STATUS_INIT)),
111            stream_keys: Default::default(),
112            publisher_sender: None,
113            gop_limit,
114            state: Default::default(),
115        }
116    }
117
118    /// Starts the RTMP server on the configured address, entering a loop that
119    /// accepts incoming client connections. This method spawns background threads
120    /// to handle the connections and publish events.
121    ///
122    /// # Returns
123    ///
124    /// * `Ok(())` if the server successfully starts listening.
125    /// * An error variant if the socket could not be bound or other I/O errors occur.
126    pub fn start(mut self) -> crate::error::Result<EmbedRtmpServer<Running>> {
127        let listener = TcpListener::bind(self.address.clone())
128            .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
129
130        listener
131            .set_nonblocking(true)
132            .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
133
134        self.status.store(STATUS_RUN, Ordering::Release);
135
136        let (stream_sender, stream_receiver) = crossbeam_channel::unbounded();
137        let (publisher_sender, publisher_receiver) = crossbeam_channel::bounded(1024);
138        self.publisher_sender = Some(publisher_sender);
139        let stream_keys = self.stream_keys.clone();
140        let status = self.status.clone();
141        let result = std::thread::Builder::new()
142            .name("rtmp-server-worker".to_string())
143            .spawn(move || handle_connections(stream_receiver, publisher_receiver, stream_keys, self.gop_limit, status));
144        if let Err(e) = result {
145            error!("Thread[rtmp-server-worker] exited with error: {e}");
146            return Err(crate::error::Error::RtmpThreadExited);
147        }
148
149        info!(
150            "Embed rtmp server listening for connections on {}.",
151            &self.address
152        );
153
154        let status = self.status.clone();
155        let result = std::thread::Builder::new()
156            .name("rtmp-server-io".to_string())
157            .spawn(move || {
158            for stream in listener.incoming() {
159                match stream {
160                    Ok(stream) => {
161                        debug!("New rtmp connection.");
162                        if let Err(_) = stream_sender.send(stream) {
163                            error!("Error sending stream to rtmp connection handler");
164                            status.store(STATUS_END, Ordering::Release);
165                            return;
166                        }
167                    }
168                    Err(e) => {
169                        if e.kind() == std::io::ErrorKind::WouldBlock {
170                            if status.load(Ordering::Acquire) == STATUS_END {
171                                info!("Embed rtmp server stopped.");
172                                break;
173                            }
174                            std::thread::sleep(std::time::Duration::from_millis(100));
175                        } else {
176                            debug!("Rtmp connection error: {:?}", e);
177                        }
178                    }
179                }
180            }
181        });
182        if let Err(e) = result {
183            error!("Thread[rtmp-server-io] exited with error: {e}");
184            return Err(crate::error::Error::RtmpThreadExited);
185        }
186
187        Ok(self.into_state())
188    }
189}
190
191impl EmbedRtmpServer<Running> {
192    /// Creates an RTMP "input" endpoint for this server (from the server's perspective),
193    /// returning an [`Output`] that can be used by FFmpeg to push media data.
194    ///
195    /// From the FFmpeg standpoint, the returned [`Output`] is where media content is
196    /// sent (i.e., FFmpeg "outputs" to this RTMP server). After obtaining this [`Output`],
197    /// you can pass it to your FFmpeg job or scheduler to start streaming data into the server.
198    ///
199    /// # Parameters
200    ///
201    /// * `app_name` - The RTMP application name, typically corresponding to the `app` part
202    ///   of an RTMP URL (e.g., `rtmp://host:port/app/stream_key`).
203    /// * `stream_key` - The stream key (or "stream name"). If a stream with the same key
204    ///   already exists, an error will be returned.
205    ///
206    /// # Returns
207    ///
208    /// * [`Output`] - An output object preconfigured for streaming to this RTMP server.
209    ///   This can be passed to the FFmpeg SDK for actual data push.
210    /// * [`crate::error::Error`] - If a stream with the same key already exists, the server
211    ///   is not ready, or an internal error occurs, the corresponding error is returned.
212    ///
213    /// # Example
214    ///
215    /// ```rust
216    /// # // Assume there are definitions and initializations for FfmpegContext, FfmpegScheduler, etc.
217    ///
218    /// // 1. Create and start the RTMP server
219    /// let mut rtmp_server = EmbedRtmpServer::new("localhost:1935");
220    /// rtmp_server.start().expect("Failed to start RTMP server");
221    ///
222    /// // 2. Create an RTMP "input" with app_name="my-app" and stream_key="my-stream"
223    /// let output = rtmp_server
224    ///     .create_rtmp_input("my-app", "my-stream")
225    ///     .expect("Failed to create RTMP input");
226    ///
227    /// // 3. Prepare the FFmpeg context to push a local file to the newly created `Output`
228    /// let context = FfmpegContext::builder()
229    ///     .input("test.mp4")
230    ///     .output(output)
231    ///     .build()
232    ///     .expect("Failed to build Ffmpeg context");
233    ///
234    /// // 4. Start FFmpeg to push "test.mp4" to the local RTMP server on "my-app/my-stream"
235    /// FfmpegScheduler::new(context)
236    ///     .start()
237    ///     .expect("Failed to start Ffmpeg job");
238    /// ```
239    pub fn create_rtmp_input(
240        &self,
241        app_name: impl Into<String>,
242        stream_key: impl Into<String>,
243    ) -> crate::error::Result<Output> {
244        let message_sender = self.create_stream_sender(app_name, stream_key)?;
245
246        let mut flv_buffer = FlvBuffer::new();
247        let mut serializer = ChunkSerializer::new();
248        let write_callback: Box<dyn FnMut(&[u8]) -> i32> = Box::new(move |buf: &[u8]| -> i32 {
249            flv_buffer.write_data(buf);
250            if let Some(mut flv_tag) = flv_buffer.get_flv_tag() {
251                flv_tag.header.stream_id = 1;
252                let packet = serializer
253                    .serialize(&flv_tag_to_message_payload(flv_tag), false, true)
254                    .unwrap();
255                message_sender.send(packet.bytes).unwrap();
256            }
257            buf.len() as i32
258        });
259
260        let mut output: Output = write_callback.into();
261
262        Ok(output
263            .set_format("flv")
264            .set_video_codec("h264")
265            .set_audio_codec("aac")
266            .set_format_opt("flvflags", "no_duration_filesize"))
267    }
268
269    /// Creates a sender channel for an RTMP stream, identified by `app_name` and `stream_key`.
270    /// This method is used internally by [`create_rtmp_input`](EmbedRtmpServer<Running>::create_rtmp_input) but can also be called directly
271    /// if you need more control over how the stream is handled.
272    ///
273    /// # Parameters
274    ///
275    /// * `app_name` - The RTMP application name.
276    /// * `stream_key` - The unique name (or key) for this stream. Must not already be in use.
277    ///
278    /// # Returns
279    ///
280    /// * `crossbeam_channel::Sender<Vec<u8>>` - A sender that allows you to send raw RTMP bytes
281    ///   into the server's handling pipeline.
282    /// * [`crate::error::Error`] - If a stream with the same key already exists or other
283    ///   internal issues occur, an error is returned.
284    ///
285    /// # Notes
286    ///
287    /// * This function sets up the initial RTMP "connect" and "publish" commands automatically.
288    /// * If you manually send bytes to the resulting channel, they should already be properly
289    ///   packaged as RTMP chunks. Otherwise, the server might fail to parse them.
290    pub fn create_stream_sender(
291        &self,
292        app_name: impl Into<String>,
293        stream_key: impl Into<String>,
294    ) -> crate::error::Result<crossbeam_channel::Sender<Vec<u8>>> {
295        let stream_key = stream_key.into();
296        if self.stream_keys.contains(&stream_key) {
297            return Err(RtmpStreamAlreadyExists(stream_key));
298        }
299
300        let (sender, receiver) = crossbeam_channel::unbounded();
301
302        let publisher_sender = self.publisher_sender.as_ref().unwrap();
303        if let Err(_) = publisher_sender.send((stream_key.clone(), receiver)) {
304            if self.status.load(Ordering::Acquire) != STATUS_END {
305                warn!("Rtmp server worker already exited. Can't create stream sender.");
306            } else {
307                error!("Rtmp Server aborted. Can't create stream sender.");
308            }
309            return Err(RtmpCreateStream.into());
310        }
311
312        let mut serializer = ChunkSerializer::new();
313
314        // send connect
315        let mut properties: HashMap<String, Amf0Value> = HashMap::new();
316        properties.insert("app".to_string(), Amf0Value::Utf8String(app_name.into()));
317        let connect_cmd = RtmpMessage::Amf0Command {
318            command_name: "connect".to_string(),
319            transaction_id: 1.0,
320            command_object: Amf0Value::Object(properties),
321            additional_arguments: Vec::new(),
322        }
323        .into_message_payload(RtmpTimestamp { value: 0 }, 0)
324        .unwrap();
325
326        let connect_packet = serializer.serialize(&connect_cmd, false, true).unwrap();
327        if let Err(_) = sender.send(connect_packet.bytes) {
328            error!("Can't send connect command to rtmp server.");
329            return Err(RtmpCreateStream.into());
330        }
331
332        // send createStream
333        let create_stream_cmd = RtmpMessage::Amf0Command {
334            command_name: "createStream".to_string(),
335            transaction_id: 2.0,
336            command_object: Amf0Value::Null,
337            additional_arguments: Vec::new(),
338        }
339        .into_message_payload(RtmpTimestamp { value: 0 }, 1)
340        .unwrap();
341
342        let create_stream_packet = serializer
343            .serialize(&create_stream_cmd, false, true)
344            .unwrap();
345        if let Err(_) = sender.send(create_stream_packet.bytes) {
346            error!("Can't send createStream command to rtmp server.");
347            return Err(RtmpCreateStream.into());
348        }
349
350        // send publish
351        let mut arguments = Vec::new();
352        arguments.push(Amf0Value::Utf8String(stream_key));
353        arguments.push(Amf0Value::Utf8String("live".into()));
354        let create_stream_cmd = RtmpMessage::Amf0Command {
355            command_name: "publish".to_string(),
356            transaction_id: 3.0,
357            command_object: Amf0Value::Null,
358            additional_arguments: arguments,
359        }
360        .into_message_payload(RtmpTimestamp { value: 0 }, 1)
361        .unwrap();
362
363        let create_stream_packet = serializer
364            .serialize(&create_stream_cmd, false, true)
365            .unwrap();
366        if let Err(_) = sender.send(create_stream_packet.bytes) {
367            error!("Can't send publish command to rtmp server.");
368            return Err(RtmpCreateStream.into());
369        }
370        Ok(sender)
371    }
372
373    /// Stops the RTMP server by signaling the listening and connection-handling threads
374    /// to terminate. Once called, new incoming connections will be ignored, and existing
375    /// threads will exit gracefully.
376    ///
377    /// # Example
378    /// ```rust
379    /// let server = EmbedRtmpServer::new("localhost:1935");
380    /// // ... start and handle streaming
381    /// server.stop();
382    /// assert!(server.is_stopped());
383    /// ```
384    pub fn stop(self) -> EmbedRtmpServer<Ended> {
385        self.status.store(STATUS_END, Ordering::Release);
386        self.into_state()
387    }
388}
389
390fn handle_connections(
391    connection_receiver: crossbeam_channel::Receiver<TcpStream>,
392    publisher_receiver: crossbeam_channel::Receiver<(String, crossbeam_channel::Receiver<Vec<u8>>)>,
393    stream_keys: dashmap::DashSet<String>,
394    gop_limit: usize,
395    status: Arc<AtomicUsize>,
396) {
397    let mut connections = Slab::new();
398    let mut publishers = Slab::new();
399    let mut scheduler = RtmpScheduler::new(gop_limit);
400
401    loop {
402        crossbeam::channel::select! {
403            // receive new tcp connection
404            recv(connection_receiver) -> msg => match msg {
405                Ok(stream) => {
406                    let entry = connections.vacant_entry();
407                    let connection_id = entry.key();
408                    let result = RtmpConnection::new(connection_id, stream);
409                    match result {
410                        Ok(connection) => {
411                            entry.insert(connection);
412                            debug!("Rtmp connection {connection_id} started");
413                        }
414                        Err(e) => debug!("Rtmp connection error: {e:?}"),
415                    }
416                }
417                Err(_) => {
418                    debug!("Embed rtmp server disconnected.");
419                    return;
420                }
421            },
422            // receive new publisher
423            recv(publisher_receiver) -> msg => match msg {
424                Ok((stream_key, bytes_receiver)) => {
425                    let entry = publishers.vacant_entry();
426                    let connection_id = entry.key();
427
428                    if scheduler.new_channel(stream_key.clone(), connection_id) {
429                        entry.insert((stream_key, bytes_receiver));
430                        debug!("Publisher {connection_id} started");
431                    }
432                }
433                Err(_) => {
434                    error!("Embed rtmp server publisher_sender closed.");
435                    return;
436                }
437            },
438            default(Duration::from_millis(5)) => {}
439        }
440
441        if status.load(Ordering::Acquire) == STATUS_END {
442            info!("Embed rtmp server stopped.");
443            break;
444        }
445
446        let mut packets_to_write = Vec::new();
447        let mut publisher_ids_to_clear = Vec::new();
448        let mut ids_to_clear = Vec::new();
449        for (connection_id, (_stream_key, bytes_receiver)) in publishers.iter_mut() {
450            loop {
451                match bytes_receiver.try_recv() {
452                    Err(crossbeam_channel::TryRecvError::Disconnected) => {
453                        debug!("Rtmp publisher closed for id {connection_id}");
454                        publisher_ids_to_clear.push(connection_id);
455
456                        let mut arguments = Vec::new();
457                        arguments.push(Amf0Value::Number(1.0));
458                        let create_stream_cmd = RtmpMessage::Amf0Command {
459                            command_name: "deleteStream".to_string(),
460                            transaction_id: 4.0,
461                            command_object: Amf0Value::Null,
462                            additional_arguments: arguments,
463                        }
464                        .into_message_payload(RtmpTimestamp { value: 0 }, 1)
465                        .unwrap();
466
467                        let mut serializer = ChunkSerializer::new();
468                        let create_stream_packet = serializer
469                            .serialize(&create_stream_cmd, false, true)
470                            .unwrap();
471
472                        let server_results = match scheduler
473                            .publish_bytes_received(connection_id, create_stream_packet.bytes)
474                        {
475                            Ok(results) => results,
476                            Err(_) => {
477                                break;
478                            }
479                        };
480
481                        for result in server_results.into_iter() {
482                            match result {
483                                ServerResult::OutboundPacket {
484                                    target_connection_id,
485                                    packet,
486                                } => {
487                                    packets_to_write.push((target_connection_id, packet));
488                                }
489
490                                ServerResult::DisconnectConnection {
491                                    connection_id: id_to_close,
492                                } => {
493                                    ids_to_clear.push(id_to_close);
494                                }
495                            }
496                        }
497                        break;
498                    }
499                    Err(crossbeam_channel::TryRecvError::Empty) => break,
500                    Ok(bytes) => {
501                        let server_results =
502                            match scheduler.publish_bytes_received(connection_id, bytes) {
503                                Ok(results) => results,
504                                Err(error) => {
505                                    debug!("Input caused the following server error: {}", error);
506                                    publisher_ids_to_clear.push(connection_id);
507                                    break;
508                                }
509                            };
510
511                        for result in server_results.into_iter() {
512                            match result {
513                                ServerResult::OutboundPacket {
514                                    target_connection_id,
515                                    packet,
516                                } => {
517                                    packets_to_write.push((target_connection_id, packet));
518                                }
519
520                                ServerResult::DisconnectConnection {
521                                    connection_id: id_to_close,
522                                } => {
523                                    ids_to_clear.push(id_to_close);
524                                }
525                            }
526                        }
527                    }
528                }
529            }
530        }
531
532        for (connection_id, connection) in connections.iter_mut() {
533            loop {
534                match connection.read() {
535                    Err(ConnectionError::SocketClosed) => {
536                        debug!("Rtmp socket closed for id {connection_id}");
537                        ids_to_clear.push(connection_id);
538                        break;
539                    }
540                    Err(error) => {
541                        debug!(
542                            "I/O error while reading rtmp connection {connection_id}: {:?}",
543                            error
544                        );
545                        ids_to_clear.push(connection_id);
546                        break;
547                    }
548                    Ok(result) => match result {
549                        ReadResult::NoBytesReceived => break,
550                        ReadResult::HandshakingInProgress => break,
551                        ReadResult::BytesReceived { buffer, byte_count } => {
552                            let server_results =
553                                match scheduler.bytes_received(connection_id, &buffer[..byte_count]) {
554                                    Ok(results) => results,
555                                    Err(error) => {
556                                        debug!("Rtmp input caused the following server error: {error}");
557                                        ids_to_clear.push(connection_id);
558                                        break;
559                                    }
560                                };
561
562                            for result in server_results.into_iter() {
563                                match result {
564                                    ServerResult::OutboundPacket {
565                                        target_connection_id,
566                                        packet,
567                                    } => {
568                                        packets_to_write.push((target_connection_id, packet));
569                                    }
570
571                                    ServerResult::DisconnectConnection {
572                                        connection_id: id_to_close,
573                                    } => {
574                                        ids_to_clear.push(id_to_close);
575                                    }
576                                }
577                            }
578                        }
579                    },
580                }
581            }
582        }
583
584        for publisher_id in publisher_ids_to_clear {
585            debug!("Rtmp publisher {publisher_id} closed");
586            let (stream_key, _bytes_receiver) = publishers.remove(publisher_id);
587            scheduler.notify_publisher_closed(publisher_id);
588            stream_keys.remove(&stream_key);
589        }
590
591        for (connection_id, packet) in packets_to_write.into_iter() {
592            if let Some(connection) = connections.get_mut(connection_id) {
593                connection.write(packet.bytes);
594            }
595        }
596
597        for closed_id in ids_to_clear {
598            debug!("Rtmp connection {closed_id} closed");
599            let _ = connections.try_remove(closed_id);
600            scheduler.notify_connection_closed(closed_id);
601        }
602    }
603
604    if status.load(Ordering::Acquire) != STATUS_END {
605        error!("Rtmp Server aborted.");
606    }
607}
608
609pub fn flv_tag_to_message_payload(flv_tag: FlvTag) -> MessagePayload {
610    let timestamp = flv_tag.header.timestamp | ((flv_tag.header.timestamp_ext as u32) << 24);
611
612    let type_id = flv_tag.header.tag_type;
613    let message_stream_id = flv_tag.header.stream_id;
614
615    let data = if type_id == 0x12 {
616        wrap_metadata(flv_tag.data)
617    } else {
618        flv_tag.data
619    };
620
621    MessagePayload {
622        timestamp: RtmpTimestamp { value: timestamp },
623        type_id,
624        message_stream_id,
625        data,
626    }
627}
628
629fn wrap_metadata(data: Bytes) -> Bytes {
630    let s = "@setDataFrame";
631
632    let insert_len = 16;
633
634    let mut bytes = bytes::BytesMut::with_capacity(insert_len + data.len());
635
636    bytes.put_u8(0x02);
637    bytes.put_u16(s.len() as u16);
638    bytes.put(s.as_bytes());
639
640    bytes.put(data);
641
642    bytes.freeze()
643}
644
#[cfg(test)]
mod tests {
    //! Manual integration tests for the embedded RTMP server.
    //!
    //! Every server test binds `localhost:1935` and reads `test.mp4` from the
    //! working directory, and none of them stops the server it starts, so they
    //! are meant to be run one at a time.

    use super::*;
    use crate::core::context::ffmpeg_context::FfmpegContext;
    use crate::core::context::input::Input;
    use crate::core::context::output::Output;
    use crate::core::scheduler::ffmpeg_scheduler::FfmpegScheduler;
    use ffmpeg_next::time::current;
    use std::thread::sleep;

    /// Installs a trace-level test logger; repeated calls are ignored.
    fn init_logger() {
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Trace)
            .is_test(true)
            .try_init();
    }

    /// Starts a server on `localhost:1935` and registers `my-app/my-stream`.
    ///
    /// The server handle is returned alongside the output because dropping it
    /// closes the publisher channel the worker thread listens on.
    fn start_server_with_output() -> (EmbedRtmpServer<Running>, Output) {
        let server = EmbedRtmpServer::new("localhost:1935").start().unwrap();
        let output = server.create_rtmp_input("my-app", "my-stream").unwrap();
        (server, output)
    }

    #[test]
    fn test_concat_stream_loop() {
        init_logger();
        let (_server, output) = start_server_with_output();

        let start = current();

        let result = FfmpegContext::builder()
            .input(Input::from("test.mp4").set_readrate(1.0).set_stream_loop(3))
            .input(Input::from("test.mp4").set_readrate(1.0).set_stream_loop(3))
            .input(Input::from("test.mp4").set_readrate(1.0).set_stream_loop(3))
            .filter_desc("[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1")
            .output(output)
            .build()
            .unwrap()
            .start()
            .unwrap()
            .wait();

        assert!(result.is_ok());
        info!("elapsed time: {}", current() - start);
    }

    #[test]
    fn test_stream_loop() {
        init_logger();
        let (_server, output) = start_server_with_output();

        let start = current();

        // NOTE(review): `set_stream_loop(-1)` presumably loops the input
        // forever, so this test may never finish on its own — confirm.
        let result = FfmpegContext::builder()
            .input(Input::from("test.mp4").set_readrate(1.0).set_stream_loop(-1))
            // .filter_desc("hue=s=0")
            .output(output.set_video_codec("h264_videotoolbox"))
            .build()
            .unwrap()
            .start()
            .unwrap()
            .wait();

        assert!(result.is_ok());

        info!("elapsed time: {}", current() - start);
    }

    #[test]
    fn test_concat_realtime() {
        init_logger();
        let (_server, output) = start_server_with_output();

        let start = current();

        let result = FfmpegContext::builder()
            .independent_readrate()
            .input(Input::from("test.mp4").set_readrate(1.0))
            .input(Input::from("test.mp4").set_readrate(1.0))
            .input(Input::from("test.mp4").set_readrate(1.0))
            .filter_desc("[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1")
            .output(output)
            .build()
            .unwrap()
            .start()
            .unwrap()
            .wait();

        assert!(result.is_ok());

        sleep(Duration::from_secs(1));
        info!("elapsed time: {}", current() - start);
    }

    #[test]
    fn test_realtime() {
        init_logger();
        let (_server, output) = start_server_with_output();

        let start = current();

        let result = FfmpegContext::builder()
            .input(Input::from("test.mp4").set_readrate(1.0))
            .output(output)
            .build()
            .unwrap()
            .start()
            .unwrap()
            .wait();

        assert!(result.is_ok());

        info!("elapsed time: {}", current() - start);
    }

    #[test]
    fn test_readrate() {
        init_logger();

        let mut output: Output = "output.flv".into();
        output.audio_codec = Some("adpcm_swf".to_string());

        let mut input: Input = "test.mp4".into();
        input.readrate = Some(1.0);

        let context = FfmpegContext::builder()
            .input(input)
            .output(output)
            .build()
            .unwrap();

        let result = FfmpegScheduler::new(context).start().unwrap().wait();
        if let Err(error) = result {
            println!("Error: {error}");
        }
    }

    #[test]
    fn test_embed_rtmp_server() {
        init_logger();
        let (_server, output) = start_server_with_output();

        let mut input: Input = "test.mp4".into();
        input.readrate = Some(1.0);

        let context = FfmpegContext::builder()
            .input(input)
            .output(output)
            .build()
            .unwrap();

        let result = FfmpegScheduler::new(context).start().unwrap().wait();

        assert!(result.is_ok());

        sleep(Duration::from_secs(3));
    }
}