1use crate::core::context::output::Output;
2use crate::error::Error::{RtmpCreateStream, RtmpStreamAlreadyExists};
3use crate::flv::flv_buffer::FlvBuffer;
4use crate::flv::flv_tag::FlvTag;
5use crate::rtmp::reactor::{effective_max_connections, Reactor, CHANNEL_HEADROOM};
6use bytes::{BufMut, Bytes};
7use log::{debug, error, info, warn};
8use rml_rtmp::chunk_io::ChunkSerializer;
9use rml_rtmp::messages::{MessagePayload, RtmpMessage};
10use rml_rtmp::rml_amf0::Amf0Value;
11use rml_rtmp::time::RtmpTimestamp;
12use std::collections::HashMap;
13use std::marker::PhantomData;
14use std::net::{Shutdown, TcpListener, TcpStream};
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::Arc;
17
18#[derive(Clone)]
19pub struct Initialization;
20#[derive(Clone)]
21pub struct Running;
22#[derive(Clone)]
23pub struct Ended;
24
25#[derive(Clone)]
26pub struct EmbedRtmpServer<S> {
27 address: String,
28 bound_addr: Option<std::net::SocketAddr>,
29 status: Arc<AtomicUsize>,
30 stream_keys: dashmap::DashSet<String>,
31 publisher_sender: Option<crossbeam_channel::Sender<(String, crossbeam_channel::Receiver<Vec<u8>>)>>,
33 gop_limit: usize,
34 max_connections: Option<usize>,
35 state: PhantomData<S>,
36}
37
38const STATUS_INIT: usize = 0;
39const STATUS_RUN: usize = 1;
40const STATUS_END: usize = 2;
41
42impl<S: 'static> EmbedRtmpServer<S> {
43 fn into_state<T>(self) -> EmbedRtmpServer<T> {
44 EmbedRtmpServer {
45 address: self.address,
46 bound_addr: self.bound_addr,
47 status: self.status,
48 stream_keys: self.stream_keys,
49 publisher_sender: self.publisher_sender,
50 gop_limit: self.gop_limit,
51 max_connections: self.max_connections,
52 state: Default::default(),
53 }
54 }
55
56 pub fn is_stopped(&self) -> bool {
64 self.status.load(Ordering::Acquire) == STATUS_END
65 }
66}
67
68impl EmbedRtmpServer<Initialization> {
69 pub fn new(address: impl Into<String>) -> EmbedRtmpServer<Initialization> {
81 Self::new_with_gop_limit(address, 1)
82 }
83
84 pub fn new_with_gop_limit(address: impl Into<String>, gop_limit: usize) -> EmbedRtmpServer<Initialization> {
103 Self {
104 address: address.into(),
105 bound_addr: None,
106 status: Arc::new(AtomicUsize::new(STATUS_INIT)),
107 stream_keys: Default::default(),
108 publisher_sender: None,
109 gop_limit,
110 max_connections: None,
111 state: Default::default(),
112 }
113 }
114
115 pub fn set_max_connections(mut self, max_connections: usize) -> Self {
128 self.max_connections = Some(max_connections);
129 self
130 }
131
132 pub fn start(mut self) -> crate::error::Result<EmbedRtmpServer<Running>> {
141 let listener = TcpListener::bind(self.address.clone())
142 .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
143
144 let actual_addr = listener.local_addr()
146 .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
147 self.bound_addr = Some(actual_addr);
148
149 listener
150 .set_nonblocking(true)
151 .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
152
153 self.status.store(STATUS_RUN, Ordering::Release);
154
155 let effective_max = effective_max_connections(self.max_connections);
158 let channel_capacity = effective_max.saturating_add(CHANNEL_HEADROOM);
159 let (stream_sender, stream_receiver) = crossbeam_channel::bounded(channel_capacity);
160 let (publisher_sender, publisher_receiver) = crossbeam_channel::bounded(1024);
161 self.publisher_sender = Some(publisher_sender);
162 let stream_keys = self.stream_keys.clone();
163 let status = self.status.clone();
164 let max_connections = self.max_connections;
165 let result = std::thread::Builder::new()
166 .name("rtmp-server-worker".to_string())
167 .spawn(move || handle_connections(stream_receiver, publisher_receiver, stream_keys, self.gop_limit, max_connections, status));
168 if let Err(e) = result {
169 error!("Thread[rtmp-server-worker] exited with error: {e}");
170 return Err(crate::error::Error::RtmpThreadExited);
171 }
172
173 info!(
174 "Embed rtmp server listening for connections on {} (actual: {}, max_connections: {}).",
175 &self.address, actual_addr, effective_max
176 );
177
178 let status = self.status.clone();
179 let result = std::thread::Builder::new()
180 .name("rtmp-server-io".to_string())
181 .spawn(move || {
182 for stream in listener.incoming() {
183 match stream {
184 Ok(stream) => {
185 match stream_sender.try_send(stream) {
187 Ok(_) => {
188 debug!("New rtmp connection accepted.");
189 }
190 Err(crossbeam_channel::TrySendError::Full(s)) => {
191 let _ = s.shutdown(Shutdown::Both);
193 debug!("Connection rejected: server at capacity (channel full)");
194 }
195 Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
196 error!("Connection channel disconnected");
197 status.store(STATUS_END, Ordering::Release);
198 return;
199 }
200 }
201 }
202 Err(e) => {
203 if e.kind() == std::io::ErrorKind::WouldBlock {
204 if status.load(Ordering::Acquire) == STATUS_END {
205 info!("Embed rtmp server stopped.");
206 break;
207 }
208 std::thread::sleep(std::time::Duration::from_millis(100));
209 } else {
210 debug!("Rtmp connection error: {:?}", e);
211 }
212 }
213 }
214 }
215 });
216 if let Err(e) = result {
217 error!("Thread[rtmp-server-io] exited with error: {e}");
218 return Err(crate::error::Error::RtmpThreadExited);
219 }
220
221 Ok(self.into_state())
222 }
223}
224
225impl EmbedRtmpServer<Running> {
226 pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
243 self.bound_addr
244 }
245
246 pub fn create_rtmp_input(
294 &self,
295 app_name: impl Into<String>,
296 stream_key: impl Into<String>,
297 ) -> crate::error::Result<Output> {
298 let message_sender = self.create_stream_sender(app_name, stream_key)?;
299
300 let mut flv_buffer = FlvBuffer::new();
301 let mut serializer = ChunkSerializer::new();
302 let write_callback: Box<dyn FnMut(&[u8]) -> i32> = Box::new(move |buf: &[u8]| -> i32 {
303 flv_buffer.write_data(buf);
304 if let Some(mut flv_tag) = flv_buffer.get_flv_tag() {
305 flv_tag.header.stream_id = 1;
306 match serializer.serialize(&flv_tag_to_message_payload(flv_tag), false, true) {
307 Ok(packet) => {
308 if let Err(e) = message_sender.send(packet.bytes) {
309 error!("Failed to send RTMP packet: {:?}", e);
310 return -1;
311 }
312 }
313 Err(e) => {
314 error!("Failed to serialize RTMP message: {:?}", e);
315 return -1;
316 }
317 }
318 }
319 buf.len() as i32
320 });
321
322 let output: Output = write_callback.into();
323
324 Ok(output
325 .set_format("flv")
326 .set_video_codec("h264")
327 .set_audio_codec("aac")
328 .set_format_opt("flvflags", "no_duration_filesize"))
329 }
330
331 pub fn create_stream_sender(
353 &self,
354 app_name: impl Into<String>,
355 stream_key: impl Into<String>,
356 ) -> crate::error::Result<crossbeam_channel::Sender<Vec<u8>>> {
357 let stream_key = stream_key.into();
358 if self.stream_keys.contains(&stream_key) {
359 return Err(RtmpStreamAlreadyExists(stream_key));
360 }
361
362 let (sender, receiver) = crossbeam_channel::bounded(1024);
363
364 let publisher_sender = match self.publisher_sender.as_ref() {
365 Some(sender) => sender,
366 None => {
367 error!("Publisher sender not initialized");
368 return Err(RtmpCreateStream.into());
369 }
370 };
371
372 if let Err(_) = publisher_sender.send((stream_key.clone(), receiver)) {
373 if self.status.load(Ordering::Acquire) != STATUS_END {
374 warn!("Rtmp server worker already exited. Can't create stream sender.");
375 } else {
376 error!("Rtmp Server aborted. Can't create stream sender.");
377 }
378 return Err(RtmpCreateStream.into());
379 }
380
381 let mut serializer = ChunkSerializer::new();
382
383 let mut properties: HashMap<String, Amf0Value> = HashMap::new();
385 properties.insert("app".to_string(), Amf0Value::Utf8String(app_name.into()));
386 let connect_cmd = RtmpMessage::Amf0Command {
387 command_name: "connect".to_string(),
388 transaction_id: 1.0,
389 command_object: Amf0Value::Object(properties),
390 additional_arguments: Vec::new(),
391 }
392 .into_message_payload(RtmpTimestamp { value: 0 }, 0);
393
394 let connect_cmd = match connect_cmd {
395 Ok(cmd) => cmd,
396 Err(e) => {
397 error!("Failed to create connect command: {:?}", e);
398 return Err(RtmpCreateStream.into());
399 }
400 };
401
402 let connect_packet = match serializer.serialize(&connect_cmd, false, true) {
403 Ok(packet) => packet,
404 Err(e) => {
405 error!("Failed to serialize connect command: {:?}", e);
406 return Err(RtmpCreateStream.into());
407 }
408 };
409
410 if let Err(_) = sender.send(connect_packet.bytes) {
411 error!("Can't send connect command to rtmp server.");
412 return Err(RtmpCreateStream.into());
413 }
414
415 let create_stream_cmd = RtmpMessage::Amf0Command {
417 command_name: "createStream".to_string(),
418 transaction_id: 2.0,
419 command_object: Amf0Value::Null,
420 additional_arguments: Vec::new(),
421 }
422 .into_message_payload(RtmpTimestamp { value: 0 }, 1);
423
424 let create_stream_cmd = match create_stream_cmd {
425 Ok(cmd) => cmd,
426 Err(e) => {
427 error!("Failed to create createStream command: {:?}", e);
428 return Err(RtmpCreateStream.into());
429 }
430 };
431
432 let create_stream_packet = match serializer.serialize(&create_stream_cmd, false, true) {
433 Ok(packet) => packet,
434 Err(e) => {
435 error!("Failed to serialize createStream command: {:?}", e);
436 return Err(RtmpCreateStream.into());
437 }
438 };
439
440 if let Err(_) = sender.send(create_stream_packet.bytes) {
441 error!("Can't send createStream command to rtmp server.");
442 return Err(RtmpCreateStream.into());
443 }
444
445 let mut arguments = Vec::new();
447 arguments.push(Amf0Value::Utf8String(stream_key));
448 arguments.push(Amf0Value::Utf8String("live".into()));
449 let publish_cmd = RtmpMessage::Amf0Command {
450 command_name: "publish".to_string(),
451 transaction_id: 3.0,
452 command_object: Amf0Value::Null,
453 additional_arguments: arguments,
454 }
455 .into_message_payload(RtmpTimestamp { value: 0 }, 1);
456
457 let publish_cmd = match publish_cmd {
458 Ok(cmd) => cmd,
459 Err(e) => {
460 error!("Failed to create publish command: {:?}", e);
461 return Err(RtmpCreateStream.into());
462 }
463 };
464
465 let publish_packet = match serializer.serialize(&publish_cmd, false, true) {
466 Ok(packet) => packet,
467 Err(e) => {
468 error!("Failed to serialize publish command: {:?}", e);
469 return Err(RtmpCreateStream.into());
470 }
471 };
472
473 if let Err(_) = sender.send(publish_packet.bytes) {
474 error!("Can't send publish command to rtmp server.");
475 return Err(RtmpCreateStream.into());
476 }
477 Ok(sender)
478 }
479
480 pub fn stop(self) -> EmbedRtmpServer<Ended> {
492 self.status.store(STATUS_END, Ordering::Release);
493 self.into_state()
494 }
495}
496
497fn handle_connections(
504 connection_receiver: crossbeam_channel::Receiver<TcpStream>,
505 publisher_receiver: crossbeam_channel::Receiver<(String, crossbeam_channel::Receiver<Vec<u8>>)>,
506 stream_keys: dashmap::DashSet<String>,
507 gop_limit: usize,
508 max_connections: Option<usize>,
509 status: Arc<AtomicUsize>,
510) {
511 let mut reactor = match Reactor::new(gop_limit, max_connections, stream_keys, status.clone()) {
513 Ok(r) => r,
514 Err(e) => {
515 error!("Failed to create Reactor: {:?}", e);
516 status.store(STATUS_END, Ordering::Release);
517 return;
518 }
519 };
520
521 reactor.run(connection_receiver, publisher_receiver);
523
524 if status.load(Ordering::Acquire) != STATUS_END {
525 error!("Rtmp Server aborted.");
526 }
527}
528
529pub fn flv_tag_to_message_payload(flv_tag: FlvTag) -> MessagePayload {
530 let timestamp = flv_tag.header.timestamp | ((flv_tag.header.timestamp_ext as u32) << 24);
531
532 let type_id = flv_tag.header.tag_type;
533 let message_stream_id = flv_tag.header.stream_id;
534
535 let data = if type_id == 0x12 {
536 wrap_metadata(flv_tag.data)
537 } else {
538 flv_tag.data
539 };
540
541 MessagePayload {
542 timestamp: RtmpTimestamp { value: timestamp },
543 type_id,
544 message_stream_id,
545 data,
546 }
547}
548
549fn wrap_metadata(data: Bytes) -> Bytes {
550 let s = "@setDataFrame";
551
552 let insert_len = 16;
553
554 let mut bytes = bytes::BytesMut::with_capacity(insert_len + data.len());
555
556 bytes.put_u8(0x02);
557 bytes.put_u16(s.len() as u16);
558 bytes.put(s.as_bytes());
559
560 bytes.put(data);
561
562 bytes.freeze()
563}
564
565
566use crate::core::context::ffmpeg_context::FfmpegContext;
571use crate::core::context::input::Input;
572use crate::core::scheduler::ffmpeg_scheduler::{FfmpegScheduler, Running as SchedulerRunning};
573use crate::error::StreamError;
574use std::path::{Path, PathBuf};
575
576pub struct StreamBuilder {
597 address: Option<String>,
598 app_name: Option<String>,
599 stream_key: Option<String>,
600 input_file: Option<PathBuf>,
601 readrate: Option<f32>,
602 gop_limit: Option<usize>,
603 max_connections: Option<usize>,
604}
605
606impl Default for StreamBuilder {
607 fn default() -> Self {
608 Self::new()
609 }
610}
611
612impl StreamBuilder {
613 pub fn new() -> Self {
619 Self {
620 address: None,
621 app_name: None,
622 stream_key: None,
623 input_file: None,
624 readrate: Some(1.0), gop_limit: None,
626 max_connections: None,
627 }
628 }
629
630 pub fn address(mut self, address: impl Into<String>) -> Self {
632 self.address = Some(address.into());
633 self
634 }
635
636 pub fn app_name(mut self, app_name: impl Into<String>) -> Self {
638 self.app_name = Some(app_name.into());
639 self
640 }
641
642 pub fn stream_key(mut self, stream_key: impl Into<String>) -> Self {
644 self.stream_key = Some(stream_key.into());
645 self
646 }
647
648 pub fn input_file(mut self, path: impl AsRef<Path>) -> Self {
650 self.input_file = Some(path.as_ref().to_path_buf());
651 self
652 }
653
654 pub fn readrate(mut self, rate: f32) -> Self {
659 self.readrate = Some(rate);
660 self
661 }
662
663 pub fn gop_limit(mut self, limit: usize) -> Self {
667 self.gop_limit = Some(limit);
668 self
669 }
670
671 pub fn max_connections(mut self, max: usize) -> Self {
673 self.max_connections = Some(max);
674 self
675 }
676
677 pub fn start(self) -> Result<StreamHandle, StreamError> {
701 let address = self
703 .address
704 .ok_or(StreamError::MissingParameter("address"))?;
705 let app_name = self
706 .app_name
707 .ok_or(StreamError::MissingParameter("app_name"))?;
708 let stream_key = self
709 .stream_key
710 .ok_or(StreamError::MissingParameter("stream_key"))?;
711 let input_file = self
712 .input_file
713 .ok_or(StreamError::MissingParameter("input_file"))?;
714
715 if !input_file.is_file() {
717 return Err(StreamError::InputNotFound { path: input_file });
718 }
719
720 let mut server = if let Some(gop_limit) = self.gop_limit {
722 EmbedRtmpServer::new_with_gop_limit(&address, gop_limit)
723 } else {
724 EmbedRtmpServer::new(&address)
725 };
726
727 if let Some(max_conn) = self.max_connections {
728 server = server.set_max_connections(max_conn);
729 }
730
731 let server = server.start().map_err(StreamError::Ffmpeg)?;
733 let server = Arc::new(server);
734
735 let output = server
737 .create_rtmp_input(&app_name, &stream_key)
738 .map_err(StreamError::Ffmpeg)?;
739
740 let input_path = input_file.to_string_lossy().to_string();
742 let mut input = Input::from(input_path);
743 if let Some(rate) = self.readrate {
744 input = input.set_readrate(rate);
745 }
746
747 let scheduler = FfmpegContext::builder()
749 .input(input)
750 .output(output)
751 .build()
752 .map_err(StreamError::Ffmpeg)?
753 .start()
754 .map_err(StreamError::Ffmpeg)?;
755
756 Ok(StreamHandle {
757 _server: server,
758 scheduler: Some(scheduler),
759 })
760 }
761}
762
763pub struct StreamHandle {
782 _server: Arc<EmbedRtmpServer<Running>>,
783 scheduler: Option<FfmpegScheduler<SchedulerRunning>>,
784}
785
786impl StreamHandle {
787 pub fn wait(mut self) -> Result<(), StreamError> {
797 if let Some(scheduler) = self.scheduler.take() {
798 scheduler.wait().map_err(StreamError::Ffmpeg)?;
799 }
800 Ok(())
801 }
802}
803
804impl Drop for StreamHandle {
805 fn drop(&mut self) {
806 if let Some(scheduler) = self.scheduler.take() {
810 let _ = scheduler.wait();
812 }
813 }
814}
815
816impl EmbedRtmpServer<Initialization> {
817 pub fn stream_builder() -> StreamBuilder {
846 StreamBuilder::new()
847 }
848}
849
850#[cfg(test)]
851mod tests {
852 use super::*;
853 use crate::core::context::ffmpeg_context::FfmpegContext;
854 use crate::core::context::input::Input;
855 use crate::core::context::output::Output;
856 use crate::core::scheduler::ffmpeg_scheduler::FfmpegScheduler;
857 use ffmpeg_next::time::current;
858 use std::thread::sleep;
859 use std::time::Duration;
860
861 #[test]
862 #[ignore] fn test_concat_stream_loop() {
864 let _ = env_logger::builder()
865 .filter_level(log::LevelFilter::Trace)
866 .is_test(true)
867 .try_init();
868
869 let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
870 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
871
872 let output = embed_rtmp_server
873 .create_rtmp_input("my-app", "my-stream")
874 .unwrap();
875
876 let start = current();
877
878 let result = FfmpegContext::builder()
879 .input(Input::from("test.mp4")
880 .set_readrate(1.0)
881 .set_stream_loop(3)
882 )
883 .input(
884 Input::from("test.mp4")
885 .set_readrate(1.0)
886 .set_stream_loop(3)
887 )
888 .input(
889 Input::from("test.mp4")
890 .set_readrate(1.0)
891 .set_stream_loop(3)
892 )
893 .filter_desc("[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1")
894 .output(output)
895 .build()
896 .unwrap()
897 .start()
898 .unwrap()
899 .wait();
900
901 assert!(result.is_ok());
902 info!("elapsed time: {}", current() - start);
903 }
904
905 #[test]
906 #[ignore] fn test_stream_loop() {
908 let _ = env_logger::builder()
909 .filter_level(log::LevelFilter::Trace)
910 .is_test(true)
911 .try_init();
912
913 let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
914 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
915
916 let output = embed_rtmp_server
917 .create_rtmp_input("my-app", "my-stream")
918 .unwrap();
919
920 let start = current();
921
922 let result = FfmpegContext::builder()
923 .input(Input::from("test.mp4").set_readrate(1.0).set_stream_loop(-1))
924 .output(output.set_video_codec("h264_videotoolbox"))
926 .build()
927 .unwrap()
928 .start()
929 .unwrap()
930 .wait();
931
932 assert!(result.is_ok());
933
934 info!("elapsed time: {}", current() - start);
935 }
936
937 #[test]
938 #[ignore] fn test_concat_realtime() {
940 let _ = env_logger::builder()
941 .filter_level(log::LevelFilter::Trace)
942 .is_test(true)
943 .try_init();
944
945 let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
946 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
947
948 let output = embed_rtmp_server
949 .create_rtmp_input("my-app", "my-stream")
950 .unwrap();
951
952 let start = current();
953
954 let result = FfmpegContext::builder()
955 .independent_readrate()
956 .input(Input::from("test.mp4").set_readrate(1.0))
957 .input(
958 Input::from("test.mp4")
959 .set_readrate(1.0)
960 )
961 .input(
962 Input::from("test.mp4")
963 .set_readrate(1.0)
964 )
965 .filter_desc("[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1")
966 .output(output)
967 .build()
968 .unwrap()
969 .start()
970 .unwrap()
971 .wait();
972
973 assert!(result.is_ok());
974
975 sleep(Duration::from_secs(1));
976 info!("elapsed time: {}", current() - start);
977 }
978
979 #[test]
980 #[ignore] fn test_realtime() {
982 let _ = env_logger::builder()
983 .filter_level(log::LevelFilter::Trace)
984 .is_test(true)
985 .try_init();
986
987 let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
988 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
989
990 let output = embed_rtmp_server
991 .create_rtmp_input("my-app", "my-stream")
992 .unwrap();
993
994 let start = current();
995
996 let result = FfmpegContext::builder()
997 .input(Input::from("test.mp4").set_readrate(1.0))
998 .output(output)
999 .build()
1000 .unwrap()
1001 .start()
1002 .unwrap()
1003 .wait();
1004
1005 assert!(result.is_ok());
1006
1007 info!("elapsed time: {}", current() - start);
1008 }
1009
1010 #[test]
1011 #[ignore] fn test_readrate() {
1013 let _ = env_logger::builder()
1014 .filter_level(log::LevelFilter::Trace)
1015 .is_test(true)
1016 .try_init();
1017
1018 let mut output: Output = "output.flv".into();
1019 output.audio_codec = Some("adpcm_swf".to_string());
1020
1021 let mut input: Input = "test.mp4".into();
1022 input.readrate = Some(1.0);
1023
1024 let context = FfmpegContext::builder()
1025 .input(input)
1026 .output(output)
1027 .build()
1028 .unwrap();
1029
1030 let result = FfmpegScheduler::new(context).start().unwrap().wait();
1031 if let Err(error) = result {
1032 println!("Error: {error}");
1033 }
1034 }
1035
1036 #[test]
1037 #[ignore] fn test_embed_rtmp_server() {
1039 let _ = env_logger::builder()
1040 .filter_level(log::LevelFilter::Trace)
1041 .is_test(true)
1042 .try_init();
1043
1044 let embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
1045 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
1046
1047 let output = embed_rtmp_server
1048 .create_rtmp_input("my-app", "my-stream")
1049 .unwrap();
1050 let mut input: Input = "test.mp4".into();
1051 input.readrate = Some(1.0);
1052
1053 let context = FfmpegContext::builder()
1054 .input(input)
1055 .output(output)
1056 .build()
1057 .unwrap();
1058
1059 let result = FfmpegScheduler::new(context).start().unwrap().wait();
1060
1061 assert!(result.is_ok());
1062
1063 sleep(Duration::from_secs(3));
1064 }
1065}