1use crate::core::context::output::Output;
2use crate::error::Error::{RtmpCreateStream, RtmpStreamAlreadyExists};
3use crate::flv::flv_buffer::FlvBuffer;
4use crate::flv::flv_tag::FlvTag;
5use crate::rtmp::rtmp_connection::{ConnectionError, ReadResult, RtmpConnection};
6use crate::rtmp::rtmp_scheduler::{RtmpScheduler, ServerResult};
7use bytes::{BufMut, Bytes};
8use log::{debug, error, info, warn};
9use rml_rtmp::chunk_io::ChunkSerializer;
10use rml_rtmp::messages::{MessagePayload, RtmpMessage};
11use rml_rtmp::rml_amf0::Amf0Value;
12use rml_rtmp::time::RtmpTimestamp;
13use slab::Slab;
14use std::collections::HashMap;
15use std::marker::PhantomData;
16use std::net::{TcpListener, TcpStream};
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::Duration;
20use crate::core::scheduler::type_to_symbol;
21use crate::error::OpenDecoderOperationError;
22
23#[derive(Clone)]
24pub struct Initialization;
25#[derive(Clone)]
26pub struct Running;
27#[derive(Clone)]
28pub struct Ended;
29
30#[derive(Clone)]
31pub struct EmbedRtmpServer<S> {
32 address: String,
33 status: Arc<AtomicUsize>,
34 stream_keys: dashmap::DashSet<String>,
35 publisher_sender: Option<crossbeam_channel::Sender<(String, crossbeam_channel::Receiver<Vec<u8>>)>>,
37 gop_limit: usize,
38 state: PhantomData<S>,
39}
40
41const STATUS_INIT: usize = 0;
42const STATUS_RUN: usize = 1;
43const STATUS_END: usize = 2;
44
45impl<S: 'static> EmbedRtmpServer<S> {
46 fn is_state<T: 'static>(&self) -> bool {
47 std::any::TypeId::of::<S>() == std::any::TypeId::of::<T>()
48 }
49
50 fn into_state<T>(self) -> EmbedRtmpServer<T> {
51 EmbedRtmpServer {
52 address: self.address,
53 status: self.status,
54 stream_keys: self.stream_keys,
55 publisher_sender: self.publisher_sender,
56 gop_limit: self.gop_limit,
57 state: Default::default(),
58 }
59 }
60
61 pub fn is_stopped(&self) -> bool {
69 self.status.load(Ordering::Acquire) == STATUS_END
70 }
71}
72
73impl EmbedRtmpServer<Initialization> {
74 pub fn new(address: impl Into<String>) -> EmbedRtmpServer<Initialization> {
86 Self::new_with_gop_limit(address, 1)
87 }
88
89 pub fn new_with_gop_limit(address: impl Into<String>, gop_limit: usize) -> EmbedRtmpServer<Initialization> {
108 Self {
109 address: address.into(),
110 status: Arc::new(AtomicUsize::new(STATUS_INIT)),
111 stream_keys: Default::default(),
112 publisher_sender: None,
113 gop_limit,
114 state: Default::default(),
115 }
116 }
117
118 pub fn start(mut self) -> crate::error::Result<EmbedRtmpServer<Running>> {
127 let listener = TcpListener::bind(self.address.clone())
128 .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
129
130 listener
131 .set_nonblocking(true)
132 .map_err(|e| <std::io::Error as Into<crate::error::Error>>::into(e))?;
133
134 self.status.store(STATUS_RUN, Ordering::Release);
135
136 let (stream_sender, stream_receiver) = crossbeam_channel::unbounded();
137 let (publisher_sender, publisher_receiver) = crossbeam_channel::bounded(1024);
138 self.publisher_sender = Some(publisher_sender);
139 let stream_keys = self.stream_keys.clone();
140 let status = self.status.clone();
141 let result = std::thread::Builder::new()
142 .name("rtmp-server-worker".to_string())
143 .spawn(move || handle_connections(stream_receiver, publisher_receiver, stream_keys, self.gop_limit, status));
144 if let Err(e) = result {
145 error!("Thread[rtmp-server-worker] exited with error: {e}");
146 return Err(crate::error::Error::RtmpThreadExited);
147 }
148
149 info!(
150 "Embed rtmp server listening for connections on {}.",
151 &self.address
152 );
153
154 let status = self.status.clone();
155 let result = std::thread::Builder::new()
156 .name("rtmp-server-io".to_string())
157 .spawn(move || {
158 for stream in listener.incoming() {
159 match stream {
160 Ok(stream) => {
161 debug!("New rtmp connection.");
162 if let Err(_) = stream_sender.send(stream) {
163 error!("Error sending stream to rtmp connection handler");
164 status.store(STATUS_END, Ordering::Release);
165 return;
166 }
167 }
168 Err(e) => {
169 if e.kind() == std::io::ErrorKind::WouldBlock {
170 if status.load(Ordering::Acquire) == STATUS_END {
171 info!("Embed rtmp server stopped.");
172 break;
173 }
174 std::thread::sleep(std::time::Duration::from_millis(100));
175 } else {
176 debug!("Rtmp connection error: {:?}", e);
177 }
178 }
179 }
180 }
181 });
182 if let Err(e) = result {
183 error!("Thread[rtmp-server-io] exited with error: {e}");
184 return Err(crate::error::Error::RtmpThreadExited);
185 }
186
187 Ok(self.into_state())
188 }
189}
190
191impl EmbedRtmpServer<Running> {
192 pub fn create_rtmp_input(
240 &self,
241 app_name: impl Into<String>,
242 stream_key: impl Into<String>,
243 ) -> crate::error::Result<Output> {
244 let message_sender = self.create_stream_sender(app_name, stream_key)?;
245
246 let mut flv_buffer = FlvBuffer::new();
247 let mut serializer = ChunkSerializer::new();
248 let write_callback: Box<dyn FnMut(&[u8]) -> i32> = Box::new(move |buf: &[u8]| -> i32 {
249 flv_buffer.write_data(buf);
250 if let Some(mut flv_tag) = flv_buffer.get_flv_tag() {
251 flv_tag.header.stream_id = 1;
252 let packet = serializer
253 .serialize(&flv_tag_to_message_payload(flv_tag), false, true)
254 .unwrap();
255 message_sender.send(packet.bytes).unwrap();
256 }
257 buf.len() as i32
258 });
259
260 let mut output: Output = write_callback.into();
261
262 Ok(output
263 .set_format("flv")
264 .set_video_codec("h264")
265 .set_audio_codec("aac")
266 .set_format_opt("flvflags", "no_duration_filesize"))
267 }
268
269 pub fn create_stream_sender(
291 &self,
292 app_name: impl Into<String>,
293 stream_key: impl Into<String>,
294 ) -> crate::error::Result<crossbeam_channel::Sender<Vec<u8>>> {
295 let stream_key = stream_key.into();
296 if self.stream_keys.contains(&stream_key) {
297 return Err(RtmpStreamAlreadyExists(stream_key));
298 }
299
300 let (sender, receiver) = crossbeam_channel::unbounded();
301
302 let publisher_sender = self.publisher_sender.as_ref().unwrap();
303 if let Err(_) = publisher_sender.send((stream_key.clone(), receiver)) {
304 if self.status.load(Ordering::Acquire) != STATUS_END {
305 warn!("Rtmp server worker already exited. Can't create stream sender.");
306 } else {
307 error!("Rtmp Server aborted. Can't create stream sender.");
308 }
309 return Err(RtmpCreateStream.into());
310 }
311
312 let mut serializer = ChunkSerializer::new();
313
314 let mut properties: HashMap<String, Amf0Value> = HashMap::new();
316 properties.insert("app".to_string(), Amf0Value::Utf8String(app_name.into()));
317 let connect_cmd = RtmpMessage::Amf0Command {
318 command_name: "connect".to_string(),
319 transaction_id: 1.0,
320 command_object: Amf0Value::Object(properties),
321 additional_arguments: Vec::new(),
322 }
323 .into_message_payload(RtmpTimestamp { value: 0 }, 0)
324 .unwrap();
325
326 let connect_packet = serializer.serialize(&connect_cmd, false, true).unwrap();
327 if let Err(_) = sender.send(connect_packet.bytes) {
328 error!("Can't send connect command to rtmp server.");
329 return Err(RtmpCreateStream.into());
330 }
331
332 let create_stream_cmd = RtmpMessage::Amf0Command {
334 command_name: "createStream".to_string(),
335 transaction_id: 2.0,
336 command_object: Amf0Value::Null,
337 additional_arguments: Vec::new(),
338 }
339 .into_message_payload(RtmpTimestamp { value: 0 }, 1)
340 .unwrap();
341
342 let create_stream_packet = serializer
343 .serialize(&create_stream_cmd, false, true)
344 .unwrap();
345 if let Err(_) = sender.send(create_stream_packet.bytes) {
346 error!("Can't send createStream command to rtmp server.");
347 return Err(RtmpCreateStream.into());
348 }
349
350 let mut arguments = Vec::new();
352 arguments.push(Amf0Value::Utf8String(stream_key));
353 arguments.push(Amf0Value::Utf8String("live".into()));
354 let create_stream_cmd = RtmpMessage::Amf0Command {
355 command_name: "publish".to_string(),
356 transaction_id: 3.0,
357 command_object: Amf0Value::Null,
358 additional_arguments: arguments,
359 }
360 .into_message_payload(RtmpTimestamp { value: 0 }, 1)
361 .unwrap();
362
363 let create_stream_packet = serializer
364 .serialize(&create_stream_cmd, false, true)
365 .unwrap();
366 if let Err(_) = sender.send(create_stream_packet.bytes) {
367 error!("Can't send publish command to rtmp server.");
368 return Err(RtmpCreateStream.into());
369 }
370 Ok(sender)
371 }
372
373 pub fn stop(self) -> EmbedRtmpServer<Ended> {
385 self.status.store(STATUS_END, Ordering::Release);
386 self.into_state()
387 }
388}
389
390fn handle_connections(
391 connection_receiver: crossbeam_channel::Receiver<TcpStream>,
392 publisher_receiver: crossbeam_channel::Receiver<(String, crossbeam_channel::Receiver<Vec<u8>>)>,
393 stream_keys: dashmap::DashSet<String>,
394 gop_limit: usize,
395 status: Arc<AtomicUsize>,
396) {
397 let mut connections = Slab::new();
398 let mut publishers = Slab::new();
399 let mut scheduler = RtmpScheduler::new(gop_limit);
400
401 loop {
402 crossbeam::channel::select! {
403 recv(connection_receiver) -> msg => match msg {
405 Ok(stream) => {
406 let entry = connections.vacant_entry();
407 let connection_id = entry.key();
408 let result = RtmpConnection::new(connection_id, stream);
409 match result {
410 Ok(connection) => {
411 entry.insert(connection);
412 debug!("Rtmp connection {connection_id} started");
413 }
414 Err(e) => debug!("Rtmp connection error: {e:?}"),
415 }
416 }
417 Err(_) => {
418 debug!("Embed rtmp server disconnected.");
419 return;
420 }
421 },
422 recv(publisher_receiver) -> msg => match msg {
424 Ok((stream_key, bytes_receiver)) => {
425 let entry = publishers.vacant_entry();
426 let connection_id = entry.key();
427
428 if scheduler.new_channel(stream_key.clone(), connection_id) {
429 entry.insert((stream_key, bytes_receiver));
430 debug!("Publisher {connection_id} started");
431 }
432 }
433 Err(_) => {
434 error!("Embed rtmp server publisher_sender closed.");
435 return;
436 }
437 },
438 default(Duration::from_millis(5)) => {}
439 }
440
441 if status.load(Ordering::Acquire) == STATUS_END {
442 info!("Embed rtmp server stopped.");
443 break;
444 }
445
446 let mut packets_to_write = Vec::new();
447 let mut publisher_ids_to_clear = Vec::new();
448 let mut ids_to_clear = Vec::new();
449 for (connection_id, (_stream_key, bytes_receiver)) in publishers.iter_mut() {
450 loop {
451 match bytes_receiver.try_recv() {
452 Err(crossbeam_channel::TryRecvError::Disconnected) => {
453 debug!("Rtmp publisher closed for id {connection_id}");
454 publisher_ids_to_clear.push(connection_id);
455
456 let mut arguments = Vec::new();
457 arguments.push(Amf0Value::Number(1.0));
458 let create_stream_cmd = RtmpMessage::Amf0Command {
459 command_name: "deleteStream".to_string(),
460 transaction_id: 4.0,
461 command_object: Amf0Value::Null,
462 additional_arguments: arguments,
463 }
464 .into_message_payload(RtmpTimestamp { value: 0 }, 1)
465 .unwrap();
466
467 let mut serializer = ChunkSerializer::new();
468 let create_stream_packet = serializer
469 .serialize(&create_stream_cmd, false, true)
470 .unwrap();
471
472 let server_results = match scheduler
473 .publish_bytes_received(connection_id, create_stream_packet.bytes)
474 {
475 Ok(results) => results,
476 Err(_) => {
477 break;
478 }
479 };
480
481 for result in server_results.into_iter() {
482 match result {
483 ServerResult::OutboundPacket {
484 target_connection_id,
485 packet,
486 } => {
487 packets_to_write.push((target_connection_id, packet));
488 }
489
490 ServerResult::DisconnectConnection {
491 connection_id: id_to_close,
492 } => {
493 ids_to_clear.push(id_to_close);
494 }
495 }
496 }
497 break;
498 }
499 Err(crossbeam_channel::TryRecvError::Empty) => break,
500 Ok(bytes) => {
501 let server_results =
502 match scheduler.publish_bytes_received(connection_id, bytes) {
503 Ok(results) => results,
504 Err(error) => {
505 debug!("Input caused the following server error: {}", error);
506 publisher_ids_to_clear.push(connection_id);
507 break;
508 }
509 };
510
511 for result in server_results.into_iter() {
512 match result {
513 ServerResult::OutboundPacket {
514 target_connection_id,
515 packet,
516 } => {
517 packets_to_write.push((target_connection_id, packet));
518 }
519
520 ServerResult::DisconnectConnection {
521 connection_id: id_to_close,
522 } => {
523 ids_to_clear.push(id_to_close);
524 }
525 }
526 }
527 }
528 }
529 }
530 }
531
532 for (connection_id, connection) in connections.iter_mut() {
533 loop {
534 match connection.read() {
535 Err(ConnectionError::SocketClosed) => {
536 debug!("Rtmp socket closed for id {connection_id}");
537 ids_to_clear.push(connection_id);
538 break;
539 }
540 Err(error) => {
541 debug!(
542 "I/O error while reading rtmp connection {connection_id}: {:?}",
543 error
544 );
545 ids_to_clear.push(connection_id);
546 break;
547 }
548 Ok(result) => match result {
549 ReadResult::NoBytesReceived => break,
550 ReadResult::HandshakingInProgress => break,
551 ReadResult::BytesReceived { buffer, byte_count } => {
552 let server_results =
553 match scheduler.bytes_received(connection_id, &buffer[..byte_count]) {
554 Ok(results) => results,
555 Err(error) => {
556 debug!("Rtmp input caused the following server error: {error}");
557 ids_to_clear.push(connection_id);
558 break;
559 }
560 };
561
562 for result in server_results.into_iter() {
563 match result {
564 ServerResult::OutboundPacket {
565 target_connection_id,
566 packet,
567 } => {
568 packets_to_write.push((target_connection_id, packet));
569 }
570
571 ServerResult::DisconnectConnection {
572 connection_id: id_to_close,
573 } => {
574 ids_to_clear.push(id_to_close);
575 }
576 }
577 }
578 }
579 },
580 }
581 }
582 }
583
584 for publisher_id in publisher_ids_to_clear {
585 debug!("Rtmp publisher {publisher_id} closed");
586 let (stream_key, _bytes_receiver) = publishers.remove(publisher_id);
587 scheduler.notify_publisher_closed(publisher_id);
588 stream_keys.remove(&stream_key);
589 }
590
591 for (connection_id, packet) in packets_to_write.into_iter() {
592 if let Some(connection) = connections.get_mut(connection_id) {
593 connection.write(packet.bytes);
594 }
595 }
596
597 for closed_id in ids_to_clear {
598 debug!("Rtmp connection {closed_id} closed");
599 let _ = connections.try_remove(closed_id);
600 scheduler.notify_connection_closed(closed_id);
601 }
602 }
603
604 if status.load(Ordering::Acquire) != STATUS_END {
605 error!("Rtmp Server aborted.");
606 }
607}
608
609pub fn flv_tag_to_message_payload(flv_tag: FlvTag) -> MessagePayload {
610 let timestamp = flv_tag.header.timestamp | ((flv_tag.header.timestamp_ext as u32) << 24);
611
612 let type_id = flv_tag.header.tag_type;
613 let message_stream_id = flv_tag.header.stream_id;
614
615 let data = if type_id == 0x12 {
616 wrap_metadata(flv_tag.data)
617 } else {
618 flv_tag.data
619 };
620
621 MessagePayload {
622 timestamp: RtmpTimestamp { value: timestamp },
623 type_id,
624 message_stream_id,
625 data,
626 }
627}
628
629fn wrap_metadata(data: Bytes) -> Bytes {
630 let s = "@setDataFrame";
631
632 let insert_len = 16;
633
634 let mut bytes = bytes::BytesMut::with_capacity(insert_len + data.len());
635
636 bytes.put_u8(0x02);
637 bytes.put_u16(s.len() as u16);
638 bytes.put(s.as_bytes());
639
640 bytes.put(data);
641
642 bytes.freeze()
643}
644
645#[cfg(test)]
646mod tests {
647 use super::*;
648 use crate::core::context::ffmpeg_context::FfmpegContext;
649 use crate::core::context::input::Input;
650 use crate::core::context::output::Output;
651 use crate::core::scheduler::ffmpeg_scheduler::FfmpegScheduler;
652 use ffmpeg_next::time::current;
653 use std::thread::sleep;
654
655 #[test]
656 fn test_concat_stream_loop() {
657 let _ = env_logger::builder()
658 .filter_level(log::LevelFilter::Trace)
659 .is_test(true)
660 .try_init();
661
662 let mut embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
663 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
664
665 let output = embed_rtmp_server
666 .create_rtmp_input("my-app", "my-stream")
667 .unwrap();
668
669 let start = current();
670
671 let result = FfmpegContext::builder()
672 .input(Input::from("test.mp4")
673 .set_readrate(1.0)
674 .set_stream_loop(3)
675 )
676 .input(
677 Input::from("test.mp4")
678 .set_readrate(1.0)
679 .set_stream_loop(3)
680 )
681 .input(
682 Input::from("test.mp4")
683 .set_readrate(1.0)
684 .set_stream_loop(3)
685 )
686 .filter_desc("[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1")
687 .output(output)
688 .build()
689 .unwrap()
690 .start()
691 .unwrap()
692 .wait();
693
694 assert!(result.is_ok());
695 info!("elapsed time: {}", current() - start);
696 }
697
698 #[test]
699 fn test_stream_loop() {
700 let _ = env_logger::builder()
701 .filter_level(log::LevelFilter::Trace)
702 .is_test(true)
703 .try_init();
704
705 let mut embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
706 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
707
708 let output = embed_rtmp_server
709 .create_rtmp_input("my-app", "my-stream")
710 .unwrap();
711
712 let start = current();
713
714 let result = FfmpegContext::builder()
715 .input(Input::from("test.mp4").set_readrate(1.0).set_stream_loop(-1))
716 .output(output.set_video_codec("h264_videotoolbox"))
718 .build()
719 .unwrap()
720 .start()
721 .unwrap()
722 .wait();
723
724 assert!(result.is_ok());
725
726 info!("elapsed time: {}", current() - start);
727 }
728
729 #[test]
730 fn test_concat_realtime() {
731 let _ = env_logger::builder()
732 .filter_level(log::LevelFilter::Trace)
733 .is_test(true)
734 .try_init();
735
736 let mut embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
737 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
738
739 let output = embed_rtmp_server
740 .create_rtmp_input("my-app", "my-stream")
741 .unwrap();
742
743 let start = current();
744
745 let result = FfmpegContext::builder()
746 .independent_readrate()
747 .input(Input::from("test.mp4").set_readrate(1.0))
748 .input(
749 Input::from("test.mp4")
750 .set_readrate(1.0)
751 )
752 .input(
753 Input::from("test.mp4")
754 .set_readrate(1.0)
755 )
756 .filter_desc("[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1")
757 .output(output)
758 .build()
759 .unwrap()
760 .start()
761 .unwrap()
762 .wait();
763
764 assert!(result.is_ok());
765
766 sleep(Duration::from_secs(1));
767 info!("elapsed time: {}", current() - start);
768 }
769
770 #[test]
771 fn test_realtime() {
772 let _ = env_logger::builder()
773 .filter_level(log::LevelFilter::Trace)
774 .is_test(true)
775 .try_init();
776
777 let mut embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
778 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
779
780 let output = embed_rtmp_server
781 .create_rtmp_input("my-app", "my-stream")
782 .unwrap();
783
784 let start = current();
785
786 let result = FfmpegContext::builder()
787 .input(Input::from("test.mp4").set_readrate(1.0))
788 .output(output)
789 .build()
790 .unwrap()
791 .start()
792 .unwrap()
793 .wait();
794
795 assert!(result.is_ok());
796
797 info!("elapsed time: {}", current() - start);
798 }
799
800 #[test]
801 fn test_readrate() {
802 let _ = env_logger::builder()
803 .filter_level(log::LevelFilter::Trace)
804 .is_test(true)
805 .try_init();
806
807 let mut output: Output = "output.flv".into();
808 output.audio_codec = Some("adpcm_swf".to_string());
809
810 let mut input: Input = "test.mp4".into();
811 input.readrate = Some(1.0);
812
813 let context = FfmpegContext::builder()
814 .input(input)
815 .output(output)
816 .build()
817 .unwrap();
818
819 let result = FfmpegScheduler::new(context).start().unwrap().wait();
820 if let Err(error) = result {
821 println!("Error: {error}");
822 }
823 }
824
825 #[test]
826 fn test_embed_rtmp_server() {
827 let _ = env_logger::builder()
828 .filter_level(log::LevelFilter::Trace)
829 .is_test(true)
830 .try_init();
831
832 let mut embed_rtmp_server = EmbedRtmpServer::new("localhost:1935");
833 let embed_rtmp_server = embed_rtmp_server.start().unwrap();
834
835 let output = embed_rtmp_server
836 .create_rtmp_input("my-app", "my-stream")
837 .unwrap();
838 let mut input: Input = "test.mp4".into();
839 input.readrate = Some(1.0);
840
841 let context = FfmpegContext::builder()
842 .input(input)
843 .output(output)
844 .build()
845 .unwrap();
846
847 let result = FfmpegScheduler::new(context).start().unwrap().wait();
848
849 assert!(result.is_ok());
850
851 sleep(Duration::from_secs(3));
852 }
853}