mmids_core/workflows/steps/ffmpeg_pull/
mod.rs

//! This workflow step utilizes ffmpeg to read video from an external source.  The external source
//! can be a remote RTMP server or a file on the file system.  If ffmpeg exits (such as when the
//! video file has been fully streamed) then ffmpeg will be restarted until the workflow is
//! removed.
//!
//! Media packets that come in from previous steps are ignored.
7
8use crate::endpoints::ffmpeg::{
9    AudioTranscodeParams, FfmpegEndpointNotification, FfmpegEndpointRequest, FfmpegParams,
10    TargetParams, VideoTranscodeParams,
11};
12use crate::endpoints::rtmp_server::{
13    IpRestriction, RegistrationType, RtmpEndpointPublisherMessage, RtmpEndpointRequest,
14    StreamKeyRegistration,
15};
16use crate::workflows::definitions::WorkflowStepDefinition;
17use crate::workflows::steps::factory::StepGenerator;
18use crate::workflows::steps::{
19    StepCreationResult, StepFutureResult, StepInputs, StepOutputs, StepStatus, WorkflowStep,
20};
21use crate::workflows::{MediaNotification, MediaNotificationContent};
22use crate::{StreamId, VideoTimestamp};
23use futures::FutureExt;
24use std::time::Duration;
25use thiserror::Error;
26use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
27use tracing::{error, info};
28use uuid::Uuid;
29
/// Name of the step definition parameter holding the location ffmpeg should pull media from.
pub const LOCATION: &str = "location";

/// Name of the step definition parameter holding the stream name for the pulled media.
pub const STREAM_NAME: &str = "stream_name";
32
33/// Generates new instances of the ffmpeg pull workflow step based on specified step definitions.
34pub struct FfmpegPullStepGenerator {
35    rtmp_endpoint: UnboundedSender<RtmpEndpointRequest>,
36    ffmpeg_endpoint: UnboundedSender<FfmpegEndpointRequest>,
37}
38
39struct FfmpegPullStep {
40    definition: WorkflowStepDefinition,
41    ffmpeg_endpoint: UnboundedSender<FfmpegEndpointRequest>,
42    rtmp_endpoint: UnboundedSender<RtmpEndpointRequest>,
43    status: StepStatus,
44    rtmp_app: String,
45    pull_location: String,
46    stream_name: String,
47    ffmpeg_id: Option<Uuid>,
48    active_stream_id: Option<StreamId>,
49}
50
51enum FutureResult {
52    RtmpEndpointGone,
53    FfmpegEndpointGone,
54    RtmpEndpointResponseReceived(
55        RtmpEndpointPublisherMessage,
56        UnboundedReceiver<RtmpEndpointPublisherMessage>,
57    ),
58    FfmpegNotificationReceived(
59        FfmpegEndpointNotification,
60        UnboundedReceiver<FfmpegEndpointNotification>,
61    ),
62}
63
64impl StepFutureResult for FutureResult {}
65
66#[derive(Error, Debug)]
67enum StepStartupError {
68    #[error("No {} parameter specified", LOCATION)]
69    NoLocationSpecified,
70
71    #[error("No {} parameter specified", STREAM_NAME)]
72    NoStreamNameSpecified,
73}
74
75impl FfmpegPullStepGenerator {
76    pub fn new(
77        rtmp_endpoint: UnboundedSender<RtmpEndpointRequest>,
78        ffmpeg_endpoint: UnboundedSender<FfmpegEndpointRequest>,
79    ) -> Self {
80        FfmpegPullStepGenerator {
81            rtmp_endpoint,
82            ffmpeg_endpoint,
83        }
84    }
85}
86
87impl StepGenerator for FfmpegPullStepGenerator {
88    fn generate(&self, definition: WorkflowStepDefinition) -> StepCreationResult {
89        let location = match definition.parameters.get(LOCATION) {
90            Some(Some(value)) => value.clone(),
91            _ => return Err(Box::new(StepStartupError::NoLocationSpecified)),
92        };
93
94        let stream_name = match definition.parameters.get(STREAM_NAME) {
95            Some(Some(value)) => value.clone(),
96            _ => return Err(Box::new(StepStartupError::NoStreamNameSpecified)),
97        };
98
99        let step = FfmpegPullStep {
100            definition: definition.clone(),
101            status: StepStatus::Created,
102            rtmp_app: format!("ffmpeg-pull-{}", definition.get_id()),
103            ffmpeg_endpoint: self.ffmpeg_endpoint.clone(),
104            rtmp_endpoint: self.rtmp_endpoint.clone(),
105            pull_location: location,
106            stream_name: stream_name.clone(),
107            ffmpeg_id: None,
108            active_stream_id: None,
109        };
110
111        let (sender, receiver) = unbounded_channel();
112        let _ = self
113            .rtmp_endpoint
114            .send(RtmpEndpointRequest::ListenForPublishers {
115                port: 1935,
116                rtmp_app: step.rtmp_app.clone(),
117                rtmp_stream_key: StreamKeyRegistration::Exact(stream_name),
118                stream_id: None,
119                message_channel: sender,
120                ip_restrictions: IpRestriction::None,
121                use_tls: false,
122                requires_registrant_approval: false,
123            });
124
125        let futures = vec![
126            notify_rtmp_endpoint_gone(self.rtmp_endpoint.clone()).boxed(),
127            notify_ffmpeg_endpoint_gone(self.ffmpeg_endpoint.clone()).boxed(),
128            wait_for_rtmp_notification(receiver).boxed(),
129        ];
130
131        Ok((Box::new(step), futures))
132    }
133}
134
135impl FfmpegPullStep {
136    fn handle_resolved_future(&mut self, result: FutureResult, outputs: &mut StepOutputs) {
137        match result {
138            FutureResult::FfmpegEndpointGone => {
139                error!("Ffmpeg endpoint is gone");
140                self.status = StepStatus::Error {
141                    message: "Ffmpeg endpoint is gone".to_string(),
142                };
143                self.stop_ffmpeg();
144            }
145
146            FutureResult::RtmpEndpointGone => {
147                error!("Rtmp endpoint gone");
148                self.status = StepStatus::Error {
149                    message: "Rtmp endpoint gone".to_string(),
150                };
151                self.stop_ffmpeg();
152            }
153
154            FutureResult::RtmpEndpointResponseReceived(response, receiver) => {
155                outputs
156                    .futures
157                    .push(wait_for_rtmp_notification(receiver).boxed());
158
159                self.handle_rtmp_notification(outputs, response);
160            }
161
162            FutureResult::FfmpegNotificationReceived(notification, receiver) => {
163                self.handle_ffmpeg_notification(outputs, notification, receiver);
164            }
165        }
166    }
167
168    fn handle_ffmpeg_notification(
169        &mut self,
170        outputs: &mut StepOutputs,
171        message: FfmpegEndpointNotification,
172        receiver: UnboundedReceiver<FfmpegEndpointNotification>,
173    ) {
174        match message {
175            FfmpegEndpointNotification::FfmpegFailedToStart { cause } => {
176                error!("Ffmpeg failed to start: {:?}", cause);
177                self.status = StepStatus::Error {
178                    message: format!("Ffmpeg failed to start: {:?}", cause),
179                };
180            }
181
182            FfmpegEndpointNotification::FfmpegStarted => {
183                info!("Ffmpeg started");
184                outputs
185                    .futures
186                    .push(wait_for_ffmpeg_notification(receiver).boxed());
187            }
188
189            FfmpegEndpointNotification::FfmpegStopped => {
190                info!("Ffmpeg stopped");
191            }
192        }
193    }
194
195    fn handle_rtmp_notification(
196        &mut self,
197        outputs: &mut StepOutputs,
198        message: RtmpEndpointPublisherMessage,
199    ) {
200        match message {
201            RtmpEndpointPublisherMessage::PublisherRegistrationFailed => {
202                error!("Publisher registration failed");
203                self.status = StepStatus::Error {
204                    message: "Publisher registration failed".to_string(),
205                };
206            }
207
208            RtmpEndpointPublisherMessage::PublisherRegistrationSuccessful => {
209                info!("Publisher registration successful");
210                self.status = StepStatus::Active;
211                self.start_ffmpeg(outputs);
212            }
213
214            RtmpEndpointPublisherMessage::NewPublisherConnected {
215                stream_id,
216                stream_key,
217                connection_id,
218                reactor_update_channel: _,
219            } => {
220                info!(
221                    stream_id = ?stream_id,
222                    connection_id = ?connection_id,
223                    stream_key = %stream_key,
224                    "New RTMP publisher seen: {:?}, {:?}, {:?}", stream_id, connection_id, stream_key
225                );
226
227                if stream_key != self.stream_name {
228                    error!(
229                        stream_name = %self.stream_name,
230                        stream_key = %stream_key,
231                        "Expected publisher to have a stream name of {} but instead it was {}", self.stream_name, stream_key
232                    );
233
234                    self.status = StepStatus::Error {
235                        message: format!(
236                            "Expected publisher to have a stream name of {} but instead it was {}",
237                            self.stream_name, stream_key
238                        ),
239                    };
240
241                    self.stop_ffmpeg();
242                }
243
244                self.active_stream_id = Some(stream_id.clone());
245                outputs.media.push(MediaNotification {
246                    stream_id,
247                    content: MediaNotificationContent::NewIncomingStream {
248                        stream_name: self.stream_name.clone(),
249                    },
250                });
251            }
252
253            RtmpEndpointPublisherMessage::PublishingStopped { connection_id: _ } => {
254                info!("RTMP publisher has stopped");
255                if let Some(stream_id) = &self.active_stream_id {
256                    outputs.media.push(MediaNotification {
257                        stream_id: stream_id.clone(),
258                        content: MediaNotificationContent::StreamDisconnected,
259                    });
260                }
261            }
262
263            RtmpEndpointPublisherMessage::StreamMetadataChanged {
264                publisher: _,
265                metadata,
266            } => {
267                if let Some(stream_id) = &self.active_stream_id {
268                    outputs.media.push(MediaNotification {
269                        stream_id: stream_id.clone(),
270                        content: MediaNotificationContent::Metadata {
271                            data: crate::utils::stream_metadata_to_hash_map(metadata),
272                        },
273                    });
274                } else {
275                    error!("Received stream metadata without an active stream id");
276                    self.stop_ffmpeg();
277                    self.status = StepStatus::Error {
278                        message: "Received stream metadata without an active stream id".to_string(),
279                    };
280                }
281            }
282
283            RtmpEndpointPublisherMessage::NewVideoData {
284                publisher: _,
285                data,
286                is_keyframe,
287                is_sequence_header,
288                timestamp,
289                codec,
290                composition_time_offset,
291            } => {
292                if let Some(stream_id) = &self.active_stream_id {
293                    outputs.media.push(MediaNotification {
294                        stream_id: stream_id.clone(),
295                        content: MediaNotificationContent::Video {
296                            codec,
297                            timestamp: VideoTimestamp::from_rtmp_data(
298                                timestamp,
299                                composition_time_offset,
300                            ),
301                            is_keyframe,
302                            is_sequence_header,
303                            data,
304                        },
305                    });
306                } else {
307                    error!("Received video data without an active stream id");
308                    self.stop_ffmpeg();
309                    self.status = StepStatus::Error {
310                        message: "Received video data without an active stream id".to_string(),
311                    };
312                }
313            }
314
315            RtmpEndpointPublisherMessage::NewAudioData {
316                publisher: _,
317                data,
318                is_sequence_header,
319                timestamp,
320                codec,
321            } => {
322                if let Some(stream_id) = &self.active_stream_id {
323                    outputs.media.push(MediaNotification {
324                        stream_id: stream_id.clone(),
325                        content: MediaNotificationContent::Audio {
326                            codec,
327                            timestamp: Duration::from_millis(timestamp.value as u64),
328                            is_sequence_header,
329                            data,
330                        },
331                    });
332                } else {
333                    error!("Received audio data without an active stream id");
334                    self.stop_ffmpeg();
335                    self.status = StepStatus::Error {
336                        message: "Received audio data without an active stream id".to_string(),
337                    };
338                }
339            }
340
341            RtmpEndpointPublisherMessage::PublisherRequiringApproval { .. } => {
342                error!("Publisher approval requested but publishers should be auto-approved");
343                self.status = StepStatus::Error {
344                    message: "Publisher approval requested but publishers should be auto-approved"
345                        .to_string(),
346                };
347            }
348        }
349    }
350
351    fn start_ffmpeg(&mut self, outputs: &mut StepOutputs) {
352        if self.ffmpeg_id.is_none() {
353            info!("Starting ffmpeg");
354            let id = Uuid::new_v4();
355            let (sender, receiver) = unbounded_channel();
356            let _ = self
357                .ffmpeg_endpoint
358                .send(FfmpegEndpointRequest::StartFfmpeg {
359                    id: id.clone(),
360                    notification_channel: sender,
361                    params: FfmpegParams {
362                        read_in_real_time: true,
363                        input: self.pull_location.clone(),
364                        video_transcode: VideoTranscodeParams::Copy,
365                        audio_transcode: AudioTranscodeParams::Copy,
366                        scale: None,
367                        bitrate_in_kbps: None,
368                        target: TargetParams::Rtmp {
369                            url: format!("rtmp://localhost/{}/{}", self.rtmp_app, self.stream_name),
370                        },
371                    },
372                });
373
374            outputs
375                .futures
376                .push(wait_for_ffmpeg_notification(receiver).boxed());
377        }
378    }
379
380    fn stop_ffmpeg(&mut self) {
381        if let Some(id) = &self.ffmpeg_id {
382            let _ = self
383                .ffmpeg_endpoint
384                .send(FfmpegEndpointRequest::StopFfmpeg { id: id.clone() });
385        }
386
387        self.ffmpeg_id = None;
388    }
389}
390
391impl WorkflowStep for FfmpegPullStep {
392    fn get_status(&self) -> &StepStatus {
393        &self.status
394    }
395
396    fn get_definition(&self) -> &WorkflowStepDefinition {
397        &self.definition
398    }
399
400    fn execute(&mut self, inputs: &mut StepInputs, outputs: &mut StepOutputs) {
401        for result in inputs.notifications.drain(..) {
402            if let Ok(result) = result.downcast::<FutureResult>() {
403                self.handle_resolved_future(*result, outputs);
404            }
405        }
406    }
407
408    fn shutdown(&mut self) {
409        self.status = StepStatus::Shutdown;
410        self.stop_ffmpeg();
411
412        let _ = self
413            .rtmp_endpoint
414            .send(RtmpEndpointRequest::RemoveRegistration {
415                registration_type: RegistrationType::Publisher,
416                port: 1935,
417                rtmp_app: self.rtmp_app.clone(),
418                rtmp_stream_key: StreamKeyRegistration::Exact(self.stream_name.clone()),
419            });
420    }
421}
422
423async fn notify_rtmp_endpoint_gone(
424    endpoint: UnboundedSender<RtmpEndpointRequest>,
425) -> Box<dyn StepFutureResult> {
426    endpoint.closed().await;
427
428    Box::new(FutureResult::RtmpEndpointGone)
429}
430
431async fn notify_ffmpeg_endpoint_gone(
432    endpoint: UnboundedSender<FfmpegEndpointRequest>,
433) -> Box<dyn StepFutureResult> {
434    endpoint.closed().await;
435
436    Box::new(FutureResult::FfmpegEndpointGone)
437}
438
439async fn wait_for_rtmp_notification(
440    mut receiver: UnboundedReceiver<RtmpEndpointPublisherMessage>,
441) -> Box<dyn StepFutureResult> {
442    let result = match receiver.recv().await {
443        Some(msg) => FutureResult::RtmpEndpointResponseReceived(msg, receiver),
444        None => FutureResult::RtmpEndpointGone,
445    };
446
447    Box::new(result)
448}
449
450async fn wait_for_ffmpeg_notification(
451    mut receiver: UnboundedReceiver<FfmpegEndpointNotification>,
452) -> Box<dyn StepFutureResult> {
453    let result = match receiver.recv().await {
454        Some(msg) => FutureResult::FfmpegNotificationReceived(msg, receiver),
455        None => FutureResult::FfmpegEndpointGone,
456    };
457
458    Box::new(result)
459}