1#[cfg(test)]
16mod tests;
17
18use crate::endpoints::rtmp_server::{
19 IpRestriction, RegistrationType, RtmpEndpointMediaData, RtmpEndpointMediaMessage,
20 RtmpEndpointRequest, RtmpEndpointWatcherNotification, StreamKeyRegistration,
21 ValidationResponse,
22};
23use crate::net::{IpAddress, IpAddressParseError};
24use crate::reactors::manager::ReactorManagerRequest;
25use crate::reactors::ReactorWorkflowUpdate;
26use crate::utils::hash_map_to_stream_metadata;
27use crate::workflows::definitions::WorkflowStepDefinition;
28use crate::workflows::steps::factory::StepGenerator;
29use crate::workflows::steps::{
30 StepCreationResult, StepFutureResult, StepInputs, StepOutputs, StepStatus, WorkflowStep,
31};
32use crate::workflows::{MediaNotification, MediaNotificationContent};
33use crate::StreamId;
34use futures::FutureExt;
35use rml_rtmp::time::RtmpTimestamp;
36use std::collections::HashMap;
37use thiserror::Error as ThisError;
38use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
39use tokio::sync::oneshot::Sender;
40use tracing::{error, info, warn};
41
42pub const PORT_PROPERTY_NAME: &'static str = "port";
43pub const APP_PROPERTY_NAME: &'static str = "rtmp_app";
44pub const STREAM_KEY_PROPERTY_NAME: &'static str = "stream_key";
45pub const IP_ALLOW_PROPERTY_NAME: &'static str = "allow_ips";
46pub const IP_DENY_PROPERTY_NAME: &'static str = "deny_ips";
47pub const RTMPS_FLAG: &'static str = "rtmps";
48pub const REACTOR_NAME: &'static str = "reactor";
49
50pub struct RtmpWatchStepGenerator {
52 rtmp_endpoint_sender: UnboundedSender<RtmpEndpointRequest>,
53 reactor_manager: UnboundedSender<ReactorManagerRequest>,
54}
55
56struct StreamWatchers {
57 _reactor_cancel_channel: Option<UnboundedSender<()>>,
61}
62
63struct RtmpWatchStep {
64 definition: WorkflowStepDefinition,
65 port: u16,
66 rtmp_app: String,
67 stream_key: StreamKeyRegistration,
68 reactor_name: Option<String>,
69 status: StepStatus,
70 rtmp_endpoint_sender: UnboundedSender<RtmpEndpointRequest>,
71 reactor_manager: UnboundedSender<ReactorManagerRequest>,
72 media_channel: UnboundedSender<RtmpEndpointMediaMessage>,
73 stream_id_to_name_map: HashMap<StreamId, String>,
74 stream_watchers: HashMap<String, StreamWatchers>,
75}
76
77impl StepFutureResult for RtmpWatchStepFutureResult {}
78
79enum RtmpWatchStepFutureResult {
80 RtmpEndpointGone,
81 ReactorManagerGone,
82 ReactorGone,
83 RtmpWatchNotificationReceived(
84 RtmpEndpointWatcherNotification,
85 UnboundedReceiver<RtmpEndpointWatcherNotification>,
86 ),
87
88 ReactorWorkflowResponse {
89 is_valid: bool,
90 validation_channel: Sender<ValidationResponse>,
91 reactor_update_channel: UnboundedReceiver<ReactorWorkflowUpdate>,
92 },
93
94 ReactorUpdateReceived {
95 stream_name: String,
96 update: ReactorWorkflowUpdate,
97 reactor_update_channel: UnboundedReceiver<ReactorWorkflowUpdate>,
98 cancellation_channel: UnboundedReceiver<()>,
99 },
100
101 ReactorReceiverCanceled {
102 stream_name: String,
103 },
104}
105
106#[derive(ThisError, Debug)]
107enum StepStartupError {
108 #[error(
109 "No RTMP app specified. A non-empty parameter of '{}' is required",
110 PORT_PROPERTY_NAME
111 )]
112 NoRtmpAppSpecified,
113
114 #[error(
115 "No stream key specified. A non-empty parameter of '{}' is required",
116 APP_PROPERTY_NAME
117 )]
118 NoStreamKeySpecified,
119
120 #[error(
121 "Invalid port value of '{0}' specified. A number from 0 to 65535 should be specified"
122 )]
123 InvalidPortSpecified(String),
124
125 #[error("Failed to parse ip address")]
126 InvalidIpAddressSpecified(#[from] IpAddressParseError),
127
128 #[error(
129 "Both {} and {} were specified, but only one is allowed",
130 IP_ALLOW_PROPERTY_NAME,
131 IP_DENY_PROPERTY_NAME
132 )]
133 BothDenyAndAllowIpRestrictionsSpecified,
134}
135
136impl RtmpWatchStepGenerator {
137 pub fn new(
138 rtmp_endpoint_sender: UnboundedSender<RtmpEndpointRequest>,
139 reactor_manager: UnboundedSender<ReactorManagerRequest>,
140 ) -> Self {
141 RtmpWatchStepGenerator {
142 rtmp_endpoint_sender,
143 reactor_manager,
144 }
145 }
146}
147
148impl StepGenerator for RtmpWatchStepGenerator {
149 fn generate(&self, definition: WorkflowStepDefinition) -> StepCreationResult {
150 let use_rtmps = match definition.parameters.get(RTMPS_FLAG) {
151 Some(_) => true,
152 None => false,
153 };
154
155 let port = match definition.parameters.get(PORT_PROPERTY_NAME) {
156 Some(Some(value)) => match value.parse::<u16>() {
157 Ok(num) => num,
158 Err(_) => {
159 return Err(Box::new(StepStartupError::InvalidPortSpecified(
160 value.clone(),
161 )));
162 }
163 },
164
165 _ => {
166 if use_rtmps {
167 443
168 } else {
169 1935
170 }
171 }
172 };
173
174 let app = match definition.parameters.get(APP_PROPERTY_NAME) {
175 Some(Some(x)) => x.trim(),
176 _ => return Err(Box::new(StepStartupError::NoRtmpAppSpecified)),
177 };
178
179 let stream_key = match definition.parameters.get(STREAM_KEY_PROPERTY_NAME) {
180 Some(Some(x)) => x.trim(),
181 _ => return Err(Box::new(StepStartupError::NoStreamKeySpecified)),
182 };
183
184 let stream_key = if stream_key == "*" {
185 StreamKeyRegistration::Any
186 } else {
187 StreamKeyRegistration::Exact(stream_key.to_string())
188 };
189
190 let allowed_ips = match definition.parameters.get(IP_ALLOW_PROPERTY_NAME) {
191 Some(Some(value)) => IpAddress::parse_comma_delimited_list(Some(value))?,
192 _ => Vec::new(),
193 };
194
195 let denied_ips = match definition.parameters.get(IP_DENY_PROPERTY_NAME) {
196 Some(Some(value)) => IpAddress::parse_comma_delimited_list(Some(value))?,
197 _ => Vec::new(),
198 };
199
200 let ip_restriction = match (allowed_ips.len() > 0, denied_ips.len() > 0) {
201 (true, true) => {
202 return Err(Box::new(
203 StepStartupError::BothDenyAndAllowIpRestrictionsSpecified,
204 ));
205 }
206 (true, false) => IpRestriction::Allow(allowed_ips),
207 (false, true) => IpRestriction::Deny(denied_ips),
208 (false, false) => IpRestriction::None,
209 };
210
211 let reactor_name = match definition.parameters.get(REACTOR_NAME) {
212 Some(Some(value)) => Some(value.clone()),
213 _ => None,
214 };
215
216 let (media_sender, media_receiver) = unbounded_channel();
217
218 let step = RtmpWatchStep {
219 definition: definition.clone(),
220 status: StepStatus::Created,
221 port,
222 rtmp_app: app.to_string(),
223 rtmp_endpoint_sender: self.rtmp_endpoint_sender.clone(),
224 reactor_manager: self.reactor_manager.clone(),
225 media_channel: media_sender,
226 stream_key,
227 stream_id_to_name_map: HashMap::new(),
228 reactor_name,
229 stream_watchers: HashMap::new(),
230 };
231
232 let (notification_sender, notification_receiver) = unbounded_channel();
233 let _ = step
234 .rtmp_endpoint_sender
235 .send(RtmpEndpointRequest::ListenForWatchers {
236 port: step.port,
237 rtmp_app: step.rtmp_app.clone(),
238 rtmp_stream_key: step.stream_key.clone(),
239 media_channel: media_receiver,
240 notification_channel: notification_sender,
241 ip_restrictions: ip_restriction,
242 use_tls: use_rtmps,
243 requires_registrant_approval: step.reactor_name.is_some(),
244 });
245
246 Ok((
247 Box::new(step),
248 vec![
249 wait_for_endpoint_notification(notification_receiver).boxed(),
250 notify_on_reactor_manager_close(self.reactor_manager.clone()).boxed(),
251 ],
252 ))
253 }
254}
255
256impl RtmpWatchStep {
257 fn handle_endpoint_notification(
258 &mut self,
259 notification: RtmpEndpointWatcherNotification,
260 outputs: &mut StepOutputs,
261 ) {
262 match notification {
263 RtmpEndpointWatcherNotification::WatcherRegistrationFailed => {
264 error!("Registration for RTMP watchers was denied");
265 self.status = StepStatus::Error {
266 message: "Registration for watchers failed".to_string(),
267 };
268 }
269
270 RtmpEndpointWatcherNotification::WatcherRegistrationSuccessful => {
271 info!("Registration for RTMP watchers was accepted");
272 self.status = StepStatus::Active;
273 }
274
275 RtmpEndpointWatcherNotification::StreamKeyBecameActive {
276 stream_key,
277 reactor_update_channel,
278 } => {
279 info!(
280 stream_key = %stream_key,
281 "At least one watcher became active for stream key '{}'", stream_key
282 );
283
284 let cancellation_channel =
285 if let Some(reactor_update_channel) = reactor_update_channel {
286 let (cancellation_sender, cancellation_receiver) = unbounded_channel();
287 let future = wait_for_reactor_update(
288 stream_key.clone(),
289 reactor_update_channel,
290 cancellation_receiver,
291 )
292 .boxed();
293
294 outputs.futures.push(future);
295 Some(cancellation_sender)
296 } else {
297 None
298 };
299
300 self.stream_watchers.insert(
301 stream_key,
302 StreamWatchers {
303 _reactor_cancel_channel: cancellation_channel,
304 },
305 );
306 }
307
308 RtmpEndpointWatcherNotification::StreamKeyBecameInactive { stream_key } => {
309 info!(
310 stream_key = %stream_key,
311 "All watchers left stream key '{}'", stream_key
312 );
313
314 self.stream_watchers.remove(&stream_key);
315 }
316
317 RtmpEndpointWatcherNotification::WatcherRequiringApproval {
318 connection_id,
319 stream_key,
320 response_channel,
321 } => {
322 if let Some(reactor) = &self.reactor_name {
323 let (sender, receiver) = unbounded_channel();
324 let _ = self.reactor_manager.send(
325 ReactorManagerRequest::CreateWorkflowForStreamName {
326 reactor_name: reactor.clone(),
327 stream_name: stream_key,
328 response_channel: sender,
329 },
330 );
331
332 outputs
333 .futures
334 .push(wait_for_reactor_response(receiver, response_channel).boxed());
335 } else {
336 error!(
337 connection_id = %connection_id,
338 stream_key = %stream_key,
339 "Watcher requires approval for stream key {} but no reactor name was set",
340 stream_key
341 );
342
343 let _ = response_channel.send(ValidationResponse::Reject);
344 }
345 }
346 }
347 }
348
349 fn handle_media(&mut self, media: MediaNotification, outputs: &mut StepOutputs) {
350 outputs.media.push(media.clone());
351
352 if self.status == StepStatus::Active {
353 match &media.content {
354 MediaNotificationContent::NewIncomingStream { stream_name } => {
355 let stream_name = match &self.stream_key {
359 StreamKeyRegistration::Any => stream_name,
360 StreamKeyRegistration::Exact(configured_stream_name) => {
361 configured_stream_name
362 }
363 };
364
365 info!(
366 stream_id = ?media.stream_id,
367 stream_name = %stream_name,
368 "New incoming stream notification found for stream id {:?} and stream name '{}", media.stream_id, stream_name
369 );
370
371 match self.stream_id_to_name_map.get(&media.stream_id) {
372 None => (),
373 Some(current_stream_name) => {
374 if current_stream_name == stream_name {
375 warn!(
376 stream_id = ?media.stream_id,
377 stream_name = %stream_name,
378 "New incoming stream notification for stream id {:?} is already mapped \
379 to this same stream name.", media.stream_id
380 );
381 } else {
382 warn!(
383 stream_id = ?media.stream_id,
384 new_stream_name = %stream_name,
385 active_stream_name = %current_stream_name,
386 "New incoming stream notification for stream id {:?} is already mapped \
387 to the stream name '{}'", media.stream_id, current_stream_name
388 );
389 }
390 }
391 }
392
393 self.stream_id_to_name_map
394 .insert(media.stream_id.clone(), stream_name.clone());
395 }
396
397 MediaNotificationContent::StreamDisconnected => {
398 info!(
399 stream_id = ?media.stream_id,
400 "Stream disconnected notification received for stream id {:?}", media.stream_id
401 );
402 match self.stream_id_to_name_map.remove(&media.stream_id) {
403 Some(_) => (),
404 None => {
405 warn!(
406 stream_id = ?media.stream_id,
407 "Disconnected stream {:?} was not mapped to a stream name", media.stream_id
408 );
409 }
410 }
411 }
412
413 MediaNotificationContent::Metadata { data } => {
414 let stream_key = match self.stream_id_to_name_map.get(&media.stream_id) {
415 Some(key) => key,
416 None => return,
417 };
418
419 let metadata = hash_map_to_stream_metadata(data);
420 let rtmp_media = RtmpEndpointMediaMessage {
421 stream_key: stream_key.clone(),
422 data: RtmpEndpointMediaData::NewStreamMetaData { metadata },
423 };
424
425 let _ = self.media_channel.send(rtmp_media);
426 }
427
428 MediaNotificationContent::Video {
429 is_keyframe,
430 is_sequence_header,
431 codec,
432 timestamp,
433 data,
434 } => {
435 let stream_key = match self.stream_id_to_name_map.get(&media.stream_id) {
436 Some(key) => key,
437 None => return,
438 };
439
440 let rtmp_media = RtmpEndpointMediaMessage {
441 stream_key: stream_key.clone(),
442 data: RtmpEndpointMediaData::NewVideoData {
443 is_keyframe: *is_keyframe,
444 is_sequence_header: *is_sequence_header,
445 codec: codec.clone(),
446 data: data.clone(),
447 timestamp: RtmpTimestamp::new(timestamp.dts.as_millis() as u32),
448 composition_time_offset: timestamp.pts_offset,
449 },
450 };
451
452 let _ = self.media_channel.send(rtmp_media);
453 }
454
455 MediaNotificationContent::Audio {
456 is_sequence_header,
457 codec,
458 timestamp,
459 data,
460 } => {
461 let stream_key = match self.stream_id_to_name_map.get(&media.stream_id) {
462 Some(key) => key,
463 None => return,
464 };
465
466 let rtmp_media = RtmpEndpointMediaMessage {
467 stream_key: stream_key.clone(),
468 data: RtmpEndpointMediaData::NewAudioData {
469 is_sequence_header: *is_sequence_header,
470 codec: codec.clone(),
471 data: data.clone(),
472 timestamp: RtmpTimestamp::new(timestamp.as_millis() as u32),
473 },
474 };
475
476 let _ = self.media_channel.send(rtmp_media);
477 }
478 }
479 }
480 }
481}
482
483impl WorkflowStep for RtmpWatchStep {
484 fn get_status(&self) -> &StepStatus {
485 &self.status
486 }
487
488 fn get_definition(&self) -> &WorkflowStepDefinition {
489 &self.definition
490 }
491
492 fn execute(&mut self, inputs: &mut StepInputs, outputs: &mut StepOutputs) {
493 for notification in inputs.notifications.drain(..) {
494 let future_result = match notification.downcast::<RtmpWatchStepFutureResult>() {
495 Ok(x) => *x,
496 Err(_) => {
497 error!("Rtmp receive step received a notification that is not an 'RtmpReceiveFutureResult' type");
498 self.status = StepStatus::Error {
499 message: "Received invalid future result type".to_string(),
500 };
501
502 return;
503 }
504 };
505
506 match future_result {
507 RtmpWatchStepFutureResult::RtmpEndpointGone => {
508 error!("Rtmp endpoint gone, shutting step down");
509 self.status = StepStatus::Error {
510 message: "Rtmp endpoint gone".to_string(),
511 };
512
513 return;
514 }
515
516 RtmpWatchStepFutureResult::ReactorManagerGone => {
517 error!("Reactor manager gone");
518 self.status = StepStatus::Error {
519 message: "Reactor manager gone".to_string(),
520 };
521
522 return;
523 }
524
525 RtmpWatchStepFutureResult::ReactorGone => {
526 if let Some(reactor_name) = &self.reactor_name {
527 error!("The {} reactor is gone", reactor_name);
528 } else {
529 error!("Received notice that the reactor is gone, but this step doesn't use one");
530 }
531
532 self.status = StepStatus::Error {
533 message: "Reactor gone".to_string(),
534 };
535
536 return;
537 }
538
539 RtmpWatchStepFutureResult::RtmpWatchNotificationReceived(
540 notification,
541 receiver,
542 ) => {
543 outputs
544 .futures
545 .push(wait_for_endpoint_notification(receiver).boxed());
546
547 self.handle_endpoint_notification(notification, outputs);
548 }
549
550 RtmpWatchStepFutureResult::ReactorWorkflowResponse {
551 is_valid,
552 validation_channel,
553 reactor_update_channel,
554 } => {
555 if is_valid {
556 let _ = validation_channel.send(ValidationResponse::Approve {
557 reactor_update_channel,
558 });
559 } else {
560 let _ = validation_channel.send(ValidationResponse::Reject);
561 }
562 }
563
564 RtmpWatchStepFutureResult::ReactorUpdateReceived {
565 stream_name,
566 update,
567 reactor_update_channel,
568 cancellation_channel,
569 } => {
570 if update.is_valid {
571 let future = wait_for_reactor_update(
573 stream_name,
574 reactor_update_channel,
575 cancellation_channel,
576 );
577
578 outputs.futures.push(future.boxed());
579 } else {
580 info!(
581 stream_key = %stream_name,
582 "Received update that stream {} is no longer tied to a workflow",
583 stream_name
584 );
585
586 }
588 }
589
590 RtmpWatchStepFutureResult::ReactorReceiverCanceled { stream_name } => {
591 if let Some(_) = self.stream_watchers.remove(&stream_name) {
592 info!(
593 "Stream {}'s reactor updating has been cancelled",
594 stream_name
595 );
596 }
597 }
598 }
599 }
600
601 for media in inputs.media.drain(..) {
602 self.handle_media(media, outputs);
603 }
604 }
605
606 fn shutdown(&mut self) {
607 self.status = StepStatus::Shutdown;
608 let _ = self
609 .rtmp_endpoint_sender
610 .send(RtmpEndpointRequest::RemoveRegistration {
611 registration_type: RegistrationType::Watcher,
612 port: self.port,
613 rtmp_app: self.rtmp_app.clone(),
614 rtmp_stream_key: self.stream_key.clone(),
615 });
616 }
617}
618
619async fn wait_for_endpoint_notification(
620 mut receiver: UnboundedReceiver<RtmpEndpointWatcherNotification>,
621) -> Box<dyn StepFutureResult> {
622 let future_result = match receiver.recv().await {
623 Some(message) => {
624 RtmpWatchStepFutureResult::RtmpWatchNotificationReceived(message, receiver)
625 }
626 None => RtmpWatchStepFutureResult::RtmpEndpointGone,
627 };
628
629 Box::new(future_result)
630}
631
632async fn wait_for_reactor_response(
633 mut receiver: UnboundedReceiver<ReactorWorkflowUpdate>,
634 response_channel: Sender<ValidationResponse>,
635) -> Box<dyn StepFutureResult> {
636 let result = match receiver.recv().await {
637 Some(result) => result.is_valid,
638 None => false, };
640
641 let result = RtmpWatchStepFutureResult::ReactorWorkflowResponse {
642 is_valid: result,
643 validation_channel: response_channel,
644 reactor_update_channel: receiver,
645 };
646
647 Box::new(result)
648}
649
650async fn wait_for_reactor_update(
651 stream_name: String,
652 mut update_receiver: UnboundedReceiver<ReactorWorkflowUpdate>,
653 mut cancellation_receiver: UnboundedReceiver<()>,
654) -> Box<dyn StepFutureResult> {
655 let result = tokio::select! {
656 update = update_receiver.recv() => {
657 match update {
658 Some(update) => RtmpWatchStepFutureResult::ReactorUpdateReceived{
659 stream_name,
660 update,
661 reactor_update_channel: update_receiver,
662 cancellation_channel: cancellation_receiver,
663 },
664
665 None => RtmpWatchStepFutureResult::ReactorGone,
666 }
667 }
668
669 _ = cancellation_receiver.recv() => RtmpWatchStepFutureResult::ReactorReceiverCanceled {
670 stream_name,
671 }
672 };
673
674 Box::new(result)
675}
676
677async fn notify_on_reactor_manager_close(
678 sender: UnboundedSender<ReactorManagerRequest>,
679) -> Box<dyn StepFutureResult> {
680 sender.closed().await;
681 Box::new(RtmpWatchStepFutureResult::ReactorManagerGone)
682}