1pub mod actor_types;
2mod connection_handler;
3
4#[cfg(test)]
5mod tests;
6
7use super::{
8 RtmpEndpointMediaData, RtmpEndpointPublisherMessage, RtmpEndpointRequest, StreamKeyRegistration,
9};
10use crate::endpoints::rtmp_server::actor::connection_handler::ConnectionResponse;
11use crate::endpoints::rtmp_server::actor::internal_futures::wait_for_validation;
12use crate::endpoints::rtmp_server::{
13 IpRestriction, RegistrationType, RtmpEndpointWatcherNotification, ValidationResponse,
14};
15use crate::net::tcp::{TcpSocketRequest, TcpSocketResponse};
16use crate::net::ConnectionId;
17use crate::reactors::ReactorWorkflowUpdate;
18use crate::StreamId;
19use actor_types::*;
20use connection_handler::{ConnectionRequest, RtmpServerConnectionHandler};
21use futures::future::{BoxFuture, FutureExt};
22use futures::StreamExt;
23use rml_rtmp::time::RtmpTimestamp;
24use std::collections::HashMap;
25use std::net::SocketAddr;
26use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
27use tokio::sync::oneshot::channel;
28use tracing::{error, info, instrument, warn};
29use uuid::Uuid;
30
31impl RtmpServerEndpointActor {
32 #[instrument(
33 name = "RtmpServer Endpoint Execution",
34 skip(self, endpoint_receiver, socket_request_sender)
35 )]
36 pub async fn run(
37 mut self,
38 endpoint_receiver: UnboundedReceiver<RtmpEndpointRequest>,
39 socket_request_sender: UnboundedSender<TcpSocketRequest>,
40 ) {
41 info!("Starting RTMP server endpoint");
42
43 self.futures
44 .push(internal_futures::wait_for_endpoint_request(endpoint_receiver).boxed());
45
46 self.futures.push(
47 internal_futures::notify_on_socket_manager_gone(socket_request_sender.clone()).boxed(),
48 );
49
50 while let Some(result) = self.futures.next().await {
51 match result {
52 FutureResult::NoMoreEndpointRequesters => {
53 info!("No endpoint requesters exist");
54 break;
55 }
56
57 FutureResult::SocketManagerClosed => {
58 info!("Socket manager closed");
59 break;
60 }
61
62 FutureResult::EndpointRequestReceived { request, receiver } => {
63 self.futures
64 .push(internal_futures::wait_for_endpoint_request(receiver).boxed());
65
66 self.handle_endpoint_request(request, socket_request_sender.clone());
67 }
68
69 FutureResult::PublishingRegistrantGone {
70 port,
71 app,
72 stream_key,
73 } => {
74 self.remove_publish_registration(port, app, stream_key);
75 }
76
77 FutureResult::WatcherRegistrantGone {
78 port,
79 app,
80 stream_key,
81 } => {
82 self.remove_watcher_registration(port, app, stream_key);
83 }
84
85 FutureResult::SocketResponseReceived {
86 port,
87 response,
88 receiver,
89 } => {
90 self.handle_socket_response(port, response);
91 self.futures
92 .push(internal_futures::wait_for_socket_response(receiver, port).boxed());
93 }
94
95 FutureResult::ConnectionHandlerRequestReceived {
96 port,
97 connection_id,
98 request,
99 receiver,
100 } => {
101 self.futures.push(
102 internal_futures::wait_for_connection_request(
103 port,
104 connection_id.clone(),
105 receiver,
106 )
107 .boxed(),
108 );
109
110 self.handle_connection_handler_request(port, connection_id, request);
111 }
112
113 FutureResult::ConnectionHandlerGone {
114 port,
115 connection_id,
116 } => {
117 let port_map = match self.ports.get_mut(&port) {
118 Some(x) => x,
119 None => continue,
120 };
121
122 clean_disconnected_connection(connection_id, port_map);
123 }
124
125 FutureResult::WatcherMediaDataReceived {
126 port,
127 app,
128 stream_key,
129 stream_key_registration,
130 data,
131 receiver,
132 } => {
133 self.futures.push(
134 internal_futures::wait_for_watcher_media(
135 receiver,
136 port,
137 app.clone(),
138 stream_key_registration,
139 )
140 .boxed(),
141 );
142
143 self.handle_watcher_media_received(port, app, stream_key, data);
144 }
145
146 FutureResult::ValidationApprovalResponseReceived(port, connection_id, response) => {
147 self.handle_validation_response(port, connection_id, response);
148 }
149
150 FutureResult::PortGone { port } => {
151 if let Some(_) = self.ports.remove(&port) {
152 warn!("Port {port}'s response sender suddenly closed");
153 }
154 }
155 }
156 }
157
158 info!("Rtmp server endpoint closing");
159 }
160
161 #[instrument(skip(self))]
162 fn handle_validation_response(
163 &mut self,
164 port: u16,
165 connection_id: ConnectionId,
166 response: ValidationResponse,
167 ) {
168 let port_map = match self.ports.get_mut(&port) {
169 Some(ports) => ports,
170 None => {
171 return;
172 } };
174
175 let connection = match port_map.connections.get_mut(&connection_id) {
176 Some(connection) => connection,
177 None => {
178 return;
179 } };
181
182 match response {
183 ValidationResponse::Approve {
184 reactor_update_channel,
185 } => {
186 match &connection.state {
187 ConnectionState::None => {
188 warn!("Unexpected approval for connection in None state");
189 }
190
191 ConnectionState::Watching { .. } => {
192 warn!("Unexpected approval for connection in the Watching state");
193 }
194
195 ConnectionState::Publishing { .. } => {
196 warn!("Unexpected approval for connection in the publishing state");
197 }
198
199 ConnectionState::WaitingForPublishValidation {
200 rtmp_app,
201 stream_key,
202 } => {
203 info!(
204 rtmp_app = %rtmp_app,
205 stream_key = %stream_key,
206 "Request to publish was approved"
207 );
208
209 let rtmp_app = rtmp_app.clone();
211 let stream_key = stream_key.clone();
212
213 connection.received_registrant_approval = true;
214 let future = handle_connection_request_publish(
215 &connection_id,
216 port_map,
217 port,
218 rtmp_app,
219 &stream_key,
220 Some(reactor_update_channel),
221 );
222
223 if let Some(future) = future {
224 self.futures.push(future);
225 }
226 }
227
228 ConnectionState::WaitingForWatchValidation {
229 rtmp_app,
230 stream_key,
231 } => {
232 info!(
233 rtmp_app = %rtmp_app,
234 stream_key = %stream_key,
235 "Request to watch was approved",
236 );
237
238 let rtmp_app = rtmp_app.clone();
240 let stream_key = stream_key.clone();
241
242 connection.received_registrant_approval = true;
243 let future = handle_connection_request_watch(
244 connection_id,
245 port_map,
246 port,
247 rtmp_app,
248 &stream_key,
249 Some(reactor_update_channel),
250 );
251
252 if let Some(future) = future {
253 self.futures.push(future);
254 }
255 }
256 }
257 }
258
259 ValidationResponse::Reject => {
260 match &connection.state {
261 ConnectionState::None => {
262 warn!("Unexpected approval for connection in None state");
263 }
264
265 ConnectionState::Watching { .. } => {
266 warn!("Unexpected approval for connection in the Watching state");
267 }
268
269 ConnectionState::Publishing { .. } => {
270 warn!("Unexpected approval for connection in the publishing state");
271 }
272
273 ConnectionState::WaitingForPublishValidation {
274 rtmp_app,
275 stream_key,
276 } => {
277 info!(
278 rtmp_app = %rtmp_app,
279 stream_key = %stream_key,
280 "Request to publish was rejected"
281 );
282 }
283
284 ConnectionState::WaitingForWatchValidation {
285 rtmp_app,
286 stream_key,
287 } => {
288 info!(
289 rtmp_app = %rtmp_app,
290 stream_key = %stream_key,
291 "Request to watch was rejected"
292 );
293 }
294 }
295
296 let _ = connection
297 .response_channel
298 .send(ConnectionResponse::RequestRejected);
299 }
300 }
301 }
302
303 fn handle_watcher_media_received(
304 &mut self,
305 port: u16,
306 app: String,
307 stream_key: String,
308 data: RtmpEndpointMediaData,
309 ) {
310 let port_map = match self.ports.get_mut(&port) {
311 Some(x) => x,
312 None => return,
313 };
314
315 let app_map = match port_map.rtmp_applications.get_mut(app.as_str()) {
316 Some(x) => x,
317 None => return,
318 };
319
320 let key_details = app_map
321 .active_stream_keys
322 .entry(stream_key.clone())
323 .or_insert(StreamKeyConnections {
324 watchers: HashMap::new(),
325 publisher: None,
326 latest_video_sequence_header: None,
327 latest_audio_sequence_header: None,
328 });
329
330 match &data {
331 RtmpEndpointMediaData::NewVideoData {
332 data,
333 codec,
334 is_sequence_header,
335 ..
336 } => {
337 if *is_sequence_header {
338 key_details.latest_video_sequence_header = Some(VideoSequenceHeader {
339 codec: codec.clone(),
340 data: data.clone(),
341 });
342 }
343 }
344
345 RtmpEndpointMediaData::NewAudioData {
346 data,
347 codec,
348 is_sequence_header,
349 ..
350 } => {
351 if *is_sequence_header {
352 key_details.latest_audio_sequence_header = Some(AudioSequenceHeader {
353 codec: codec.clone(),
354 data: data.clone(),
355 });
356 }
357 }
358
359 _ => (),
360 };
361
362 for (_, watcher_details) in &key_details.watchers {
363 let _ = watcher_details.media_sender.send(data.clone());
364 }
365 }
366
367 fn handle_endpoint_request(
368 &mut self,
369 request: RtmpEndpointRequest,
370 socket_request_sender: UnboundedSender<TcpSocketRequest>,
371 ) {
372 match request {
373 RtmpEndpointRequest::ListenForPublishers {
374 port,
375 rtmp_app,
376 rtmp_stream_key,
377 message_channel,
378 stream_id,
379 ip_restrictions: ip_restriction,
380 use_tls,
381 requires_registrant_approval,
382 } => {
383 self.register_listener(
384 port,
385 rtmp_app,
386 rtmp_stream_key,
387 socket_request_sender,
388 ListenerRequest::Publisher {
389 channel: message_channel,
390 stream_id,
391 requires_registrant_approval,
392 },
393 ip_restriction,
394 use_tls,
395 );
396 }
397
398 RtmpEndpointRequest::ListenForWatchers {
399 port,
400 rtmp_app,
401 rtmp_stream_key,
402 media_channel,
403 notification_channel,
404 ip_restrictions,
405 use_tls,
406 requires_registrant_approval,
407 } => {
408 self.register_listener(
409 port,
410 rtmp_app,
411 rtmp_stream_key,
412 socket_request_sender,
413 ListenerRequest::Watcher {
414 notification_channel,
415 media_channel,
416 requires_registrant_approval,
417 },
418 ip_restrictions,
419 use_tls,
420 );
421 }
422
423 RtmpEndpointRequest::RemoveRegistration {
424 registration_type,
425 port,
426 rtmp_app,
427 rtmp_stream_key,
428 } => {
429 info!(
430 port = %port,
431 rtmp_app = %rtmp_app,
432 stream_key = ?rtmp_stream_key,
433 registration_type = ?registration_type,
434 "{:?} Registration removal requested for port {}, app {}, and stream key {:?}",
435 registration_type, port, rtmp_app, rtmp_stream_key
436 );
437
438 match registration_type {
439 RegistrationType::Publisher => {
440 self.remove_publish_registration(port, rtmp_app, rtmp_stream_key)
441 }
442 RegistrationType::Watcher => {
443 self.remove_watcher_registration(port, rtmp_app, rtmp_stream_key)
444 }
445 }
446 }
447 }
448 }
449
450 #[instrument(skip(self, socket_sender, listener))]
451 fn register_listener(
452 &mut self,
453 port: u16,
454 rtmp_app: String,
455 stream_key: StreamKeyRegistration,
456 socket_sender: UnboundedSender<TcpSocketRequest>,
457 listener: ListenerRequest,
458 ip_restrictions: IpRestriction,
459 use_tls: bool,
460 ) {
461 let mut new_port_requested = false;
462 let port_map = self.ports.entry(port).or_insert_with(|| {
463 let port_map = PortMapping {
464 rtmp_applications: HashMap::new(),
465 status: PortStatus::Requested,
466 connections: HashMap::new(),
467 tls: use_tls,
468 };
469
470 new_port_requested = true;
471
472 port_map
473 });
474
475 if port_map.tls != use_tls {
476 error!(
477 "Request to open port {} with tls set to {} failed, as the port is already mapped \
478 with tls set to {}",
479 port, use_tls, port_map.tls
480 );
481
482 match listener {
483 ListenerRequest::Publisher { channel, .. } => {
484 let _ = channel.send(RtmpEndpointPublisherMessage::PublisherRegistrationFailed);
485 }
486
487 ListenerRequest::Watcher {
488 notification_channel,
489 ..
490 } => {
491 let _ = notification_channel
492 .send(RtmpEndpointWatcherNotification::WatcherRegistrationFailed);
493 }
494 }
495
496 return;
497 }
498
499 if new_port_requested {
500 let (sender, receiver) = unbounded_channel();
501 let request = TcpSocketRequest::OpenPort {
502 port,
503 response_channel: sender,
504 use_tls,
505 };
506
507 let _ = socket_sender.send(request);
508 self.futures
509 .push(internal_futures::wait_for_socket_response(receiver, port).boxed());
510 }
511
512 let app_map = port_map
513 .rtmp_applications
514 .entry(rtmp_app.clone())
515 .or_insert(RtmpAppMapping {
516 publisher_registrants: HashMap::new(),
517 watcher_registrants: HashMap::new(),
518 active_stream_keys: HashMap::new(),
519 });
520
521 match listener {
522 ListenerRequest::Publisher {
523 channel,
524 stream_id,
525 requires_registrant_approval,
526 } => {
527 let can_be_added = match &stream_key {
528 StreamKeyRegistration::Any => {
529 if !app_map.publisher_registrants.is_empty() {
530 warn!("Rtmp server publish request registration failed for port {}, app '{}', all stream keys': \
531 Another system is registered for at least one stream key on this port and app", port, rtmp_app);
532
533 false
534 } else {
535 true
536 }
537 }
538
539 StreamKeyRegistration::Exact(key) => {
540 if app_map
541 .publisher_registrants
542 .contains_key(&StreamKeyRegistration::Any)
543 {
544 warn!("Rtmp server publish request registration failed for port {}, app '{}', stream key '{}': \
545 Another system is registered for all stream keys on this port/app", port, rtmp_app, key);
546
547 false
548 } else if app_map
549 .publisher_registrants
550 .contains_key(&StreamKeyRegistration::Exact(key.clone()))
551 {
552 warn!("Rtmp server publish request registration failed for port {}, app '{}', stream key '{}': \
553 Another system is registered for this port/app/stream key combo", port, rtmp_app, key);
554
555 false
556 } else {
557 true
558 }
559 }
560 };
561
562 if !can_be_added {
563 let _ =
564 channel.send(RtmpEndpointPublisherMessage::PublisherRegistrationFailed {});
565
566 return;
567 }
568
569 let (cancel_sender, cancel_receiver) = unbounded_channel();
570 app_map.publisher_registrants.insert(
571 stream_key.clone(),
572 PublishingRegistrant {
573 response_channel: channel.clone(),
574 stream_id,
575 ip_restrictions,
576 requires_registrant_approval,
577 cancellation_notifier: cancel_receiver,
578 },
579 );
580
581 self.futures.push(
582 internal_futures::wait_for_publisher_channel_closed(
583 channel.clone(),
584 port,
585 rtmp_app,
586 stream_key,
587 cancel_sender,
588 )
589 .boxed(),
590 );
591
592 if port_map.status == PortStatus::Open {
595 let _ = channel
596 .send(RtmpEndpointPublisherMessage::PublisherRegistrationSuccessful {});
597 }
598 }
599
600 ListenerRequest::Watcher {
601 media_channel,
602 notification_channel,
603 requires_registrant_approval,
604 } => {
605 let can_be_added = match &stream_key {
606 StreamKeyRegistration::Any => {
607 if !app_map.watcher_registrants.is_empty() {
608 warn!("Rtmp server watcher registration failed for port {}, app '{}', all stream keys': \
609 Another system is registered for at least one stream key on this port and app", port, rtmp_app);
610
611 false
612 } else {
613 true
614 }
615 }
616
617 StreamKeyRegistration::Exact(key) => {
618 if app_map
619 .watcher_registrants
620 .contains_key(&StreamKeyRegistration::Any)
621 {
622 warn!("Rtmp server watcher registration failed for port {}, app '{}', stream key '{}': \
623 Another system is registered for all stream keys on this port/app", port, rtmp_app, key);
624
625 false
626 } else if app_map
627 .watcher_registrants
628 .contains_key(&StreamKeyRegistration::Exact(key.clone()))
629 {
630 warn!("Rtmp server watcher registration failed for port {}, app '{}', stream key '{}': \
631 Another system is registered for this port/app/stream key combo", port, rtmp_app, key);
632
633 false
634 } else {
635 true
636 }
637 }
638 };
639
640 if !can_be_added {
641 let _ = notification_channel
642 .send(RtmpEndpointWatcherNotification::WatcherRegistrationFailed);
643
644 return;
645 }
646
647 let (cancel_sender, cancel_receiver) = unbounded_channel();
648 app_map.watcher_registrants.insert(
649 stream_key.clone(),
650 WatcherRegistrant {
651 response_channel: notification_channel.clone(),
652 ip_restrictions,
653 requires_registrant_approval,
654 cancellation_notifier: cancel_receiver,
655 },
656 );
657
658 self.futures.push(
659 internal_futures::wait_for_watcher_notification_channel_closed(
660 notification_channel.clone(),
661 port,
662 rtmp_app.clone(),
663 stream_key.clone(),
664 cancel_sender,
665 )
666 .boxed(),
667 );
668
669 self.futures.push(
670 internal_futures::wait_for_watcher_media(
671 media_channel,
672 port,
673 rtmp_app,
674 stream_key,
675 )
676 .boxed(),
677 );
678
679 if port_map.status == PortStatus::Open {
681 let _ = notification_channel
682 .send(RtmpEndpointWatcherNotification::WatcherRegistrationSuccessful);
683 }
684 }
685 }
686 }
687
688 #[instrument(skip(self))]
689 fn handle_socket_response(&mut self, port: u16, response: TcpSocketResponse) {
690 let mut remove_port = false;
691 {
692 let port_map = match self.ports.get_mut(&port) {
693 Some(x) => x,
694 None => {
695 error!("Received socket response for port {} but that port has not been registered", port);
696
697 return;
698 }
699 };
700
701 match response {
702 TcpSocketResponse::RequestDenied { reason } => {
703 warn!("Port {} could not be opened: {:?}", port, reason);
704
705 for (_, app_map) in &port_map.rtmp_applications {
706 for (_, publisher) in &app_map.publisher_registrants {
707 let _ = publisher
708 .response_channel
709 .send(RtmpEndpointPublisherMessage::PublisherRegistrationFailed {});
710 }
711
712 for (_, watcher) in &app_map.watcher_registrants {
713 let _ = watcher
714 .response_channel
715 .send(RtmpEndpointWatcherNotification::WatcherRegistrationFailed);
716 }
717 }
718
719 remove_port = true;
720 }
721
722 TcpSocketResponse::PortForciblyClosed { port: _ } => {
723 warn!("Port {} closed", port);
724
725 remove_port = true;
726 }
727
728 TcpSocketResponse::RequestAccepted {} => {
729 info!("Port {} successfully opened", port);
730
731 for (_, app_map) in &port_map.rtmp_applications {
734 for (_, publisher) in &app_map.publisher_registrants {
735 let _ = publisher.response_channel.send(
736 RtmpEndpointPublisherMessage::PublisherRegistrationSuccessful {},
737 );
738 }
739
740 for (_, watcher) in &app_map.watcher_registrants {
741 let _ = watcher.response_channel.send(
742 RtmpEndpointWatcherNotification::WatcherRegistrationSuccessful,
743 );
744 }
745 }
746
747 port_map.status = PortStatus::Open;
748 }
749
750 TcpSocketResponse::NewConnection {
751 port: _,
752 connection_id,
753 outgoing_bytes,
754 incoming_bytes,
755 socket_address,
756 } => {
757 let (request_sender, request_receiver) = unbounded_channel();
758 let (response_sender, response_receiver) = unbounded_channel();
759 let handler = RtmpServerConnectionHandler::new(
760 connection_id.clone(),
761 outgoing_bytes,
762 request_sender,
763 );
764 tokio::spawn(handler.run_async(response_receiver, incoming_bytes));
765
766 port_map.connections.insert(
767 connection_id.clone(),
768 Connection {
769 response_channel: response_sender,
770 state: ConnectionState::None,
771 socket_address,
772 received_registrant_approval: false,
773 },
774 );
775
776 self.futures.push(
777 internal_futures::wait_for_connection_request(
778 port,
779 connection_id,
780 request_receiver,
781 )
782 .boxed(),
783 );
784 }
785
786 TcpSocketResponse::Disconnection { connection_id } => {
787 clean_disconnected_connection(connection_id, port_map);
789 }
790 }
791 }
792
793 if remove_port {
794 info!("Port {port} removed");
795 self.ports.remove(&port);
796 }
797 }
798
799 #[instrument(skip(self))]
800 fn handle_connection_handler_request(
801 &mut self,
802 port: u16,
803 connection_id: ConnectionId,
804 request: ConnectionRequest,
805 ) {
806 let port_map = match self.ports.get_mut(&port) {
807 Some(x) => x,
808 None => {
809 error!(
810 "Connection handler for connection {:?} sent {:?} on port {}, but that \
811 port isn't managed yet!",
812 connection_id, request, port
813 );
814
815 return;
816 }
817 };
818
819 match request {
820 ConnectionRequest::RequestConnectToApp { rtmp_app } => {
821 handle_connection_request_connect_to_app(&connection_id, port_map, port, rtmp_app);
822 }
823
824 ConnectionRequest::RequestPublish {
825 rtmp_app,
826 stream_key,
827 } => {
828 let future = handle_connection_request_publish(
829 &connection_id,
830 port_map,
831 port,
832 rtmp_app,
833 &stream_key,
834 None,
835 );
836
837 if let Some(future) = future {
838 self.futures.push(future);
839 }
840 }
841
842 ConnectionRequest::RequestWatch {
843 rtmp_app,
844 stream_key,
845 } => {
846 let future = handle_connection_request_watch(
847 connection_id,
848 port_map,
849 port,
850 rtmp_app,
851 &stream_key,
852 None,
853 );
854
855 if let Some(future) = future {
856 self.futures.push(future);
857 }
858 }
859
860 ConnectionRequest::PublishFinished => {
861 handle_connection_stop_publish(connection_id, port_map);
862 }
863
864 ConnectionRequest::PlaybackFinished => {
865 handle_connection_stop_watch(connection_id, port_map);
866 }
867 }
868 }
869
870 fn remove_publish_registration(
871 &mut self,
872 port: u16,
873 app: String,
874 stream_key: StreamKeyRegistration,
875 ) {
876 let port_map = match self.ports.get_mut(&port) {
877 Some(x) => x,
878 None => return,
879 };
880
881 let app_map = match port_map.rtmp_applications.get_mut(app.as_str()) {
882 Some(x) => x,
883 None => return,
884 };
885
886 if let None = app_map.publisher_registrants.remove(&stream_key) {
887 return;
888 }
889
890 let mut keys_to_remove = Vec::new();
892 if let StreamKeyRegistration::Exact(key) = stream_key {
893 keys_to_remove.push(key);
894 } else {
895 keys_to_remove.extend(app_map.active_stream_keys.keys().map(|x| x.clone()));
896 }
897
898 for key in keys_to_remove {
899 if let Some(connection) = app_map.active_stream_keys.get_mut(&key) {
900 if let Some(id) = &connection.publisher {
901 if let Some(connection) = port_map.connections.get(id) {
902 let _ = connection
903 .response_channel
904 .send(ConnectionResponse::Disconnect);
905 }
906 }
907
908 connection.publisher = None;
909 }
910 }
911
912 if app_map.publisher_registrants.is_empty() && app_map.watcher_registrants.is_empty() {
913 port_map.rtmp_applications.remove(&app);
914 }
915 }
916
917 fn remove_watcher_registration(
918 &mut self,
919 port: u16,
920 app: String,
921 stream_key: StreamKeyRegistration,
922 ) {
923 let port_map = match self.ports.get_mut(&port) {
924 Some(x) => x,
925 None => return,
926 };
927
928 let app_map = match port_map.rtmp_applications.get_mut(app.as_str()) {
929 Some(x) => x,
930 None => return,
931 };
932
933 if let None = app_map.watcher_registrants.remove(&stream_key) {
934 return;
935 }
936
937 let mut keys_to_remove = Vec::new();
939 if let StreamKeyRegistration::Exact(key) = stream_key {
940 keys_to_remove.push(key);
941 } else {
942 keys_to_remove.extend(app_map.active_stream_keys.keys().map(|x| x.clone()));
943 }
944
945 for key in keys_to_remove {
946 if let Some(connection) = app_map.active_stream_keys.get_mut(&key) {
947 for id in connection.watchers.keys() {
948 if let Some(connection) = port_map.connections.get(id) {
949 let _ = connection
950 .response_channel
951 .send(ConnectionResponse::Disconnect);
952 }
953 }
954
955 connection.watchers.clear();
956 }
957 }
958
959 if app_map.watcher_registrants.is_empty() && app_map.publisher_registrants.is_empty() {
960 port_map.rtmp_applications.remove(&app);
961 }
962 }
963}
964
965fn handle_connection_stop_watch(connection_id: ConnectionId, port_map: &mut PortMapping) {
966 let connection = match port_map.connections.get_mut(&connection_id) {
967 Some(connection) => connection,
968 None => {
969 warn!("Connection handler for connection {:?} a sent playback finished notification, but \
970 that connection isn't being tracked", connection_id);
971
972 return;
973 }
974 };
975
976 match &connection.state {
977 ConnectionState::Watching {
978 rtmp_app,
979 stream_key,
980 } => {
981 let rtmp_app = rtmp_app.clone();
982 let stream_key = stream_key.clone();
983 connection.state = ConnectionState::None;
984 match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
985 None => (),
986 Some(app_map) => match app_map.active_stream_keys.get_mut(stream_key.as_str()) {
987 None => (),
988 Some(active_key) => {
989 active_key.watchers.remove(&connection_id);
990
991 if active_key.watchers.is_empty() {
992 let registrant = match app_map
993 .watcher_registrants
994 .get(&StreamKeyRegistration::Any)
995 {
996 Some(x) => Some(x),
997 None => app_map
998 .watcher_registrants
999 .get(&StreamKeyRegistration::Exact(stream_key.clone())),
1000 };
1001
1002 if let Some(registrant) = registrant {
1003 let _ = registrant.response_channel.send(
1004 RtmpEndpointWatcherNotification::StreamKeyBecameInactive {
1005 stream_key,
1006 },
1007 );
1008 }
1009 }
1010 }
1011 },
1012 }
1013 }
1014
1015 _ => (),
1016 }
1017}
1018
1019fn handle_connection_stop_publish(connection_id: ConnectionId, port_map: &mut PortMapping) {
1020 let connection = match port_map.connections.get_mut(&connection_id) {
1021 Some(connection) => connection,
1022 None => {
1023 warn!(
1024 "Connection handler for connection {:?} a sent publish finished notification, but \
1025 that connection isn't being tracked",
1026 connection_id
1027 );
1028
1029 return;
1030 }
1031 };
1032
1033 match &connection.state {
1034 ConnectionState::Publishing {
1035 rtmp_app,
1036 stream_key,
1037 } => {
1038 let rtmp_app = rtmp_app.clone();
1039 let stream_key = stream_key.clone();
1040 connection.state = ConnectionState::None;
1041
1042 match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1043 None => (),
1044 Some(app_map) => match app_map.active_stream_keys.get_mut(stream_key.as_str()) {
1045 None => (),
1046 Some(active_key) => {
1047 match &active_key.publisher {
1048 None => (),
1049 Some(publisher_id) => {
1050 if *publisher_id == connection_id {
1051 active_key.publisher = None;
1052 active_key.latest_video_sequence_header = None;
1053 active_key.latest_audio_sequence_header = None;
1054
1055 let registrant = match app_map
1056 .publisher_registrants
1057 .get(&StreamKeyRegistration::Any)
1058 {
1059 Some(x) => Some(x),
1060 None => app_map
1061 .publisher_registrants
1062 .get(&StreamKeyRegistration::Exact(stream_key.clone())),
1063 };
1064
1065 if let Some(registrant) = registrant {
1066 let _ = registrant.response_channel.send(
1067 RtmpEndpointPublisherMessage::PublishingStopped {
1068 connection_id,
1069 },
1070 );
1071 }
1072 }
1073 }
1074 };
1075 }
1076 },
1077 }
1078 }
1079
1080 _ => (),
1081 }
1082}
1083
1084#[instrument(skip(port_map))]
1085fn handle_connection_request_watch(
1086 connection_id: ConnectionId,
1087 port_map: &mut PortMapping,
1088 port: u16,
1089 rtmp_app: String,
1090 stream_key: &String,
1091 reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
1092) -> Option<BoxFuture<'static, FutureResult>> {
1093 let connection = match port_map.connections.get_mut(&connection_id) {
1094 Some(x) => x,
1095 None => {
1096 warn!("Connection handler for connection {:?} sent request to watch on port {}, but that \
1097 connection isn't being tracked.", connection_id, port);
1098
1099 return None;
1100 }
1101 };
1102
1103 let application = match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1105 Some(x) => x,
1106 None => {
1107 info!(
1108 "Connection {} requested watching '{}/{}' but that app is not registered \
1109 to accept watchers",
1110 connection_id, rtmp_app, stream_key
1111 );
1112
1113 let _ = connection
1114 .response_channel
1115 .send(ConnectionResponse::RequestRejected);
1116
1117 return None;
1118 }
1119 };
1120
1121 let registrant = match application
1123 .watcher_registrants
1124 .get(&StreamKeyRegistration::Any)
1125 {
1126 Some(x) => x,
1127 None => {
1128 match application
1129 .watcher_registrants
1130 .get(&StreamKeyRegistration::Exact(stream_key.clone()))
1131 {
1132 Some(x) => x,
1133 None => {
1134 info!(
1135 "Connection {} requested watching '{}/{}' but that stream key is \
1136 not registered to accept watchers",
1137 connection_id, rtmp_app, stream_key
1138 );
1139
1140 let _ = connection
1141 .response_channel
1142 .send(ConnectionResponse::RequestRejected);
1143
1144 return None;
1145 }
1146 }
1147 }
1148 };
1149
1150 if !is_ip_allowed(&connection.socket_address, ®istrant.ip_restrictions) {
1151 error!(
1152 "Connection {} requested watching to '{}/{}', but the client's ip address of '{}' \
1153 is not allowed",
1154 connection_id,
1155 rtmp_app,
1156 stream_key,
1157 connection.socket_address.ip()
1158 );
1159
1160 let _ = connection
1161 .response_channel
1162 .send(ConnectionResponse::RequestRejected);
1163
1164 return None;
1165 }
1166
1167 if registrant.requires_registrant_approval && !connection.received_registrant_approval {
1168 info!(
1169 "Connection {} requested watching to '{}/{}' but requires approval from the \
1170 registrant first",
1171 connection_id, rtmp_app, stream_key
1172 );
1173
1174 connection.state = ConnectionState::WaitingForWatchValidation {
1175 rtmp_app,
1176 stream_key: stream_key.clone(),
1177 };
1178
1179 let (sender, receiver) = channel();
1180 let _ = registrant.response_channel.send(
1181 RtmpEndpointWatcherNotification::WatcherRequiringApproval {
1182 stream_key: stream_key.clone(),
1183 connection_id: connection_id.clone(),
1184 response_channel: sender,
1185 },
1186 );
1187
1188 let future = wait_for_validation(port, connection_id.clone(), receiver).boxed();
1189
1190 return Some(future);
1191 }
1192
1193 let active_stream_key = application
1194 .active_stream_keys
1195 .entry(stream_key.clone())
1196 .or_insert(StreamKeyConnections {
1197 watchers: HashMap::new(),
1198 publisher: None,
1199 latest_video_sequence_header: None,
1200 latest_audio_sequence_header: None,
1201 });
1202
1203 connection.state = ConnectionState::Watching {
1204 rtmp_app,
1205 stream_key: stream_key.clone(),
1206 };
1207
1208 if active_stream_key.watchers.is_empty() {
1209 let _ = registrant.response_channel.send(
1210 RtmpEndpointWatcherNotification::StreamKeyBecameActive {
1211 stream_key: stream_key.clone(),
1212 reactor_update_channel,
1213 },
1214 );
1215 }
1216
1217 let (media_sender, media_receiver) = unbounded_channel();
1218
1219 if let Some(sequence_header) = &active_stream_key.latest_video_sequence_header {
1222 let _ = media_sender.send(RtmpEndpointMediaData::NewVideoData {
1223 codec: sequence_header.codec.clone(),
1224 is_sequence_header: true,
1225 is_keyframe: true,
1226 data: sequence_header.data.clone(),
1227 timestamp: RtmpTimestamp::new(0),
1228 composition_time_offset: 0,
1229 });
1230 }
1231
1232 if let Some(sequence_header) = &active_stream_key.latest_audio_sequence_header {
1233 let _ = media_sender.send(RtmpEndpointMediaData::NewAudioData {
1234 codec: sequence_header.codec.clone(),
1235 data: sequence_header.data.clone(),
1236 is_sequence_header: true,
1237 timestamp: RtmpTimestamp::new(0),
1238 });
1239 }
1240
1241 active_stream_key
1242 .watchers
1243 .insert(connection_id, WatcherDetails { media_sender });
1244
1245 let _ = connection
1246 .response_channel
1247 .send(ConnectionResponse::WatchRequestAccepted {
1248 channel: media_receiver,
1249 });
1250
1251 return None;
1252}
1253
1254#[instrument(skip(port_map))]
1255fn handle_connection_request_publish(
1256 connection_id: &ConnectionId,
1257 port_map: &mut PortMapping,
1258 port: u16,
1259 rtmp_app: String,
1260 stream_key: &String,
1261 reactor_response_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
1262) -> Option<BoxFuture<'static, FutureResult>> {
1263 let connection = match port_map.connections.get_mut(&connection_id) {
1264 Some(x) => x,
1265 None => {
1266 warn!("Connection handler for connection {:?} sent a request to publish on port {}, but that \
1267 connection isn't being tracked.", connection_id, port);
1268
1269 return None;
1270 }
1271 };
1272
1273 let application = match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1275 Some(x) => x,
1276 None => {
1277 info!("Connection {} requested publishing to '{}/{}', but the RTMP app '{}' isn't registered yet",
1278 connection_id, rtmp_app, stream_key, rtmp_app);
1279
1280 let _ = connection
1281 .response_channel
1282 .send(ConnectionResponse::RequestRejected);
1283
1284 return None;
1285 }
1286 };
1287
1288 let registrant = match application
1290 .publisher_registrants
1291 .get(&StreamKeyRegistration::Any)
1292 {
1293 Some(x) => x,
1294 None => {
1295 match application
1296 .publisher_registrants
1297 .get(&StreamKeyRegistration::Exact(stream_key.clone()))
1298 {
1299 Some(x) => x,
1300 None => {
1301 error!(
1302 "Connection {} requested publishing to '{}/{}', but no one has registered \
1303 to support publishers on that stream key",
1304 connection_id, rtmp_app, stream_key
1305 );
1306
1307 let _ = connection
1308 .response_channel
1309 .send(ConnectionResponse::RequestRejected);
1310
1311 return None;
1312 }
1313 }
1314 }
1315 };
1316
1317 let stream_key_connections = application
1319 .active_stream_keys
1320 .entry(stream_key.clone())
1321 .or_insert(StreamKeyConnections {
1322 publisher: None,
1323 watchers: HashMap::new(),
1324 latest_video_sequence_header: None,
1325 latest_audio_sequence_header: None,
1326 });
1327
1328 if let Some(id) = &stream_key_connections.publisher {
1330 error!(
1331 "Connection {} requested publishing to '{}/{}', but connection {} is already \
1332 publishing to this stream key",
1333 connection_id, rtmp_app, stream_key, id
1334 );
1335
1336 let _ = connection
1337 .response_channel
1338 .send(ConnectionResponse::RequestRejected);
1339
1340 return None;
1341 }
1342
1343 if !is_ip_allowed(&connection.socket_address, ®istrant.ip_restrictions) {
1344 error!(
1345 "Connection {} requested publishing to '{}/{}', but the client's ip address of '{}' \
1346 is not allowed",
1347 connection_id,
1348 rtmp_app,
1349 stream_key,
1350 connection.socket_address.ip()
1351 );
1352
1353 let _ = connection
1354 .response_channel
1355 .send(ConnectionResponse::RequestRejected);
1356
1357 return None;
1358 }
1359
1360 if registrant.requires_registrant_approval && !connection.received_registrant_approval {
1361 info!(
1362 "Connection {} requested publishing to '{}/{}' but requires approval from the \
1363 registrant first",
1364 connection_id, rtmp_app, stream_key
1365 );
1366
1367 connection.state = ConnectionState::WaitingForPublishValidation {
1368 rtmp_app,
1369 stream_key: stream_key.clone(),
1370 };
1371
1372 let (sender, receiver) = channel();
1373 let _ = registrant.response_channel.send(
1374 RtmpEndpointPublisherMessage::PublisherRequiringApproval {
1375 stream_key: stream_key.clone(),
1376 connection_id: connection_id.clone(),
1377 response_channel: sender,
1378 },
1379 );
1380
1381 let future = wait_for_validation(port, connection_id.clone(), receiver).boxed();
1382
1383 return Some(future);
1384 }
1385
1386 stream_key_connections.publisher = Some(connection_id.clone());
1388 connection.state = ConnectionState::Publishing {
1389 rtmp_app: rtmp_app.clone(),
1390 stream_key: stream_key.clone(),
1391 };
1392
1393 let stream_id = if let Some(id) = ®istrant.stream_id {
1394 (*id).clone()
1395 } else {
1396 StreamId(Uuid::new_v4().to_string())
1397 };
1398
1399 let _ = connection
1400 .response_channel
1401 .send(ConnectionResponse::PublishRequestAccepted {
1402 channel: registrant.response_channel.clone(),
1403 });
1404
1405 let _ = registrant
1406 .response_channel
1407 .send(RtmpEndpointPublisherMessage::NewPublisherConnected {
1408 connection_id: connection_id.clone(),
1409 stream_key: stream_key.clone(),
1410 stream_id,
1411 reactor_update_channel: reactor_response_channel,
1412 });
1413
1414 return None;
1415}
1416
1417#[instrument(skip(port_map))]
1418fn handle_connection_request_connect_to_app(
1419 connection_id: &ConnectionId,
1420 port_map: &mut PortMapping,
1421 port: u16,
1422 rtmp_app: String,
1423) {
1424 let connection = match port_map.connections.get_mut(&connection_id) {
1425 Some(x) => x,
1426 None => {
1427 warn!("Connection handler for connection {} sent a request to connect to an rtmp app on port {}, \
1428 but that connection isn't being tracked.", connection_id, port);
1429
1430 return;
1431 }
1432 };
1433 let response = if !port_map.rtmp_applications.contains_key(rtmp_app.as_str()) {
1434 info!(
1435 "Connection {} requested connection to RTMP app '{}' which isn't registered yet",
1436 connection_id, rtmp_app
1437 );
1438
1439 ConnectionResponse::RequestRejected
1440 } else {
1441 info!(
1442 "Connection {} accepted connection for RTMP app '{}'",
1443 connection_id, rtmp_app
1444 );
1445
1446 ConnectionResponse::AppConnectRequestAccepted
1447 };
1448
1449 let _ = connection.response_channel.send(response);
1450}
1451
1452#[instrument(skip(port_map))]
1453fn clean_disconnected_connection(connection_id: ConnectionId, port_map: &mut PortMapping) {
1454 let connection = match port_map.connections.remove(&connection_id) {
1455 Some(x) => x,
1456 None => return,
1457 };
1458
1459 info!("Connection {} disconnected. Cleaning it up", connection_id);
1460 match connection.state {
1461 ConnectionState::None => (),
1462 ConnectionState::WaitingForPublishValidation { .. } => (),
1463 ConnectionState::WaitingForWatchValidation { .. } => (),
1464 ConnectionState::Publishing {
1465 rtmp_app,
1466 stream_key,
1467 } => match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1468 None => (),
1469 Some(app_map) => match app_map.active_stream_keys.get_mut(stream_key.as_str()) {
1470 None => (),
1471 Some(active_key) => {
1472 match &active_key.publisher {
1473 None => (),
1474 Some(publisher_id) => {
1475 if *publisher_id == connection_id {
1476 active_key.publisher = None;
1477 active_key.latest_video_sequence_header = None;
1478 active_key.latest_audio_sequence_header = None;
1479
1480 let registrant = match app_map
1481 .publisher_registrants
1482 .get(&StreamKeyRegistration::Any)
1483 {
1484 Some(x) => Some(x),
1485 None => app_map
1486 .publisher_registrants
1487 .get(&StreamKeyRegistration::Exact(stream_key.clone())),
1488 };
1489
1490 if let Some(registrant) = registrant {
1491 let _ = registrant.response_channel.send(
1492 RtmpEndpointPublisherMessage::PublishingStopped {
1493 connection_id,
1494 },
1495 );
1496 }
1497 }
1498 }
1499 };
1500 }
1501 },
1502 },
1503
1504 ConnectionState::Watching {
1505 rtmp_app,
1506 stream_key,
1507 } => match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1508 None => (),
1509 Some(app_map) => match app_map.active_stream_keys.get_mut(stream_key.as_str()) {
1510 None => (),
1511 Some(active_key) => {
1512 active_key.watchers.remove(&connection_id);
1513
1514 if active_key.watchers.is_empty() {
1515 let registrant =
1516 match app_map.watcher_registrants.get(&StreamKeyRegistration::Any) {
1517 Some(x) => Some(x),
1518 None => app_map
1519 .watcher_registrants
1520 .get(&StreamKeyRegistration::Exact(stream_key.clone())),
1521 };
1522
1523 if let Some(registrant) = registrant {
1524 let _ = registrant.response_channel.send(
1525 RtmpEndpointWatcherNotification::StreamKeyBecameInactive {
1526 stream_key,
1527 },
1528 );
1529 }
1530 }
1531 }
1532 },
1533 },
1534 };
1535}
1536
1537mod internal_futures {
1538 use super::{
1539 FutureResult, RtmpEndpointPublisherMessage, RtmpEndpointRequest, StreamKeyRegistration,
1540 };
1541 use crate::endpoints::rtmp_server::actor::connection_handler::ConnectionRequest;
1542 use crate::endpoints::rtmp_server::{
1543 RtmpEndpointMediaMessage, RtmpEndpointWatcherNotification, ValidationResponse,
1544 };
1545 use crate::net::tcp::{TcpSocketRequest, TcpSocketResponse};
1546 use crate::net::ConnectionId;
1547 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
1548 use tokio::sync::oneshot::Receiver;
1549
1550 pub(super) async fn wait_for_endpoint_request(
1551 mut endpoint_receiver: UnboundedReceiver<RtmpEndpointRequest>,
1552 ) -> FutureResult {
1553 match endpoint_receiver.recv().await {
1554 None => FutureResult::NoMoreEndpointRequesters,
1555 Some(request) => FutureResult::EndpointRequestReceived {
1556 request,
1557 receiver: endpoint_receiver,
1558 },
1559 }
1560 }
1561
1562 pub(super) async fn wait_for_socket_response(
1563 mut socket_receiver: UnboundedReceiver<TcpSocketResponse>,
1564 port: u16,
1565 ) -> FutureResult {
1566 match socket_receiver.recv().await {
1567 None => FutureResult::PortGone { port },
1568 Some(response) => FutureResult::SocketResponseReceived {
1569 port,
1570 response,
1571 receiver: socket_receiver,
1572 },
1573 }
1574 }
1575
1576 pub(super) async fn wait_for_publisher_channel_closed(
1577 sender: UnboundedSender<RtmpEndpointPublisherMessage>,
1578 port: u16,
1579 app_name: String,
1580 stream_key: StreamKeyRegistration,
1581 cancellation_receiver: UnboundedSender<()>,
1582 ) -> FutureResult {
1583 tokio::select! {
1584 _ = sender.closed() => (),
1585 _ = cancellation_receiver.closed() => (),
1586 }
1587
1588 FutureResult::PublishingRegistrantGone {
1589 port,
1590 app: app_name,
1591 stream_key,
1592 }
1593 }
1594
1595 pub(super) async fn wait_for_connection_request(
1596 port: u16,
1597 connection_id: ConnectionId,
1598 mut receiver: UnboundedReceiver<ConnectionRequest>,
1599 ) -> FutureResult {
1600 match receiver.recv().await {
1601 Some(request) => FutureResult::ConnectionHandlerRequestReceived {
1602 port,
1603 receiver,
1604 connection_id,
1605 request,
1606 },
1607
1608 None => FutureResult::ConnectionHandlerGone {
1609 port,
1610 connection_id,
1611 },
1612 }
1613 }
1614
1615 pub(super) async fn wait_for_watcher_notification_channel_closed(
1616 sender: UnboundedSender<RtmpEndpointWatcherNotification>,
1617 port: u16,
1618 app_name: String,
1619 stream_key: StreamKeyRegistration,
1620 cancellation_token: UnboundedSender<()>,
1621 ) -> FutureResult {
1622 tokio::select! {
1623 _ = sender.closed() => (),
1624 _ = cancellation_token.closed() => (),
1625 }
1626
1627 FutureResult::WatcherRegistrantGone {
1628 port,
1629 app: app_name,
1630 stream_key,
1631 }
1632 }
1633
1634 pub(super) async fn wait_for_watcher_media(
1635 mut receiver: UnboundedReceiver<RtmpEndpointMediaMessage>,
1636 port: u16,
1637 app_name: String,
1638 stream_key_registration: StreamKeyRegistration,
1639 ) -> FutureResult {
1640 match receiver.recv().await {
1641 None => FutureResult::WatcherRegistrantGone {
1642 port,
1643 app: app_name,
1644 stream_key: stream_key_registration,
1645 },
1646 Some(message) => FutureResult::WatcherMediaDataReceived {
1647 port,
1648 app: app_name,
1649 stream_key: message.stream_key,
1650 stream_key_registration,
1651 data: message.data,
1652 receiver,
1653 },
1654 }
1655 }
1656
1657 pub(super) async fn wait_for_validation(
1658 port: u16,
1659 connection_id: ConnectionId,
1660 receiver: Receiver<ValidationResponse>,
1661 ) -> FutureResult {
1662 match receiver.await {
1663 Ok(response) => {
1664 FutureResult::ValidationApprovalResponseReceived(port, connection_id, response)
1665 }
1666 Err(_) => FutureResult::ValidationApprovalResponseReceived(
1667 port,
1668 connection_id,
1669 ValidationResponse::Reject,
1670 ),
1671 }
1672 }
1673
1674 pub(super) async fn notify_on_socket_manager_gone(
1675 sender: UnboundedSender<TcpSocketRequest>,
1676 ) -> FutureResult {
1677 sender.closed().await;
1678
1679 FutureResult::SocketManagerClosed
1680 }
1681}
1682
1683fn is_ip_allowed(client_socket: &SocketAddr, ip_restrictions: &IpRestriction) -> bool {
1684 match ip_restrictions {
1685 IpRestriction::None => return true,
1686 IpRestriction::Allow(allowed_ips) => {
1687 if let SocketAddr::V4(client_ip) = client_socket {
1688 return allowed_ips.into_iter().any(|ip| ip.matches(client_ip.ip()));
1689 }
1690
1691 return false; }
1693
1694 IpRestriction::Deny(denied_ips) => {
1695 if let SocketAddr::V4(client_ip) = client_socket {
1696 return denied_ips.into_iter().all(|ip| !ip.matches(client_ip.ip()));
1697 }
1698
1699 return false; }
1701 };
1702}