mmids_core/workflows/steps/rtmp_watch/
mod.rs

1//! The RTMP watch step registers with the RTMP server endpoint to allow for RTMP clients to connect
2//! and watch media streams based on the specified port, application name, and stream key
//! combinations.  When media notifications are passed into the workflow step, it forwards them to
//! the RTMP endpoint for distribution to waiting clients.
5//!
6//! When a stream key of `*` is specified, this allows for RTMP clients to connect on any stream key
//! for the rtmp application to watch video.  Media packets will be routed to clients that connected
//! on the stream key that matches the name of the stream in the pipeline.
9//!
10//! If an exact stream key is configured, then the first media stream that comes into the step will
11//! be surfaced on that stream key.
12//!
13//! All media notifications that are passed into this step are passed onto the next step.
14
15#[cfg(test)]
16mod tests;
17
18use crate::endpoints::rtmp_server::{
19    IpRestriction, RegistrationType, RtmpEndpointMediaData, RtmpEndpointMediaMessage,
20    RtmpEndpointRequest, RtmpEndpointWatcherNotification, StreamKeyRegistration,
21    ValidationResponse,
22};
23use crate::net::{IpAddress, IpAddressParseError};
24use crate::reactors::manager::ReactorManagerRequest;
25use crate::reactors::ReactorWorkflowUpdate;
26use crate::utils::hash_map_to_stream_metadata;
27use crate::workflows::definitions::WorkflowStepDefinition;
28use crate::workflows::steps::factory::StepGenerator;
29use crate::workflows::steps::{
30    StepCreationResult, StepFutureResult, StepInputs, StepOutputs, StepStatus, WorkflowStep,
31};
32use crate::workflows::{MediaNotification, MediaNotificationContent};
33use crate::StreamId;
34use futures::FutureExt;
35use rml_rtmp::time::RtmpTimestamp;
36use std::collections::HashMap;
37use thiserror::Error as ThisError;
38use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
39use tokio::sync::oneshot::Sender;
40use tracing::{error, info, warn};
41
/// Step parameter holding the port to listen on.  When absent the step falls back to 1935, or
/// 443 if the rtmps flag is present.
pub const PORT_PROPERTY_NAME: &str = "port";

/// Step parameter holding the RTMP application name watchers connect to.
pub const APP_PROPERTY_NAME: &str = "rtmp_app";

/// Step parameter holding the stream key watchers connect on (`*` means any stream key).
pub const STREAM_KEY_PROPERTY_NAME: &str = "stream_key";

/// Step parameter holding a comma delimited list of ip addresses that are allowed to watch.
pub const IP_ALLOW_PROPERTY_NAME: &str = "allow_ips";

/// Step parameter holding a comma delimited list of ip addresses that may not watch.
pub const IP_DENY_PROPERTY_NAME: &str = "deny_ips";

/// Step parameter whose mere presence requests TLS (rtmps) for the registration.
pub const RTMPS_FLAG: &str = "rtmps";

/// Step parameter holding the name of the reactor used to approve watchers.
pub const REACTOR_NAME: &str = "reactor";
49
50/// Generates new rtmp watch workflow step instances based on a given step definition.
51pub struct RtmpWatchStepGenerator {
52    rtmp_endpoint_sender: UnboundedSender<RtmpEndpointRequest>,
53    reactor_manager: UnboundedSender<ReactorManagerRequest>,
54}
55
56struct StreamWatchers {
57    // Use an unbounded channel for this instead of a one shot, as we risk losing the cancellation
58    // channel when a reactor update comes through. We can work around this by recreating the
59    // cancellation token each time, but it's easier to just use an `UnboundedSender` instead.
60    _reactor_cancel_channel: Option<UnboundedSender<()>>,
61}
62
63struct RtmpWatchStep {
64    definition: WorkflowStepDefinition,
65    port: u16,
66    rtmp_app: String,
67    stream_key: StreamKeyRegistration,
68    reactor_name: Option<String>,
69    status: StepStatus,
70    rtmp_endpoint_sender: UnboundedSender<RtmpEndpointRequest>,
71    reactor_manager: UnboundedSender<ReactorManagerRequest>,
72    media_channel: UnboundedSender<RtmpEndpointMediaMessage>,
73    stream_id_to_name_map: HashMap<StreamId, String>,
74    stream_watchers: HashMap<String, StreamWatchers>,
75}
76
77impl StepFutureResult for RtmpWatchStepFutureResult {}
78
79enum RtmpWatchStepFutureResult {
80    RtmpEndpointGone,
81    ReactorManagerGone,
82    ReactorGone,
83    RtmpWatchNotificationReceived(
84        RtmpEndpointWatcherNotification,
85        UnboundedReceiver<RtmpEndpointWatcherNotification>,
86    ),
87
88    ReactorWorkflowResponse {
89        is_valid: bool,
90        validation_channel: Sender<ValidationResponse>,
91        reactor_update_channel: UnboundedReceiver<ReactorWorkflowUpdate>,
92    },
93
94    ReactorUpdateReceived {
95        stream_name: String,
96        update: ReactorWorkflowUpdate,
97        reactor_update_channel: UnboundedReceiver<ReactorWorkflowUpdate>,
98        cancellation_channel: UnboundedReceiver<()>,
99    },
100
101    ReactorReceiverCanceled {
102        stream_name: String,
103    },
104}
105
106#[derive(ThisError, Debug)]
107enum StepStartupError {
108    #[error(
109        "No RTMP app specified.  A non-empty parameter of '{}' is required",
110        PORT_PROPERTY_NAME
111    )]
112    NoRtmpAppSpecified,
113
114    #[error(
115        "No stream key specified.  A non-empty parameter of '{}' is required",
116        APP_PROPERTY_NAME
117    )]
118    NoStreamKeySpecified,
119
120    #[error(
121        "Invalid port value of '{0}' specified.  A number from 0 to 65535 should be specified"
122    )]
123    InvalidPortSpecified(String),
124
125    #[error("Failed to parse ip address")]
126    InvalidIpAddressSpecified(#[from] IpAddressParseError),
127
128    #[error(
129        "Both {} and {} were specified, but only one is allowed",
130        IP_ALLOW_PROPERTY_NAME,
131        IP_DENY_PROPERTY_NAME
132    )]
133    BothDenyAndAllowIpRestrictionsSpecified,
134}
135
136impl RtmpWatchStepGenerator {
137    pub fn new(
138        rtmp_endpoint_sender: UnboundedSender<RtmpEndpointRequest>,
139        reactor_manager: UnboundedSender<ReactorManagerRequest>,
140    ) -> Self {
141        RtmpWatchStepGenerator {
142            rtmp_endpoint_sender,
143            reactor_manager,
144        }
145    }
146}
147
148impl StepGenerator for RtmpWatchStepGenerator {
149    fn generate(&self, definition: WorkflowStepDefinition) -> StepCreationResult {
150        let use_rtmps = match definition.parameters.get(RTMPS_FLAG) {
151            Some(_) => true,
152            None => false,
153        };
154
155        let port = match definition.parameters.get(PORT_PROPERTY_NAME) {
156            Some(Some(value)) => match value.parse::<u16>() {
157                Ok(num) => num,
158                Err(_) => {
159                    return Err(Box::new(StepStartupError::InvalidPortSpecified(
160                        value.clone(),
161                    )));
162                }
163            },
164
165            _ => {
166                if use_rtmps {
167                    443
168                } else {
169                    1935
170                }
171            }
172        };
173
174        let app = match definition.parameters.get(APP_PROPERTY_NAME) {
175            Some(Some(x)) => x.trim(),
176            _ => return Err(Box::new(StepStartupError::NoRtmpAppSpecified)),
177        };
178
179        let stream_key = match definition.parameters.get(STREAM_KEY_PROPERTY_NAME) {
180            Some(Some(x)) => x.trim(),
181            _ => return Err(Box::new(StepStartupError::NoStreamKeySpecified)),
182        };
183
184        let stream_key = if stream_key == "*" {
185            StreamKeyRegistration::Any
186        } else {
187            StreamKeyRegistration::Exact(stream_key.to_string())
188        };
189
190        let allowed_ips = match definition.parameters.get(IP_ALLOW_PROPERTY_NAME) {
191            Some(Some(value)) => IpAddress::parse_comma_delimited_list(Some(value))?,
192            _ => Vec::new(),
193        };
194
195        let denied_ips = match definition.parameters.get(IP_DENY_PROPERTY_NAME) {
196            Some(Some(value)) => IpAddress::parse_comma_delimited_list(Some(value))?,
197            _ => Vec::new(),
198        };
199
200        let ip_restriction = match (allowed_ips.len() > 0, denied_ips.len() > 0) {
201            (true, true) => {
202                return Err(Box::new(
203                    StepStartupError::BothDenyAndAllowIpRestrictionsSpecified,
204                ));
205            }
206            (true, false) => IpRestriction::Allow(allowed_ips),
207            (false, true) => IpRestriction::Deny(denied_ips),
208            (false, false) => IpRestriction::None,
209        };
210
211        let reactor_name = match definition.parameters.get(REACTOR_NAME) {
212            Some(Some(value)) => Some(value.clone()),
213            _ => None,
214        };
215
216        let (media_sender, media_receiver) = unbounded_channel();
217
218        let step = RtmpWatchStep {
219            definition: definition.clone(),
220            status: StepStatus::Created,
221            port,
222            rtmp_app: app.to_string(),
223            rtmp_endpoint_sender: self.rtmp_endpoint_sender.clone(),
224            reactor_manager: self.reactor_manager.clone(),
225            media_channel: media_sender,
226            stream_key,
227            stream_id_to_name_map: HashMap::new(),
228            reactor_name,
229            stream_watchers: HashMap::new(),
230        };
231
232        let (notification_sender, notification_receiver) = unbounded_channel();
233        let _ = step
234            .rtmp_endpoint_sender
235            .send(RtmpEndpointRequest::ListenForWatchers {
236                port: step.port,
237                rtmp_app: step.rtmp_app.clone(),
238                rtmp_stream_key: step.stream_key.clone(),
239                media_channel: media_receiver,
240                notification_channel: notification_sender,
241                ip_restrictions: ip_restriction,
242                use_tls: use_rtmps,
243                requires_registrant_approval: step.reactor_name.is_some(),
244            });
245
246        Ok((
247            Box::new(step),
248            vec![
249                wait_for_endpoint_notification(notification_receiver).boxed(),
250                notify_on_reactor_manager_close(self.reactor_manager.clone()).boxed(),
251            ],
252        ))
253    }
254}
255
256impl RtmpWatchStep {
257    fn handle_endpoint_notification(
258        &mut self,
259        notification: RtmpEndpointWatcherNotification,
260        outputs: &mut StepOutputs,
261    ) {
262        match notification {
263            RtmpEndpointWatcherNotification::WatcherRegistrationFailed => {
264                error!("Registration for RTMP watchers was denied");
265                self.status = StepStatus::Error {
266                    message: "Registration for watchers failed".to_string(),
267                };
268            }
269
270            RtmpEndpointWatcherNotification::WatcherRegistrationSuccessful => {
271                info!("Registration for RTMP watchers was accepted");
272                self.status = StepStatus::Active;
273            }
274
275            RtmpEndpointWatcherNotification::StreamKeyBecameActive {
276                stream_key,
277                reactor_update_channel,
278            } => {
279                info!(
280                    stream_key = %stream_key,
281                    "At least one watcher became active for stream key '{}'", stream_key
282                );
283
284                let cancellation_channel =
285                    if let Some(reactor_update_channel) = reactor_update_channel {
286                        let (cancellation_sender, cancellation_receiver) = unbounded_channel();
287                        let future = wait_for_reactor_update(
288                            stream_key.clone(),
289                            reactor_update_channel,
290                            cancellation_receiver,
291                        )
292                        .boxed();
293
294                        outputs.futures.push(future);
295                        Some(cancellation_sender)
296                    } else {
297                        None
298                    };
299
300                self.stream_watchers.insert(
301                    stream_key,
302                    StreamWatchers {
303                        _reactor_cancel_channel: cancellation_channel,
304                    },
305                );
306            }
307
308            RtmpEndpointWatcherNotification::StreamKeyBecameInactive { stream_key } => {
309                info!(
310                    stream_key = %stream_key,
311                    "All watchers left stream key '{}'", stream_key
312                );
313
314                self.stream_watchers.remove(&stream_key);
315            }
316
317            RtmpEndpointWatcherNotification::WatcherRequiringApproval {
318                connection_id,
319                stream_key,
320                response_channel,
321            } => {
322                if let Some(reactor) = &self.reactor_name {
323                    let (sender, receiver) = unbounded_channel();
324                    let _ = self.reactor_manager.send(
325                        ReactorManagerRequest::CreateWorkflowForStreamName {
326                            reactor_name: reactor.clone(),
327                            stream_name: stream_key,
328                            response_channel: sender,
329                        },
330                    );
331
332                    outputs
333                        .futures
334                        .push(wait_for_reactor_response(receiver, response_channel).boxed());
335                } else {
336                    error!(
337                        connection_id = %connection_id,
338                        stream_key = %stream_key,
339                        "Watcher requires approval for stream key {} but no reactor name was set",
340                        stream_key
341                    );
342
343                    let _ = response_channel.send(ValidationResponse::Reject);
344                }
345            }
346        }
347    }
348
349    fn handle_media(&mut self, media: MediaNotification, outputs: &mut StepOutputs) {
350        outputs.media.push(media.clone());
351
352        if self.status == StepStatus::Active {
353            match &media.content {
354                MediaNotificationContent::NewIncomingStream { stream_name } => {
355                    // If this step was registered with an exact stream name, then we don't care
356                    // what stream name this was originally published as.  For watch purposes treat
357                    // it as the configured stream key
358                    let stream_name = match &self.stream_key {
359                        StreamKeyRegistration::Any => stream_name,
360                        StreamKeyRegistration::Exact(configured_stream_name) => {
361                            configured_stream_name
362                        }
363                    };
364
365                    info!(
366                        stream_id = ?media.stream_id,
367                        stream_name = %stream_name,
368                        "New incoming stream notification found for stream id {:?} and stream name '{}", media.stream_id, stream_name
369                    );
370
371                    match self.stream_id_to_name_map.get(&media.stream_id) {
372                        None => (),
373                        Some(current_stream_name) => {
374                            if current_stream_name == stream_name {
375                                warn!(
376                                    stream_id = ?media.stream_id,
377                                    stream_name = %stream_name,
378                                    "New incoming stream notification for stream id {:?} is already mapped \
379                                        to this same stream name.", media.stream_id
380                                );
381                            } else {
382                                warn!(
383                                    stream_id = ?media.stream_id,
384                                    new_stream_name = %stream_name,
385                                    active_stream_name = %current_stream_name,
386                                    "New incoming stream notification for stream id {:?} is already mapped \
387                                        to the stream name '{}'", media.stream_id, current_stream_name
388                                );
389                            }
390                        }
391                    }
392
393                    self.stream_id_to_name_map
394                        .insert(media.stream_id.clone(), stream_name.clone());
395                }
396
397                MediaNotificationContent::StreamDisconnected => {
398                    info!(
399                        stream_id = ?media.stream_id,
400                        "Stream disconnected notification received for stream id {:?}", media.stream_id
401                    );
402                    match self.stream_id_to_name_map.remove(&media.stream_id) {
403                        Some(_) => (),
404                        None => {
405                            warn!(
406                                stream_id = ?media.stream_id,
407                                "Disconnected stream {:?} was not mapped to a stream name", media.stream_id
408                            );
409                        }
410                    }
411                }
412
413                MediaNotificationContent::Metadata { data } => {
414                    let stream_key = match self.stream_id_to_name_map.get(&media.stream_id) {
415                        Some(key) => key,
416                        None => return,
417                    };
418
419                    let metadata = hash_map_to_stream_metadata(data);
420                    let rtmp_media = RtmpEndpointMediaMessage {
421                        stream_key: stream_key.clone(),
422                        data: RtmpEndpointMediaData::NewStreamMetaData { metadata },
423                    };
424
425                    let _ = self.media_channel.send(rtmp_media);
426                }
427
428                MediaNotificationContent::Video {
429                    is_keyframe,
430                    is_sequence_header,
431                    codec,
432                    timestamp,
433                    data,
434                } => {
435                    let stream_key = match self.stream_id_to_name_map.get(&media.stream_id) {
436                        Some(key) => key,
437                        None => return,
438                    };
439
440                    let rtmp_media = RtmpEndpointMediaMessage {
441                        stream_key: stream_key.clone(),
442                        data: RtmpEndpointMediaData::NewVideoData {
443                            is_keyframe: *is_keyframe,
444                            is_sequence_header: *is_sequence_header,
445                            codec: codec.clone(),
446                            data: data.clone(),
447                            timestamp: RtmpTimestamp::new(timestamp.dts.as_millis() as u32),
448                            composition_time_offset: timestamp.pts_offset,
449                        },
450                    };
451
452                    let _ = self.media_channel.send(rtmp_media);
453                }
454
455                MediaNotificationContent::Audio {
456                    is_sequence_header,
457                    codec,
458                    timestamp,
459                    data,
460                } => {
461                    let stream_key = match self.stream_id_to_name_map.get(&media.stream_id) {
462                        Some(key) => key,
463                        None => return,
464                    };
465
466                    let rtmp_media = RtmpEndpointMediaMessage {
467                        stream_key: stream_key.clone(),
468                        data: RtmpEndpointMediaData::NewAudioData {
469                            is_sequence_header: *is_sequence_header,
470                            codec: codec.clone(),
471                            data: data.clone(),
472                            timestamp: RtmpTimestamp::new(timestamp.as_millis() as u32),
473                        },
474                    };
475
476                    let _ = self.media_channel.send(rtmp_media);
477                }
478            }
479        }
480    }
481}
482
483impl WorkflowStep for RtmpWatchStep {
484    fn get_status(&self) -> &StepStatus {
485        &self.status
486    }
487
488    fn get_definition(&self) -> &WorkflowStepDefinition {
489        &self.definition
490    }
491
492    fn execute(&mut self, inputs: &mut StepInputs, outputs: &mut StepOutputs) {
493        for notification in inputs.notifications.drain(..) {
494            let future_result = match notification.downcast::<RtmpWatchStepFutureResult>() {
495                Ok(x) => *x,
496                Err(_) => {
497                    error!("Rtmp receive step received a notification that is not an 'RtmpReceiveFutureResult' type");
498                    self.status = StepStatus::Error {
499                        message: "Received invalid future result type".to_string(),
500                    };
501
502                    return;
503                }
504            };
505
506            match future_result {
507                RtmpWatchStepFutureResult::RtmpEndpointGone => {
508                    error!("Rtmp endpoint gone, shutting step down");
509                    self.status = StepStatus::Error {
510                        message: "Rtmp endpoint gone".to_string(),
511                    };
512
513                    return;
514                }
515
516                RtmpWatchStepFutureResult::ReactorManagerGone => {
517                    error!("Reactor manager gone");
518                    self.status = StepStatus::Error {
519                        message: "Reactor manager gone".to_string(),
520                    };
521
522                    return;
523                }
524
525                RtmpWatchStepFutureResult::ReactorGone => {
526                    if let Some(reactor_name) = &self.reactor_name {
527                        error!("The {} reactor is gone", reactor_name);
528                    } else {
529                        error!("Received notice that the reactor is gone, but this step doesn't use one");
530                    }
531
532                    self.status = StepStatus::Error {
533                        message: "Reactor gone".to_string(),
534                    };
535
536                    return;
537                }
538
539                RtmpWatchStepFutureResult::RtmpWatchNotificationReceived(
540                    notification,
541                    receiver,
542                ) => {
543                    outputs
544                        .futures
545                        .push(wait_for_endpoint_notification(receiver).boxed());
546
547                    self.handle_endpoint_notification(notification, outputs);
548                }
549
550                RtmpWatchStepFutureResult::ReactorWorkflowResponse {
551                    is_valid,
552                    validation_channel,
553                    reactor_update_channel,
554                } => {
555                    if is_valid {
556                        let _ = validation_channel.send(ValidationResponse::Approve {
557                            reactor_update_channel,
558                        });
559                    } else {
560                        let _ = validation_channel.send(ValidationResponse::Reject);
561                    }
562                }
563
564                RtmpWatchStepFutureResult::ReactorUpdateReceived {
565                    stream_name,
566                    update,
567                    reactor_update_channel,
568                    cancellation_channel,
569                } => {
570                    if update.is_valid {
571                        // No action needed as this is still a valid stream name
572                        let future = wait_for_reactor_update(
573                            stream_name,
574                            reactor_update_channel,
575                            cancellation_channel,
576                        );
577
578                        outputs.futures.push(future.boxed());
579                    } else {
580                        info!(
581                            stream_key = %stream_name,
582                            "Received update that stream {} is no longer tied to a workflow",
583                            stream_name
584                        );
585
586                        // TODO: Need some way to disconnect watchers
587                    }
588                }
589
590                RtmpWatchStepFutureResult::ReactorReceiverCanceled { stream_name } => {
591                    if let Some(_) = self.stream_watchers.remove(&stream_name) {
592                        info!(
593                            "Stream {}'s reactor updating has been cancelled",
594                            stream_name
595                        );
596                    }
597                }
598            }
599        }
600
601        for media in inputs.media.drain(..) {
602            self.handle_media(media, outputs);
603        }
604    }
605
606    fn shutdown(&mut self) {
607        self.status = StepStatus::Shutdown;
608        let _ = self
609            .rtmp_endpoint_sender
610            .send(RtmpEndpointRequest::RemoveRegistration {
611                registration_type: RegistrationType::Watcher,
612                port: self.port,
613                rtmp_app: self.rtmp_app.clone(),
614                rtmp_stream_key: self.stream_key.clone(),
615            });
616    }
617}
618
619async fn wait_for_endpoint_notification(
620    mut receiver: UnboundedReceiver<RtmpEndpointWatcherNotification>,
621) -> Box<dyn StepFutureResult> {
622    let future_result = match receiver.recv().await {
623        Some(message) => {
624            RtmpWatchStepFutureResult::RtmpWatchNotificationReceived(message, receiver)
625        }
626        None => RtmpWatchStepFutureResult::RtmpEndpointGone,
627    };
628
629    Box::new(future_result)
630}
631
632async fn wait_for_reactor_response(
633    mut receiver: UnboundedReceiver<ReactorWorkflowUpdate>,
634    response_channel: Sender<ValidationResponse>,
635) -> Box<dyn StepFutureResult> {
636    let result = match receiver.recv().await {
637        Some(result) => result.is_valid,
638        None => false, // Treat the channel being closed as no workflow
639    };
640
641    let result = RtmpWatchStepFutureResult::ReactorWorkflowResponse {
642        is_valid: result,
643        validation_channel: response_channel,
644        reactor_update_channel: receiver,
645    };
646
647    Box::new(result)
648}
649
650async fn wait_for_reactor_update(
651    stream_name: String,
652    mut update_receiver: UnboundedReceiver<ReactorWorkflowUpdate>,
653    mut cancellation_receiver: UnboundedReceiver<()>,
654) -> Box<dyn StepFutureResult> {
655    let result = tokio::select! {
656        update = update_receiver.recv() => {
657            match update {
658                Some(update) => RtmpWatchStepFutureResult::ReactorUpdateReceived{
659                    stream_name,
660                    update,
661                    reactor_update_channel: update_receiver,
662                    cancellation_channel: cancellation_receiver,
663                },
664
665                None => RtmpWatchStepFutureResult::ReactorGone,
666            }
667        }
668
669        _ = cancellation_receiver.recv() => RtmpWatchStepFutureResult::ReactorReceiverCanceled {
670            stream_name,
671        }
672    };
673
674    Box::new(result)
675}
676
677async fn notify_on_reactor_manager_close(
678    sender: UnboundedSender<ReactorManagerRequest>,
679) -> Box<dyn StepFutureResult> {
680    sender.closed().await;
681    Box::new(RtmpWatchStepFutureResult::ReactorManagerGone)
682}