mmids_core/endpoints/rtmp_server/actor/
mod.rs

1pub mod actor_types;
2mod connection_handler;
3
4#[cfg(test)]
5mod tests;
6
7use super::{
8    RtmpEndpointMediaData, RtmpEndpointPublisherMessage, RtmpEndpointRequest, StreamKeyRegistration,
9};
10use crate::endpoints::rtmp_server::actor::connection_handler::ConnectionResponse;
11use crate::endpoints::rtmp_server::actor::internal_futures::wait_for_validation;
12use crate::endpoints::rtmp_server::{
13    IpRestriction, RegistrationType, RtmpEndpointWatcherNotification, ValidationResponse,
14};
15use crate::net::tcp::{TcpSocketRequest, TcpSocketResponse};
16use crate::net::ConnectionId;
17use crate::reactors::ReactorWorkflowUpdate;
18use crate::StreamId;
19use actor_types::*;
20use connection_handler::{ConnectionRequest, RtmpServerConnectionHandler};
21use futures::future::{BoxFuture, FutureExt};
22use futures::StreamExt;
23use rml_rtmp::time::RtmpTimestamp;
24use std::collections::HashMap;
25use std::net::SocketAddr;
26use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
27use tokio::sync::oneshot::channel;
28use tracing::{error, info, instrument, warn};
29use uuid::Uuid;
30
31impl RtmpServerEndpointActor {
32    #[instrument(
33        name = "RtmpServer Endpoint Execution",
34        skip(self, endpoint_receiver, socket_request_sender)
35    )]
36    pub async fn run(
37        mut self,
38        endpoint_receiver: UnboundedReceiver<RtmpEndpointRequest>,
39        socket_request_sender: UnboundedSender<TcpSocketRequest>,
40    ) {
41        info!("Starting RTMP server endpoint");
42
43        self.futures
44            .push(internal_futures::wait_for_endpoint_request(endpoint_receiver).boxed());
45
46        self.futures.push(
47            internal_futures::notify_on_socket_manager_gone(socket_request_sender.clone()).boxed(),
48        );
49
50        while let Some(result) = self.futures.next().await {
51            match result {
52                FutureResult::NoMoreEndpointRequesters => {
53                    info!("No endpoint requesters exist");
54                    break;
55                }
56
57                FutureResult::SocketManagerClosed => {
58                    info!("Socket manager closed");
59                    break;
60                }
61
62                FutureResult::EndpointRequestReceived { request, receiver } => {
63                    self.futures
64                        .push(internal_futures::wait_for_endpoint_request(receiver).boxed());
65
66                    self.handle_endpoint_request(request, socket_request_sender.clone());
67                }
68
69                FutureResult::PublishingRegistrantGone {
70                    port,
71                    app,
72                    stream_key,
73                } => {
74                    self.remove_publish_registration(port, app, stream_key);
75                }
76
77                FutureResult::WatcherRegistrantGone {
78                    port,
79                    app,
80                    stream_key,
81                } => {
82                    self.remove_watcher_registration(port, app, stream_key);
83                }
84
85                FutureResult::SocketResponseReceived {
86                    port,
87                    response,
88                    receiver,
89                } => {
90                    self.handle_socket_response(port, response);
91                    self.futures
92                        .push(internal_futures::wait_for_socket_response(receiver, port).boxed());
93                }
94
95                FutureResult::ConnectionHandlerRequestReceived {
96                    port,
97                    connection_id,
98                    request,
99                    receiver,
100                } => {
101                    self.futures.push(
102                        internal_futures::wait_for_connection_request(
103                            port,
104                            connection_id.clone(),
105                            receiver,
106                        )
107                        .boxed(),
108                    );
109
110                    self.handle_connection_handler_request(port, connection_id, request);
111                }
112
113                FutureResult::ConnectionHandlerGone {
114                    port,
115                    connection_id,
116                } => {
117                    let port_map = match self.ports.get_mut(&port) {
118                        Some(x) => x,
119                        None => continue,
120                    };
121
122                    clean_disconnected_connection(connection_id, port_map);
123                }
124
125                FutureResult::WatcherMediaDataReceived {
126                    port,
127                    app,
128                    stream_key,
129                    stream_key_registration,
130                    data,
131                    receiver,
132                } => {
133                    self.futures.push(
134                        internal_futures::wait_for_watcher_media(
135                            receiver,
136                            port,
137                            app.clone(),
138                            stream_key_registration,
139                        )
140                        .boxed(),
141                    );
142
143                    self.handle_watcher_media_received(port, app, stream_key, data);
144                }
145
146                FutureResult::ValidationApprovalResponseReceived(port, connection_id, response) => {
147                    self.handle_validation_response(port, connection_id, response);
148                }
149
150                FutureResult::PortGone { port } => {
151                    if let Some(_) = self.ports.remove(&port) {
152                        warn!("Port {port}'s response sender suddenly closed");
153                    }
154                }
155            }
156        }
157
158        info!("Rtmp server endpoint closing");
159    }
160
161    #[instrument(skip(self))]
162    fn handle_validation_response(
163        &mut self,
164        port: u16,
165        connection_id: ConnectionId,
166        response: ValidationResponse,
167    ) {
168        let port_map = match self.ports.get_mut(&port) {
169            Some(ports) => ports,
170            None => {
171                return;
172            } // Port has been closed prior to this response
173        };
174
175        let connection = match port_map.connections.get_mut(&connection_id) {
176            Some(connection) => connection,
177            None => {
178                return;
179            } // Disconnected before this response came in
180        };
181
182        match response {
183            ValidationResponse::Approve {
184                reactor_update_channel,
185            } => {
186                match &connection.state {
187                    ConnectionState::None => {
188                        warn!("Unexpected approval for connection in None state");
189                    }
190
191                    ConnectionState::Watching { .. } => {
192                        warn!("Unexpected approval for connection in the Watching state");
193                    }
194
195                    ConnectionState::Publishing { .. } => {
196                        warn!("Unexpected approval for connection in the publishing state");
197                    }
198
199                    ConnectionState::WaitingForPublishValidation {
200                        rtmp_app,
201                        stream_key,
202                    } => {
203                        info!(
204                            rtmp_app = %rtmp_app,
205                            stream_key = %stream_key,
206                            "Request to publish was approved"
207                        );
208
209                        // Redefine as clones due to borrow checker
210                        let rtmp_app = rtmp_app.clone();
211                        let stream_key = stream_key.clone();
212
213                        connection.received_registrant_approval = true;
214                        let future = handle_connection_request_publish(
215                            &connection_id,
216                            port_map,
217                            port,
218                            rtmp_app,
219                            &stream_key,
220                            Some(reactor_update_channel),
221                        );
222
223                        if let Some(future) = future {
224                            self.futures.push(future);
225                        }
226                    }
227
228                    ConnectionState::WaitingForWatchValidation {
229                        rtmp_app,
230                        stream_key,
231                    } => {
232                        info!(
233                            rtmp_app = %rtmp_app,
234                            stream_key = %stream_key,
235                            "Request to watch was approved",
236                        );
237
238                        // Redefine with clones due to borrow checker
239                        let rtmp_app = rtmp_app.clone();
240                        let stream_key = stream_key.clone();
241
242                        connection.received_registrant_approval = true;
243                        let future = handle_connection_request_watch(
244                            connection_id,
245                            port_map,
246                            port,
247                            rtmp_app,
248                            &stream_key,
249                            Some(reactor_update_channel),
250                        );
251
252                        if let Some(future) = future {
253                            self.futures.push(future);
254                        }
255                    }
256                }
257            }
258
259            ValidationResponse::Reject => {
260                match &connection.state {
261                    ConnectionState::None => {
262                        warn!("Unexpected approval for connection in None state");
263                    }
264
265                    ConnectionState::Watching { .. } => {
266                        warn!("Unexpected approval for connection in the Watching state");
267                    }
268
269                    ConnectionState::Publishing { .. } => {
270                        warn!("Unexpected approval for connection in the publishing state");
271                    }
272
273                    ConnectionState::WaitingForPublishValidation {
274                        rtmp_app,
275                        stream_key,
276                    } => {
277                        info!(
278                            rtmp_app = %rtmp_app,
279                            stream_key = %stream_key,
280                            "Request to publish was rejected"
281                        );
282                    }
283
284                    ConnectionState::WaitingForWatchValidation {
285                        rtmp_app,
286                        stream_key,
287                    } => {
288                        info!(
289                            rtmp_app = %rtmp_app,
290                            stream_key = %stream_key,
291                            "Request to watch was rejected"
292                        );
293                    }
294                }
295
296                let _ = connection
297                    .response_channel
298                    .send(ConnectionResponse::RequestRejected);
299            }
300        }
301    }
302
303    fn handle_watcher_media_received(
304        &mut self,
305        port: u16,
306        app: String,
307        stream_key: String,
308        data: RtmpEndpointMediaData,
309    ) {
310        let port_map = match self.ports.get_mut(&port) {
311            Some(x) => x,
312            None => return,
313        };
314
315        let app_map = match port_map.rtmp_applications.get_mut(app.as_str()) {
316            Some(x) => x,
317            None => return,
318        };
319
320        let key_details = app_map
321            .active_stream_keys
322            .entry(stream_key.clone())
323            .or_insert(StreamKeyConnections {
324                watchers: HashMap::new(),
325                publisher: None,
326                latest_video_sequence_header: None,
327                latest_audio_sequence_header: None,
328            });
329
330        match &data {
331            RtmpEndpointMediaData::NewVideoData {
332                data,
333                codec,
334                is_sequence_header,
335                ..
336            } => {
337                if *is_sequence_header {
338                    key_details.latest_video_sequence_header = Some(VideoSequenceHeader {
339                        codec: codec.clone(),
340                        data: data.clone(),
341                    });
342                }
343            }
344
345            RtmpEndpointMediaData::NewAudioData {
346                data,
347                codec,
348                is_sequence_header,
349                ..
350            } => {
351                if *is_sequence_header {
352                    key_details.latest_audio_sequence_header = Some(AudioSequenceHeader {
353                        codec: codec.clone(),
354                        data: data.clone(),
355                    });
356                }
357            }
358
359            _ => (),
360        };
361
362        for (_, watcher_details) in &key_details.watchers {
363            let _ = watcher_details.media_sender.send(data.clone());
364        }
365    }
366
367    fn handle_endpoint_request(
368        &mut self,
369        request: RtmpEndpointRequest,
370        socket_request_sender: UnboundedSender<TcpSocketRequest>,
371    ) {
372        match request {
373            RtmpEndpointRequest::ListenForPublishers {
374                port,
375                rtmp_app,
376                rtmp_stream_key,
377                message_channel,
378                stream_id,
379                ip_restrictions: ip_restriction,
380                use_tls,
381                requires_registrant_approval,
382            } => {
383                self.register_listener(
384                    port,
385                    rtmp_app,
386                    rtmp_stream_key,
387                    socket_request_sender,
388                    ListenerRequest::Publisher {
389                        channel: message_channel,
390                        stream_id,
391                        requires_registrant_approval,
392                    },
393                    ip_restriction,
394                    use_tls,
395                );
396            }
397
398            RtmpEndpointRequest::ListenForWatchers {
399                port,
400                rtmp_app,
401                rtmp_stream_key,
402                media_channel,
403                notification_channel,
404                ip_restrictions,
405                use_tls,
406                requires_registrant_approval,
407            } => {
408                self.register_listener(
409                    port,
410                    rtmp_app,
411                    rtmp_stream_key,
412                    socket_request_sender,
413                    ListenerRequest::Watcher {
414                        notification_channel,
415                        media_channel,
416                        requires_registrant_approval,
417                    },
418                    ip_restrictions,
419                    use_tls,
420                );
421            }
422
423            RtmpEndpointRequest::RemoveRegistration {
424                registration_type,
425                port,
426                rtmp_app,
427                rtmp_stream_key,
428            } => {
429                info!(
430                    port = %port,
431                    rtmp_app = %rtmp_app,
432                    stream_key = ?rtmp_stream_key,
433                    registration_type = ?registration_type,
434                    "{:?} Registration removal requested for port {}, app {}, and stream key {:?}",
435                    registration_type, port, rtmp_app, rtmp_stream_key
436                );
437
438                match registration_type {
439                    RegistrationType::Publisher => {
440                        self.remove_publish_registration(port, rtmp_app, rtmp_stream_key)
441                    }
442                    RegistrationType::Watcher => {
443                        self.remove_watcher_registration(port, rtmp_app, rtmp_stream_key)
444                    }
445                }
446            }
447        }
448    }
449
450    #[instrument(skip(self, socket_sender, listener))]
451    fn register_listener(
452        &mut self,
453        port: u16,
454        rtmp_app: String,
455        stream_key: StreamKeyRegistration,
456        socket_sender: UnboundedSender<TcpSocketRequest>,
457        listener: ListenerRequest,
458        ip_restrictions: IpRestriction,
459        use_tls: bool,
460    ) {
461        let mut new_port_requested = false;
462        let port_map = self.ports.entry(port).or_insert_with(|| {
463            let port_map = PortMapping {
464                rtmp_applications: HashMap::new(),
465                status: PortStatus::Requested,
466                connections: HashMap::new(),
467                tls: use_tls,
468            };
469
470            new_port_requested = true;
471
472            port_map
473        });
474
475        if port_map.tls != use_tls {
476            error!(
477                "Request to open port {} with tls set to {} failed, as the port is already mapped \
478            with tls set to {}",
479                port, use_tls, port_map.tls
480            );
481
482            match listener {
483                ListenerRequest::Publisher { channel, .. } => {
484                    let _ = channel.send(RtmpEndpointPublisherMessage::PublisherRegistrationFailed);
485                }
486
487                ListenerRequest::Watcher {
488                    notification_channel,
489                    ..
490                } => {
491                    let _ = notification_channel
492                        .send(RtmpEndpointWatcherNotification::WatcherRegistrationFailed);
493                }
494            }
495
496            return;
497        }
498
499        if new_port_requested {
500            let (sender, receiver) = unbounded_channel();
501            let request = TcpSocketRequest::OpenPort {
502                port,
503                response_channel: sender,
504                use_tls,
505            };
506
507            let _ = socket_sender.send(request);
508            self.futures
509                .push(internal_futures::wait_for_socket_response(receiver, port).boxed());
510        }
511
512        let app_map = port_map
513            .rtmp_applications
514            .entry(rtmp_app.clone())
515            .or_insert(RtmpAppMapping {
516                publisher_registrants: HashMap::new(),
517                watcher_registrants: HashMap::new(),
518                active_stream_keys: HashMap::new(),
519            });
520
521        match listener {
522            ListenerRequest::Publisher {
523                channel,
524                stream_id,
525                requires_registrant_approval,
526            } => {
527                let can_be_added = match &stream_key {
528                    StreamKeyRegistration::Any => {
529                        if !app_map.publisher_registrants.is_empty() {
530                            warn!("Rtmp server publish request registration failed for port {}, app '{}', all stream keys': \
531                                    Another system is registered for at least one stream key on this port and app", port, rtmp_app);
532
533                            false
534                        } else {
535                            true
536                        }
537                    }
538
539                    StreamKeyRegistration::Exact(key) => {
540                        if app_map
541                            .publisher_registrants
542                            .contains_key(&StreamKeyRegistration::Any)
543                        {
544                            warn!("Rtmp server publish request registration failed for port {}, app '{}', stream key '{}': \
545                                    Another system is registered for all stream keys on this port/app", port, rtmp_app, key);
546
547                            false
548                        } else if app_map
549                            .publisher_registrants
550                            .contains_key(&StreamKeyRegistration::Exact(key.clone()))
551                        {
552                            warn!("Rtmp server publish request registration failed for port {}, app '{}', stream key '{}': \
553                                    Another system is registered for this port/app/stream key combo", port, rtmp_app, key);
554
555                            false
556                        } else {
557                            true
558                        }
559                    }
560                };
561
562                if !can_be_added {
563                    let _ =
564                        channel.send(RtmpEndpointPublisherMessage::PublisherRegistrationFailed {});
565
566                    return;
567                }
568
569                let (cancel_sender, cancel_receiver) = unbounded_channel();
570                app_map.publisher_registrants.insert(
571                    stream_key.clone(),
572                    PublishingRegistrant {
573                        response_channel: channel.clone(),
574                        stream_id,
575                        ip_restrictions,
576                        requires_registrant_approval,
577                        cancellation_notifier: cancel_receiver,
578                    },
579                );
580
581                self.futures.push(
582                    internal_futures::wait_for_publisher_channel_closed(
583                        channel.clone(),
584                        port,
585                        rtmp_app,
586                        stream_key,
587                        cancel_sender,
588                    )
589                    .boxed(),
590                );
591
592                // If the port isn't in a listening mode, we don't want to claim that
593                // registration was successful yet
594                if port_map.status == PortStatus::Open {
595                    let _ = channel
596                        .send(RtmpEndpointPublisherMessage::PublisherRegistrationSuccessful {});
597                }
598            }
599
600            ListenerRequest::Watcher {
601                media_channel,
602                notification_channel,
603                requires_registrant_approval,
604            } => {
605                let can_be_added = match &stream_key {
606                    StreamKeyRegistration::Any => {
607                        if !app_map.watcher_registrants.is_empty() {
608                            warn!("Rtmp server watcher registration failed for port {}, app '{}', all stream keys': \
609                                    Another system is registered for at least one stream key on this port and app", port, rtmp_app);
610
611                            false
612                        } else {
613                            true
614                        }
615                    }
616
617                    StreamKeyRegistration::Exact(key) => {
618                        if app_map
619                            .watcher_registrants
620                            .contains_key(&StreamKeyRegistration::Any)
621                        {
622                            warn!("Rtmp server watcher registration failed for port {}, app '{}', stream key '{}': \
623                                    Another system is registered for all stream keys on this port/app", port, rtmp_app, key);
624
625                            false
626                        } else if app_map
627                            .watcher_registrants
628                            .contains_key(&StreamKeyRegistration::Exact(key.clone()))
629                        {
630                            warn!("Rtmp server watcher registration failed for port {}, app '{}', stream key '{}': \
631                                    Another system is registered for this port/app/stream key combo", port, rtmp_app, key);
632
633                            false
634                        } else {
635                            true
636                        }
637                    }
638                };
639
640                if !can_be_added {
641                    let _ = notification_channel
642                        .send(RtmpEndpointWatcherNotification::WatcherRegistrationFailed);
643
644                    return;
645                }
646
647                let (cancel_sender, cancel_receiver) = unbounded_channel();
648                app_map.watcher_registrants.insert(
649                    stream_key.clone(),
650                    WatcherRegistrant {
651                        response_channel: notification_channel.clone(),
652                        ip_restrictions,
653                        requires_registrant_approval,
654                        cancellation_notifier: cancel_receiver,
655                    },
656                );
657
658                self.futures.push(
659                    internal_futures::wait_for_watcher_notification_channel_closed(
660                        notification_channel.clone(),
661                        port,
662                        rtmp_app.clone(),
663                        stream_key.clone(),
664                        cancel_sender,
665                    )
666                    .boxed(),
667                );
668
669                self.futures.push(
670                    internal_futures::wait_for_watcher_media(
671                        media_channel,
672                        port,
673                        rtmp_app,
674                        stream_key,
675                    )
676                    .boxed(),
677                );
678
679                // If the port isn't open yet, we don't want to claim registration was successful yet
680                if port_map.status == PortStatus::Open {
681                    let _ = notification_channel
682                        .send(RtmpEndpointWatcherNotification::WatcherRegistrationSuccessful);
683                }
684            }
685        }
686    }
687
688    #[instrument(skip(self))]
689    fn handle_socket_response(&mut self, port: u16, response: TcpSocketResponse) {
690        let mut remove_port = false;
691        {
692            let port_map = match self.ports.get_mut(&port) {
693                Some(x) => x,
694                None => {
695                    error!("Received socket response for port {} but that port has not been registered", port);
696
697                    return;
698                }
699            };
700
701            match response {
702                TcpSocketResponse::RequestDenied { reason } => {
703                    warn!("Port {} could not be opened: {:?}", port, reason);
704
705                    for (_, app_map) in &port_map.rtmp_applications {
706                        for (_, publisher) in &app_map.publisher_registrants {
707                            let _ = publisher
708                                .response_channel
709                                .send(RtmpEndpointPublisherMessage::PublisherRegistrationFailed {});
710                        }
711
712                        for (_, watcher) in &app_map.watcher_registrants {
713                            let _ = watcher
714                                .response_channel
715                                .send(RtmpEndpointWatcherNotification::WatcherRegistrationFailed);
716                        }
717                    }
718
719                    remove_port = true;
720                }
721
722                TcpSocketResponse::PortForciblyClosed { port: _ } => {
723                    warn!("Port {} closed", port);
724
725                    remove_port = true;
726                }
727
728                TcpSocketResponse::RequestAccepted {} => {
729                    info!("Port {} successfully opened", port);
730
731                    // Since the port was successfully opened, any pending registrants need to be
732                    // informed that their registration has now been successful
733                    for (_, app_map) in &port_map.rtmp_applications {
734                        for (_, publisher) in &app_map.publisher_registrants {
735                            let _ = publisher.response_channel.send(
736                                RtmpEndpointPublisherMessage::PublisherRegistrationSuccessful {},
737                            );
738                        }
739
740                        for (_, watcher) in &app_map.watcher_registrants {
741                            let _ = watcher.response_channel.send(
742                                RtmpEndpointWatcherNotification::WatcherRegistrationSuccessful,
743                            );
744                        }
745                    }
746
747                    port_map.status = PortStatus::Open;
748                }
749
750                TcpSocketResponse::NewConnection {
751                    port: _,
752                    connection_id,
753                    outgoing_bytes,
754                    incoming_bytes,
755                    socket_address,
756                } => {
757                    let (request_sender, request_receiver) = unbounded_channel();
758                    let (response_sender, response_receiver) = unbounded_channel();
759                    let handler = RtmpServerConnectionHandler::new(
760                        connection_id.clone(),
761                        outgoing_bytes,
762                        request_sender,
763                    );
764                    tokio::spawn(handler.run_async(response_receiver, incoming_bytes));
765
766                    port_map.connections.insert(
767                        connection_id.clone(),
768                        Connection {
769                            response_channel: response_sender,
770                            state: ConnectionState::None,
771                            socket_address,
772                            received_registrant_approval: false,
773                        },
774                    );
775
776                    self.futures.push(
777                        internal_futures::wait_for_connection_request(
778                            port,
779                            connection_id,
780                            request_receiver,
781                        )
782                        .boxed(),
783                    );
784                }
785
786                TcpSocketResponse::Disconnection { connection_id } => {
787                    // Clean this connection up
788                    clean_disconnected_connection(connection_id, port_map);
789                }
790            }
791        }
792
793        if remove_port {
794            info!("Port {port} removed");
795            self.ports.remove(&port);
796        }
797    }
798
799    #[instrument(skip(self))]
800    fn handle_connection_handler_request(
801        &mut self,
802        port: u16,
803        connection_id: ConnectionId,
804        request: ConnectionRequest,
805    ) {
806        let port_map = match self.ports.get_mut(&port) {
807            Some(x) => x,
808            None => {
809                error!(
810                    "Connection handler for connection {:?} sent {:?} on port {}, but that \
811                port isn't managed yet!",
812                    connection_id, request, port
813                );
814
815                return;
816            }
817        };
818
819        match request {
820            ConnectionRequest::RequestConnectToApp { rtmp_app } => {
821                handle_connection_request_connect_to_app(&connection_id, port_map, port, rtmp_app);
822            }
823
824            ConnectionRequest::RequestPublish {
825                rtmp_app,
826                stream_key,
827            } => {
828                let future = handle_connection_request_publish(
829                    &connection_id,
830                    port_map,
831                    port,
832                    rtmp_app,
833                    &stream_key,
834                    None,
835                );
836
837                if let Some(future) = future {
838                    self.futures.push(future);
839                }
840            }
841
842            ConnectionRequest::RequestWatch {
843                rtmp_app,
844                stream_key,
845            } => {
846                let future = handle_connection_request_watch(
847                    connection_id,
848                    port_map,
849                    port,
850                    rtmp_app,
851                    &stream_key,
852                    None,
853                );
854
855                if let Some(future) = future {
856                    self.futures.push(future);
857                }
858            }
859
860            ConnectionRequest::PublishFinished => {
861                handle_connection_stop_publish(connection_id, port_map);
862            }
863
864            ConnectionRequest::PlaybackFinished => {
865                handle_connection_stop_watch(connection_id, port_map);
866            }
867        }
868    }
869
870    fn remove_publish_registration(
871        &mut self,
872        port: u16,
873        app: String,
874        stream_key: StreamKeyRegistration,
875    ) {
876        let port_map = match self.ports.get_mut(&port) {
877            Some(x) => x,
878            None => return,
879        };
880
881        let app_map = match port_map.rtmp_applications.get_mut(app.as_str()) {
882            Some(x) => x,
883            None => return,
884        };
885
886        if let None = app_map.publisher_registrants.remove(&stream_key) {
887            return;
888        }
889
890        // Remove all publishers tied to this registrant
891        let mut keys_to_remove = Vec::new();
892        if let StreamKeyRegistration::Exact(key) = stream_key {
893            keys_to_remove.push(key);
894        } else {
895            keys_to_remove.extend(app_map.active_stream_keys.keys().map(|x| x.clone()));
896        }
897
898        for key in keys_to_remove {
899            if let Some(connection) = app_map.active_stream_keys.get_mut(&key) {
900                if let Some(id) = &connection.publisher {
901                    if let Some(connection) = port_map.connections.get(id) {
902                        let _ = connection
903                            .response_channel
904                            .send(ConnectionResponse::Disconnect);
905                    }
906                }
907
908                connection.publisher = None;
909            }
910        }
911
912        if app_map.publisher_registrants.is_empty() && app_map.watcher_registrants.is_empty() {
913            port_map.rtmp_applications.remove(&app);
914        }
915    }
916
917    fn remove_watcher_registration(
918        &mut self,
919        port: u16,
920        app: String,
921        stream_key: StreamKeyRegistration,
922    ) {
923        let port_map = match self.ports.get_mut(&port) {
924            Some(x) => x,
925            None => return,
926        };
927
928        let app_map = match port_map.rtmp_applications.get_mut(app.as_str()) {
929            Some(x) => x,
930            None => return,
931        };
932
933        if let None = app_map.watcher_registrants.remove(&stream_key) {
934            return;
935        }
936
937        // Remove all watchers tied to this registrant
938        let mut keys_to_remove = Vec::new();
939        if let StreamKeyRegistration::Exact(key) = stream_key {
940            keys_to_remove.push(key);
941        } else {
942            keys_to_remove.extend(app_map.active_stream_keys.keys().map(|x| x.clone()));
943        }
944
945        for key in keys_to_remove {
946            if let Some(connection) = app_map.active_stream_keys.get_mut(&key) {
947                for id in connection.watchers.keys() {
948                    if let Some(connection) = port_map.connections.get(id) {
949                        let _ = connection
950                            .response_channel
951                            .send(ConnectionResponse::Disconnect);
952                    }
953                }
954
955                connection.watchers.clear();
956            }
957        }
958
959        if app_map.watcher_registrants.is_empty() && app_map.publisher_registrants.is_empty() {
960            port_map.rtmp_applications.remove(&app);
961        }
962    }
963}
964
965fn handle_connection_stop_watch(connection_id: ConnectionId, port_map: &mut PortMapping) {
966    let connection = match port_map.connections.get_mut(&connection_id) {
967        Some(connection) => connection,
968        None => {
969            warn!("Connection handler for connection {:?} a sent playback finished notification, but \
970                that connection isn't being tracked", connection_id);
971
972            return;
973        }
974    };
975
976    match &connection.state {
977        ConnectionState::Watching {
978            rtmp_app,
979            stream_key,
980        } => {
981            let rtmp_app = rtmp_app.clone();
982            let stream_key = stream_key.clone();
983            connection.state = ConnectionState::None;
984            match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
985                None => (),
986                Some(app_map) => match app_map.active_stream_keys.get_mut(stream_key.as_str()) {
987                    None => (),
988                    Some(active_key) => {
989                        active_key.watchers.remove(&connection_id);
990
991                        if active_key.watchers.is_empty() {
992                            let registrant = match app_map
993                                .watcher_registrants
994                                .get(&StreamKeyRegistration::Any)
995                            {
996                                Some(x) => Some(x),
997                                None => app_map
998                                    .watcher_registrants
999                                    .get(&StreamKeyRegistration::Exact(stream_key.clone())),
1000                            };
1001
1002                            if let Some(registrant) = registrant {
1003                                let _ = registrant.response_channel.send(
1004                                    RtmpEndpointWatcherNotification::StreamKeyBecameInactive {
1005                                        stream_key,
1006                                    },
1007                                );
1008                            }
1009                        }
1010                    }
1011                },
1012            }
1013        }
1014
1015        _ => (),
1016    }
1017}
1018
1019fn handle_connection_stop_publish(connection_id: ConnectionId, port_map: &mut PortMapping) {
1020    let connection = match port_map.connections.get_mut(&connection_id) {
1021        Some(connection) => connection,
1022        None => {
1023            warn!(
1024                "Connection handler for connection {:?} a sent publish finished notification, but \
1025                that connection isn't being tracked",
1026                connection_id
1027            );
1028
1029            return;
1030        }
1031    };
1032
1033    match &connection.state {
1034        ConnectionState::Publishing {
1035            rtmp_app,
1036            stream_key,
1037        } => {
1038            let rtmp_app = rtmp_app.clone();
1039            let stream_key = stream_key.clone();
1040            connection.state = ConnectionState::None;
1041
1042            match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1043                None => (),
1044                Some(app_map) => match app_map.active_stream_keys.get_mut(stream_key.as_str()) {
1045                    None => (),
1046                    Some(active_key) => {
1047                        match &active_key.publisher {
1048                            None => (),
1049                            Some(publisher_id) => {
1050                                if *publisher_id == connection_id {
1051                                    active_key.publisher = None;
1052                                    active_key.latest_video_sequence_header = None;
1053                                    active_key.latest_audio_sequence_header = None;
1054
1055                                    let registrant = match app_map
1056                                        .publisher_registrants
1057                                        .get(&StreamKeyRegistration::Any)
1058                                    {
1059                                        Some(x) => Some(x),
1060                                        None => app_map
1061                                            .publisher_registrants
1062                                            .get(&StreamKeyRegistration::Exact(stream_key.clone())),
1063                                    };
1064
1065                                    if let Some(registrant) = registrant {
1066                                        let _ = registrant.response_channel.send(
1067                                            RtmpEndpointPublisherMessage::PublishingStopped {
1068                                                connection_id,
1069                                            },
1070                                        );
1071                                    }
1072                                }
1073                            }
1074                        };
1075                    }
1076                },
1077            }
1078        }
1079
1080        _ => (),
1081    }
1082}
1083
1084#[instrument(skip(port_map))]
1085fn handle_connection_request_watch(
1086    connection_id: ConnectionId,
1087    port_map: &mut PortMapping,
1088    port: u16,
1089    rtmp_app: String,
1090    stream_key: &String,
1091    reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
1092) -> Option<BoxFuture<'static, FutureResult>> {
1093    let connection = match port_map.connections.get_mut(&connection_id) {
1094        Some(x) => x,
1095        None => {
1096            warn!("Connection handler for connection {:?} sent request to watch on port {}, but that \
1097                connection isn't being tracked.", connection_id, port);
1098
1099            return None;
1100        }
1101    };
1102
1103    // Has this app been registered yet?
1104    let application = match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1105        Some(x) => x,
1106        None => {
1107            info!(
1108                "Connection {} requested watching '{}/{}' but that app is not registered \
1109                        to accept watchers",
1110                connection_id, rtmp_app, stream_key
1111            );
1112
1113            let _ = connection
1114                .response_channel
1115                .send(ConnectionResponse::RequestRejected);
1116
1117            return None;
1118        }
1119    };
1120
1121    // Is this stream key registered for watching
1122    let registrant = match application
1123        .watcher_registrants
1124        .get(&StreamKeyRegistration::Any)
1125    {
1126        Some(x) => x,
1127        None => {
1128            match application
1129                .watcher_registrants
1130                .get(&StreamKeyRegistration::Exact(stream_key.clone()))
1131            {
1132                Some(x) => x,
1133                None => {
1134                    info!(
1135                        "Connection {} requested watching '{}/{}' but that stream key is \
1136                                not registered to accept watchers",
1137                        connection_id, rtmp_app, stream_key
1138                    );
1139
1140                    let _ = connection
1141                        .response_channel
1142                        .send(ConnectionResponse::RequestRejected);
1143
1144                    return None;
1145                }
1146            }
1147        }
1148    };
1149
1150    if !is_ip_allowed(&connection.socket_address, &registrant.ip_restrictions) {
1151        error!(
1152            "Connection {} requested watching to '{}/{}', but the client's ip address of '{}' \
1153        is not allowed",
1154            connection_id,
1155            rtmp_app,
1156            stream_key,
1157            connection.socket_address.ip()
1158        );
1159
1160        let _ = connection
1161            .response_channel
1162            .send(ConnectionResponse::RequestRejected);
1163
1164        return None;
1165    }
1166
1167    if registrant.requires_registrant_approval && !connection.received_registrant_approval {
1168        info!(
1169            "Connection {} requested watching to '{}/{}' but requires approval from the \
1170            registrant first",
1171            connection_id, rtmp_app, stream_key
1172        );
1173
1174        connection.state = ConnectionState::WaitingForWatchValidation {
1175            rtmp_app,
1176            stream_key: stream_key.clone(),
1177        };
1178
1179        let (sender, receiver) = channel();
1180        let _ = registrant.response_channel.send(
1181            RtmpEndpointWatcherNotification::WatcherRequiringApproval {
1182                stream_key: stream_key.clone(),
1183                connection_id: connection_id.clone(),
1184                response_channel: sender,
1185            },
1186        );
1187
1188        let future = wait_for_validation(port, connection_id.clone(), receiver).boxed();
1189
1190        return Some(future);
1191    }
1192
1193    let active_stream_key = application
1194        .active_stream_keys
1195        .entry(stream_key.clone())
1196        .or_insert(StreamKeyConnections {
1197            watchers: HashMap::new(),
1198            publisher: None,
1199            latest_video_sequence_header: None,
1200            latest_audio_sequence_header: None,
1201        });
1202
1203    connection.state = ConnectionState::Watching {
1204        rtmp_app,
1205        stream_key: stream_key.clone(),
1206    };
1207
1208    if active_stream_key.watchers.is_empty() {
1209        let _ = registrant.response_channel.send(
1210            RtmpEndpointWatcherNotification::StreamKeyBecameActive {
1211                stream_key: stream_key.clone(),
1212                reactor_update_channel,
1213            },
1214        );
1215    }
1216
1217    let (media_sender, media_receiver) = unbounded_channel();
1218
1219    // If we have a sequence headers available, send it to the client so they can immediately
1220    // start decoding video
1221    if let Some(sequence_header) = &active_stream_key.latest_video_sequence_header {
1222        let _ = media_sender.send(RtmpEndpointMediaData::NewVideoData {
1223            codec: sequence_header.codec.clone(),
1224            is_sequence_header: true,
1225            is_keyframe: true,
1226            data: sequence_header.data.clone(),
1227            timestamp: RtmpTimestamp::new(0),
1228            composition_time_offset: 0,
1229        });
1230    }
1231
1232    if let Some(sequence_header) = &active_stream_key.latest_audio_sequence_header {
1233        let _ = media_sender.send(RtmpEndpointMediaData::NewAudioData {
1234            codec: sequence_header.codec.clone(),
1235            data: sequence_header.data.clone(),
1236            is_sequence_header: true,
1237            timestamp: RtmpTimestamp::new(0),
1238        });
1239    }
1240
1241    active_stream_key
1242        .watchers
1243        .insert(connection_id, WatcherDetails { media_sender });
1244
1245    let _ = connection
1246        .response_channel
1247        .send(ConnectionResponse::WatchRequestAccepted {
1248            channel: media_receiver,
1249        });
1250
1251    return None;
1252}
1253
1254#[instrument(skip(port_map))]
1255fn handle_connection_request_publish(
1256    connection_id: &ConnectionId,
1257    port_map: &mut PortMapping,
1258    port: u16,
1259    rtmp_app: String,
1260    stream_key: &String,
1261    reactor_response_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
1262) -> Option<BoxFuture<'static, FutureResult>> {
1263    let connection = match port_map.connections.get_mut(&connection_id) {
1264        Some(x) => x,
1265        None => {
1266            warn!("Connection handler for connection {:?} sent a request to publish on port {}, but that \
1267                connection isn't being tracked.", connection_id, port);
1268
1269            return None;
1270        }
1271    };
1272
1273    // Has this RTMP application been registered yet?
1274    let application = match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1275        Some(x) => x,
1276        None => {
1277            info!("Connection {} requested publishing to '{}/{}', but the RTMP app '{}' isn't registered yet",
1278                    connection_id, rtmp_app, stream_key, rtmp_app);
1279
1280            let _ = connection
1281                .response_channel
1282                .send(ConnectionResponse::RequestRejected);
1283
1284            return None;
1285        }
1286    };
1287
1288    // Has this stream key been registered yet?
1289    let registrant = match application
1290        .publisher_registrants
1291        .get(&StreamKeyRegistration::Any)
1292    {
1293        Some(x) => x,
1294        None => {
1295            match application
1296                .publisher_registrants
1297                .get(&StreamKeyRegistration::Exact(stream_key.clone()))
1298            {
1299                Some(x) => x,
1300                None => {
1301                    error!(
1302                        "Connection {} requested publishing to '{}/{}', but no one has registered \
1303                            to support publishers on that stream key",
1304                        connection_id, rtmp_app, stream_key
1305                    );
1306
1307                    let _ = connection
1308                        .response_channel
1309                        .send(ConnectionResponse::RequestRejected);
1310
1311                    return None;
1312                }
1313            }
1314        }
1315    };
1316
1317    // app/stream key combination is valid and we have a registrant for it
1318    let stream_key_connections = application
1319        .active_stream_keys
1320        .entry(stream_key.clone())
1321        .or_insert(StreamKeyConnections {
1322            publisher: None,
1323            watchers: HashMap::new(),
1324            latest_video_sequence_header: None,
1325            latest_audio_sequence_header: None,
1326        });
1327
1328    // Is someone already publishing on this stream key?
1329    if let Some(id) = &stream_key_connections.publisher {
1330        error!(
1331            "Connection {} requested publishing to '{}/{}', but connection {} is already \
1332        publishing to this stream key",
1333            connection_id, rtmp_app, stream_key, id
1334        );
1335
1336        let _ = connection
1337            .response_channel
1338            .send(ConnectionResponse::RequestRejected);
1339
1340        return None;
1341    }
1342
1343    if !is_ip_allowed(&connection.socket_address, &registrant.ip_restrictions) {
1344        error!(
1345            "Connection {} requested publishing to '{}/{}', but the client's ip address of '{}' \
1346        is not allowed",
1347            connection_id,
1348            rtmp_app,
1349            stream_key,
1350            connection.socket_address.ip()
1351        );
1352
1353        let _ = connection
1354            .response_channel
1355            .send(ConnectionResponse::RequestRejected);
1356
1357        return None;
1358    }
1359
1360    if registrant.requires_registrant_approval && !connection.received_registrant_approval {
1361        info!(
1362            "Connection {} requested publishing to '{}/{}' but requires approval from the \
1363            registrant first",
1364            connection_id, rtmp_app, stream_key
1365        );
1366
1367        connection.state = ConnectionState::WaitingForPublishValidation {
1368            rtmp_app,
1369            stream_key: stream_key.clone(),
1370        };
1371
1372        let (sender, receiver) = channel();
1373        let _ = registrant.response_channel.send(
1374            RtmpEndpointPublisherMessage::PublisherRequiringApproval {
1375                stream_key: stream_key.clone(),
1376                connection_id: connection_id.clone(),
1377                response_channel: sender,
1378            },
1379        );
1380
1381        let future = wait_for_validation(port, connection_id.clone(), receiver).boxed();
1382
1383        return Some(future);
1384    }
1385
1386    // All good to publish
1387    stream_key_connections.publisher = Some(connection_id.clone());
1388    connection.state = ConnectionState::Publishing {
1389        rtmp_app: rtmp_app.clone(),
1390        stream_key: stream_key.clone(),
1391    };
1392
1393    let stream_id = if let Some(id) = &registrant.stream_id {
1394        (*id).clone()
1395    } else {
1396        StreamId(Uuid::new_v4().to_string())
1397    };
1398
1399    let _ = connection
1400        .response_channel
1401        .send(ConnectionResponse::PublishRequestAccepted {
1402            channel: registrant.response_channel.clone(),
1403        });
1404
1405    let _ = registrant
1406        .response_channel
1407        .send(RtmpEndpointPublisherMessage::NewPublisherConnected {
1408            connection_id: connection_id.clone(),
1409            stream_key: stream_key.clone(),
1410            stream_id,
1411            reactor_update_channel: reactor_response_channel,
1412        });
1413
1414    return None;
1415}
1416
1417#[instrument(skip(port_map))]
1418fn handle_connection_request_connect_to_app(
1419    connection_id: &ConnectionId,
1420    port_map: &mut PortMapping,
1421    port: u16,
1422    rtmp_app: String,
1423) {
1424    let connection = match port_map.connections.get_mut(&connection_id) {
1425        Some(x) => x,
1426        None => {
1427            warn!("Connection handler for connection {} sent a request to connect to an rtmp app on port {}, \
1428            but that connection isn't being tracked.", connection_id, port);
1429
1430            return;
1431        }
1432    };
1433    let response = if !port_map.rtmp_applications.contains_key(rtmp_app.as_str()) {
1434        info!(
1435            "Connection {} requested connection to RTMP app '{}' which isn't registered yet",
1436            connection_id, rtmp_app
1437        );
1438
1439        ConnectionResponse::RequestRejected
1440    } else {
1441        info!(
1442            "Connection {} accepted connection for RTMP app '{}'",
1443            connection_id, rtmp_app
1444        );
1445
1446        ConnectionResponse::AppConnectRequestAccepted
1447    };
1448
1449    let _ = connection.response_channel.send(response);
1450}
1451
1452#[instrument(skip(port_map))]
1453fn clean_disconnected_connection(connection_id: ConnectionId, port_map: &mut PortMapping) {
1454    let connection = match port_map.connections.remove(&connection_id) {
1455        Some(x) => x,
1456        None => return,
1457    };
1458
1459    info!("Connection {} disconnected.  Cleaning it up", connection_id);
1460    match connection.state {
1461        ConnectionState::None => (),
1462        ConnectionState::WaitingForPublishValidation { .. } => (),
1463        ConnectionState::WaitingForWatchValidation { .. } => (),
1464        ConnectionState::Publishing {
1465            rtmp_app,
1466            stream_key,
1467        } => match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1468            None => (),
1469            Some(app_map) => match app_map.active_stream_keys.get_mut(stream_key.as_str()) {
1470                None => (),
1471                Some(active_key) => {
1472                    match &active_key.publisher {
1473                        None => (),
1474                        Some(publisher_id) => {
1475                            if *publisher_id == connection_id {
1476                                active_key.publisher = None;
1477                                active_key.latest_video_sequence_header = None;
1478                                active_key.latest_audio_sequence_header = None;
1479
1480                                let registrant = match app_map
1481                                    .publisher_registrants
1482                                    .get(&StreamKeyRegistration::Any)
1483                                {
1484                                    Some(x) => Some(x),
1485                                    None => app_map
1486                                        .publisher_registrants
1487                                        .get(&StreamKeyRegistration::Exact(stream_key.clone())),
1488                                };
1489
1490                                if let Some(registrant) = registrant {
1491                                    let _ = registrant.response_channel.send(
1492                                        RtmpEndpointPublisherMessage::PublishingStopped {
1493                                            connection_id,
1494                                        },
1495                                    );
1496                                }
1497                            }
1498                        }
1499                    };
1500                }
1501            },
1502        },
1503
1504        ConnectionState::Watching {
1505            rtmp_app,
1506            stream_key,
1507        } => match port_map.rtmp_applications.get_mut(rtmp_app.as_str()) {
1508            None => (),
1509            Some(app_map) => match app_map.active_stream_keys.get_mut(stream_key.as_str()) {
1510                None => (),
1511                Some(active_key) => {
1512                    active_key.watchers.remove(&connection_id);
1513
1514                    if active_key.watchers.is_empty() {
1515                        let registrant =
1516                            match app_map.watcher_registrants.get(&StreamKeyRegistration::Any) {
1517                                Some(x) => Some(x),
1518                                None => app_map
1519                                    .watcher_registrants
1520                                    .get(&StreamKeyRegistration::Exact(stream_key.clone())),
1521                            };
1522
1523                        if let Some(registrant) = registrant {
1524                            let _ = registrant.response_channel.send(
1525                                RtmpEndpointWatcherNotification::StreamKeyBecameInactive {
1526                                    stream_key,
1527                                },
1528                            );
1529                        }
1530                    }
1531                }
1532            },
1533        },
1534    };
1535}
1536
1537mod internal_futures {
1538    use super::{
1539        FutureResult, RtmpEndpointPublisherMessage, RtmpEndpointRequest, StreamKeyRegistration,
1540    };
1541    use crate::endpoints::rtmp_server::actor::connection_handler::ConnectionRequest;
1542    use crate::endpoints::rtmp_server::{
1543        RtmpEndpointMediaMessage, RtmpEndpointWatcherNotification, ValidationResponse,
1544    };
1545    use crate::net::tcp::{TcpSocketRequest, TcpSocketResponse};
1546    use crate::net::ConnectionId;
1547    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
1548    use tokio::sync::oneshot::Receiver;
1549
1550    pub(super) async fn wait_for_endpoint_request(
1551        mut endpoint_receiver: UnboundedReceiver<RtmpEndpointRequest>,
1552    ) -> FutureResult {
1553        match endpoint_receiver.recv().await {
1554            None => FutureResult::NoMoreEndpointRequesters,
1555            Some(request) => FutureResult::EndpointRequestReceived {
1556                request,
1557                receiver: endpoint_receiver,
1558            },
1559        }
1560    }
1561
1562    pub(super) async fn wait_for_socket_response(
1563        mut socket_receiver: UnboundedReceiver<TcpSocketResponse>,
1564        port: u16,
1565    ) -> FutureResult {
1566        match socket_receiver.recv().await {
1567            None => FutureResult::PortGone { port },
1568            Some(response) => FutureResult::SocketResponseReceived {
1569                port,
1570                response,
1571                receiver: socket_receiver,
1572            },
1573        }
1574    }
1575
1576    pub(super) async fn wait_for_publisher_channel_closed(
1577        sender: UnboundedSender<RtmpEndpointPublisherMessage>,
1578        port: u16,
1579        app_name: String,
1580        stream_key: StreamKeyRegistration,
1581        cancellation_receiver: UnboundedSender<()>,
1582    ) -> FutureResult {
1583        tokio::select! {
1584            _ = sender.closed() => (),
1585            _ = cancellation_receiver.closed() => (),
1586        }
1587
1588        FutureResult::PublishingRegistrantGone {
1589            port,
1590            app: app_name,
1591            stream_key,
1592        }
1593    }
1594
1595    pub(super) async fn wait_for_connection_request(
1596        port: u16,
1597        connection_id: ConnectionId,
1598        mut receiver: UnboundedReceiver<ConnectionRequest>,
1599    ) -> FutureResult {
1600        match receiver.recv().await {
1601            Some(request) => FutureResult::ConnectionHandlerRequestReceived {
1602                port,
1603                receiver,
1604                connection_id,
1605                request,
1606            },
1607
1608            None => FutureResult::ConnectionHandlerGone {
1609                port,
1610                connection_id,
1611            },
1612        }
1613    }
1614
1615    pub(super) async fn wait_for_watcher_notification_channel_closed(
1616        sender: UnboundedSender<RtmpEndpointWatcherNotification>,
1617        port: u16,
1618        app_name: String,
1619        stream_key: StreamKeyRegistration,
1620        cancellation_token: UnboundedSender<()>,
1621    ) -> FutureResult {
1622        tokio::select! {
1623            _ = sender.closed() => (),
1624            _ = cancellation_token.closed() => (),
1625        }
1626
1627        FutureResult::WatcherRegistrantGone {
1628            port,
1629            app: app_name,
1630            stream_key,
1631        }
1632    }
1633
1634    pub(super) async fn wait_for_watcher_media(
1635        mut receiver: UnboundedReceiver<RtmpEndpointMediaMessage>,
1636        port: u16,
1637        app_name: String,
1638        stream_key_registration: StreamKeyRegistration,
1639    ) -> FutureResult {
1640        match receiver.recv().await {
1641            None => FutureResult::WatcherRegistrantGone {
1642                port,
1643                app: app_name,
1644                stream_key: stream_key_registration,
1645            },
1646            Some(message) => FutureResult::WatcherMediaDataReceived {
1647                port,
1648                app: app_name,
1649                stream_key: message.stream_key,
1650                stream_key_registration,
1651                data: message.data,
1652                receiver,
1653            },
1654        }
1655    }
1656
1657    pub(super) async fn wait_for_validation(
1658        port: u16,
1659        connection_id: ConnectionId,
1660        receiver: Receiver<ValidationResponse>,
1661    ) -> FutureResult {
1662        match receiver.await {
1663            Ok(response) => {
1664                FutureResult::ValidationApprovalResponseReceived(port, connection_id, response)
1665            }
1666            Err(_) => FutureResult::ValidationApprovalResponseReceived(
1667                port,
1668                connection_id,
1669                ValidationResponse::Reject,
1670            ),
1671        }
1672    }
1673
1674    pub(super) async fn notify_on_socket_manager_gone(
1675        sender: UnboundedSender<TcpSocketRequest>,
1676    ) -> FutureResult {
1677        sender.closed().await;
1678
1679        FutureResult::SocketManagerClosed
1680    }
1681}
1682
1683fn is_ip_allowed(client_socket: &SocketAddr, ip_restrictions: &IpRestriction) -> bool {
1684    match ip_restrictions {
1685        IpRestriction::None => return true,
1686        IpRestriction::Allow(allowed_ips) => {
1687            if let SocketAddr::V4(client_ip) = client_socket {
1688                return allowed_ips.into_iter().any(|ip| ip.matches(client_ip.ip()));
1689            }
1690
1691            return false; // ipv6 clients not supported atm
1692        }
1693
1694        IpRestriction::Deny(denied_ips) => {
1695            if let SocketAddr::V4(client_ip) = client_socket {
1696                return denied_ips.into_iter().all(|ip| !ip.matches(client_ip.ip()));
1697            }
1698
1699            return false; // ipv6
1700        }
1701    };
1702}