mmids_core/workflows/steps/ffmpeg_hls/
mod.rs1use crate::endpoints::ffmpeg::{
7    AudioTranscodeParams, FfmpegEndpointRequest, FfmpegParams, TargetParams, VideoTranscodeParams,
8};
9use crate::endpoints::rtmp_server::RtmpEndpointRequest;
10use crate::workflows::definitions::WorkflowStepDefinition;
11use crate::workflows::steps::factory::StepGenerator;
12use crate::workflows::steps::ffmpeg_handler::{FfmpegHandlerGenerator, FfmpegParameterGenerator};
13use crate::workflows::steps::{
14    ExternalStreamReader, StepCreationResult, StepFutureResult, StepInputs, StepOutputs,
15    StepStatus, WorkflowStep,
16};
17use crate::StreamId;
18use futures::FutureExt;
19use thiserror::Error;
20use tokio::sync::mpsc::UnboundedSender;
21use tracing::error;
22
23const PATH: &str = "path";
24const SEGMENT_DURATION: &str = "duration";
25const SEGMENT_COUNT: &str = "count";
26const STREAM_NAME: &str = "stream_name";
27
28pub struct FfmpegHlsStepGenerator {
30    rtmp_endpoint: UnboundedSender<RtmpEndpointRequest>,
31    ffmpeg_endpoint: UnboundedSender<FfmpegEndpointRequest>,
32}
33
34struct FfmpegHlsStep {
35    definition: WorkflowStepDefinition,
36    status: StepStatus,
37    stream_reader: ExternalStreamReader,
38    path: String,
39}
40
41enum FutureResult {
42    FfmpegEndpointGone,
43    HlsPathCreated(tokio::io::Result<()>),
44}
45
46impl StepFutureResult for FutureResult {}
47
48#[derive(Error, Debug)]
49enum StepStartupError {
50    #[error("No path specified.  A 'path' is required")]
51    NoPathProvided,
52
53    #[error("Invalid duration of '{0}'.  {} should be a number.", SEGMENT_DURATION)]
54    InvalidSegmentLength(String),
55
56    #[error(
57        "Invalid segment count of '{0}'.  {} should be a positive number",
58        SEGMENT_COUNT
59    )]
60    InvalidSegmentCount(String),
61}
62
63struct ParamGenerator {
64    rtmp_app: String,
65    path: String,
66    segment_duration: u16,
67    segment_count: u16,
68    stream_name: Option<String>,
69}
70
71impl FfmpegHlsStepGenerator {
72    pub fn new(
73        rtmp_endpoint: UnboundedSender<RtmpEndpointRequest>,
74        ffmpeg_endpoint: UnboundedSender<FfmpegEndpointRequest>,
75    ) -> Self {
76        FfmpegHlsStepGenerator {
77            rtmp_endpoint,
78            ffmpeg_endpoint,
79        }
80    }
81}
82
83impl StepGenerator for FfmpegHlsStepGenerator {
84    fn generate(&self, definition: WorkflowStepDefinition) -> StepCreationResult {
85        let path = match definition.parameters.get(PATH) {
86            Some(Some(value)) => value,
87            _ => return Err(Box::new(StepStartupError::NoPathProvided)),
88        };
89
90        let duration = match definition.parameters.get(SEGMENT_DURATION) {
91            Some(Some(value)) => match value.parse() {
92                Ok(num) => num,
93                Err(_) => {
94                    return Err(Box::new(StepStartupError::InvalidSegmentLength(
95                        value.clone(),
96                    )));
97                }
98            },
99
100            _ => 2,
101        };
102
103        let count = match definition.parameters.get(SEGMENT_COUNT) {
104            Some(Some(value)) => match value.parse::<u16>() {
105                Ok(num) => num,
106                Err(_) => {
107                    return Err(Box::new(StepStartupError::InvalidSegmentCount(
108                        value.clone(),
109                    )));
110                }
111            },
112
113            _ => 0,
114        };
115
116        let stream_name = definition.parameters.get(STREAM_NAME).cloned().flatten();
117
118        let param_generator = ParamGenerator {
119            rtmp_app: get_rtmp_app(definition.get_id().to_string()),
120            path: path.clone(),
121            segment_duration: duration,
122            segment_count: count,
123            stream_name,
124        };
125
126        let handler_generator =
127            FfmpegHandlerGenerator::new(self.ffmpeg_endpoint.clone(), Box::new(param_generator));
128
129        let (reader, mut futures) = ExternalStreamReader::new(
130            get_rtmp_app(definition.get_id().to_string()),
131            self.rtmp_endpoint.clone(),
132            Box::new(handler_generator),
133        );
134
135        let step = FfmpegHlsStep {
136            definition: definition.clone(),
137            status: StepStatus::Created,
138            stream_reader: reader,
139            path: path.clone(),
140        };
141
142        futures.push(notify_when_ffmpeg_endpoint_is_gone(self.ffmpeg_endpoint.clone()).boxed());
143        futures.push(notify_when_path_created(path.clone()).boxed());
144
145        Ok((Box::new(step), futures))
146    }
147}
148
149impl WorkflowStep for FfmpegHlsStep {
150    fn get_status(&self) -> &StepStatus {
151        &self.status
152    }
153
154    fn get_definition(&self) -> &WorkflowStepDefinition {
155        &self.definition
156    }
157
158    fn execute(&mut self, inputs: &mut StepInputs, outputs: &mut StepOutputs) {
159        if let StepStatus::Error { message } = &self.stream_reader.status {
160            error!("external stream reader is in error status, so putting the step in in error status as well.");
161            self.status = StepStatus::Error {
162                message: message.to_string(),
163            };
164            return;
165        }
166
167        for future_result in inputs.notifications.drain(..) {
168            match future_result.downcast::<FutureResult>() {
169                Err(future_result) => {
170                    self.stream_reader
172                        .handle_resolved_future(future_result, outputs)
173                }
174
175                Ok(future_result) => match *future_result {
176                    FutureResult::FfmpegEndpointGone => {
177                        error!("Ffmpeg endpoint has disappeared.  Closing all streams");
178                        self.stream_reader.stop_all_streams();
179                    }
180
181                    FutureResult::HlsPathCreated(result) => match result {
182                        Ok(()) => {
183                            self.status = StepStatus::Active;
184                        }
185
186                        Err(error) => {
187                            error!("Could not create HLS path: '{}': {:?}", self.path, error);
188                            self.status = StepStatus::Error {
189                                message: format!(
190                                    "Could not create HLS path: '{}': {:?}",
191                                    self.path, error
192                                ),
193                            };
194
195                            return;
196                        }
197                    },
198                },
199            };
200        }
201
202        for media in inputs.media.drain(..) {
203            self.stream_reader.handle_media(media, outputs);
204        }
205    }
206
207    fn shutdown(&mut self) {
208        self.stream_reader.stop_all_streams();
209        self.status = StepStatus::Shutdown;
210    }
211}
212
213impl FfmpegParameterGenerator for ParamGenerator {
214    fn form_parameters(&self, stream_id: &StreamId, stream_name: &str) -> FfmpegParams {
215        FfmpegParams {
216            read_in_real_time: true,
217            input: format!("rtmp://localhost/{}/{}", self.rtmp_app, stream_id.0),
218            video_transcode: VideoTranscodeParams::Copy,
219            audio_transcode: AudioTranscodeParams::Copy,
220            scale: None,
221            bitrate_in_kbps: None,
222            target: TargetParams::Hls {
223                path: format!(
224                    "{}/{}.m3u8",
225                    self.path,
226                    self.stream_name.as_deref().unwrap_or(stream_name)
227                ),
228                max_entries: Some(self.segment_count),
229                segment_length: self.segment_duration,
230            },
231        }
232    }
233}
234
235fn get_rtmp_app(id: String) -> String {
236    format!("ffmpeg-hls-{}", id)
237}
238
239async fn notify_when_ffmpeg_endpoint_is_gone(
240    endpoint: UnboundedSender<FfmpegEndpointRequest>,
241) -> Box<dyn StepFutureResult> {
242    endpoint.closed().await;
243
244    Box::new(FutureResult::FfmpegEndpointGone)
245}
246
247async fn notify_when_path_created(path: String) -> Box<dyn StepFutureResult> {
248    let result = tokio::fs::create_dir_all(&path).await;
249    Box::new(FutureResult::HlsPathCreated(result))
250}