mmids_core/workflows/steps/ffmpeg_hls/
mod.rs1use crate::endpoints::ffmpeg::{
7 AudioTranscodeParams, FfmpegEndpointRequest, FfmpegParams, TargetParams, VideoTranscodeParams,
8};
9use crate::endpoints::rtmp_server::RtmpEndpointRequest;
10use crate::workflows::definitions::WorkflowStepDefinition;
11use crate::workflows::steps::factory::StepGenerator;
12use crate::workflows::steps::ffmpeg_handler::{FfmpegHandlerGenerator, FfmpegParameterGenerator};
13use crate::workflows::steps::{
14 ExternalStreamReader, StepCreationResult, StepFutureResult, StepInputs, StepOutputs,
15 StepStatus, WorkflowStep,
16};
17use crate::StreamId;
18use futures::FutureExt;
19use thiserror::Error;
20use tokio::sync::mpsc::UnboundedSender;
21use tracing::error;
22
// Keys looked up in the workflow step definition's parameter map.

/// Required parameter: directory the HLS output is written to.
const PATH: &str = "path";

/// Optional parameter: name used for the playlist file instead of the incoming stream's name.
const STREAM_NAME: &str = "stream_name";

/// Optional parameter: length of each HLS segment (parsed as a number).
const SEGMENT_DURATION: &str = "duration";

/// Optional parameter: number of segments kept in the playlist (parsed as a number).
const SEGMENT_COUNT: &str = "count";
27
28pub struct FfmpegHlsStepGenerator {
30 rtmp_endpoint: UnboundedSender<RtmpEndpointRequest>,
31 ffmpeg_endpoint: UnboundedSender<FfmpegEndpointRequest>,
32}
33
34struct FfmpegHlsStep {
35 definition: WorkflowStepDefinition,
36 status: StepStatus,
37 stream_reader: ExternalStreamReader,
38 path: String,
39}
40
41enum FutureResult {
42 FfmpegEndpointGone,
43 HlsPathCreated(tokio::io::Result<()>),
44}
45
46impl StepFutureResult for FutureResult {}
47
48#[derive(Error, Debug)]
49enum StepStartupError {
50 #[error("No path specified. A 'path' is required")]
51 NoPathProvided,
52
53 #[error("Invalid duration of '{0}'. {} should be a number.", SEGMENT_DURATION)]
54 InvalidSegmentLength(String),
55
56 #[error(
57 "Invalid segment count of '{0}'. {} should be a positive number",
58 SEGMENT_COUNT
59 )]
60 InvalidSegmentCount(String),
61}
62
/// Holds the step's configuration and turns it into ffmpeg parameters for each stream.
struct ParamGenerator {
    /// RTMP application name that ffmpeg reads the stream from.
    rtmp_app: String,

    /// Directory that the HLS playlist is written into.
    path: String,

    /// Value passed through as the HLS target's segment length.
    segment_duration: u16,

    /// Value passed through as the HLS target's maximum number of playlist entries.
    segment_count: u16,

    /// When set, used as the playlist file name instead of the incoming stream's name.
    stream_name: Option<String>,
}
70
71impl FfmpegHlsStepGenerator {
72 pub fn new(
73 rtmp_endpoint: UnboundedSender<RtmpEndpointRequest>,
74 ffmpeg_endpoint: UnboundedSender<FfmpegEndpointRequest>,
75 ) -> Self {
76 FfmpegHlsStepGenerator {
77 rtmp_endpoint,
78 ffmpeg_endpoint,
79 }
80 }
81}
82
83impl StepGenerator for FfmpegHlsStepGenerator {
84 fn generate(&self, definition: WorkflowStepDefinition) -> StepCreationResult {
85 let path = match definition.parameters.get(PATH) {
86 Some(Some(value)) => value,
87 _ => return Err(Box::new(StepStartupError::NoPathProvided)),
88 };
89
90 let duration = match definition.parameters.get(SEGMENT_DURATION) {
91 Some(Some(value)) => match value.parse() {
92 Ok(num) => num,
93 Err(_) => {
94 return Err(Box::new(StepStartupError::InvalidSegmentLength(
95 value.clone(),
96 )));
97 }
98 },
99
100 _ => 2,
101 };
102
103 let count = match definition.parameters.get(SEGMENT_COUNT) {
104 Some(Some(value)) => match value.parse::<u16>() {
105 Ok(num) => num,
106 Err(_) => {
107 return Err(Box::new(StepStartupError::InvalidSegmentCount(
108 value.clone(),
109 )));
110 }
111 },
112
113 _ => 0,
114 };
115
116 let stream_name = definition.parameters.get(STREAM_NAME).cloned().flatten();
117
118 let param_generator = ParamGenerator {
119 rtmp_app: get_rtmp_app(definition.get_id().to_string()),
120 path: path.clone(),
121 segment_duration: duration,
122 segment_count: count,
123 stream_name,
124 };
125
126 let handler_generator =
127 FfmpegHandlerGenerator::new(self.ffmpeg_endpoint.clone(), Box::new(param_generator));
128
129 let (reader, mut futures) = ExternalStreamReader::new(
130 get_rtmp_app(definition.get_id().to_string()),
131 self.rtmp_endpoint.clone(),
132 Box::new(handler_generator),
133 );
134
135 let step = FfmpegHlsStep {
136 definition: definition.clone(),
137 status: StepStatus::Created,
138 stream_reader: reader,
139 path: path.clone(),
140 };
141
142 futures.push(notify_when_ffmpeg_endpoint_is_gone(self.ffmpeg_endpoint.clone()).boxed());
143 futures.push(notify_when_path_created(path.clone()).boxed());
144
145 Ok((Box::new(step), futures))
146 }
147}
148
149impl WorkflowStep for FfmpegHlsStep {
150 fn get_status(&self) -> &StepStatus {
151 &self.status
152 }
153
154 fn get_definition(&self) -> &WorkflowStepDefinition {
155 &self.definition
156 }
157
158 fn execute(&mut self, inputs: &mut StepInputs, outputs: &mut StepOutputs) {
159 if let StepStatus::Error { message } = &self.stream_reader.status {
160 error!("external stream reader is in error status, so putting the step in in error status as well.");
161 self.status = StepStatus::Error {
162 message: message.to_string(),
163 };
164 return;
165 }
166
167 for future_result in inputs.notifications.drain(..) {
168 match future_result.downcast::<FutureResult>() {
169 Err(future_result) => {
170 self.stream_reader
172 .handle_resolved_future(future_result, outputs)
173 }
174
175 Ok(future_result) => match *future_result {
176 FutureResult::FfmpegEndpointGone => {
177 error!("Ffmpeg endpoint has disappeared. Closing all streams");
178 self.stream_reader.stop_all_streams();
179 }
180
181 FutureResult::HlsPathCreated(result) => match result {
182 Ok(()) => {
183 self.status = StepStatus::Active;
184 }
185
186 Err(error) => {
187 error!("Could not create HLS path: '{}': {:?}", self.path, error);
188 self.status = StepStatus::Error {
189 message: format!(
190 "Could not create HLS path: '{}': {:?}",
191 self.path, error
192 ),
193 };
194
195 return;
196 }
197 },
198 },
199 };
200 }
201
202 for media in inputs.media.drain(..) {
203 self.stream_reader.handle_media(media, outputs);
204 }
205 }
206
207 fn shutdown(&mut self) {
208 self.stream_reader.stop_all_streams();
209 self.status = StepStatus::Shutdown;
210 }
211}
212
213impl FfmpegParameterGenerator for ParamGenerator {
214 fn form_parameters(&self, stream_id: &StreamId, stream_name: &str) -> FfmpegParams {
215 FfmpegParams {
216 read_in_real_time: true,
217 input: format!("rtmp://localhost/{}/{}", self.rtmp_app, stream_id.0),
218 video_transcode: VideoTranscodeParams::Copy,
219 audio_transcode: AudioTranscodeParams::Copy,
220 scale: None,
221 bitrate_in_kbps: None,
222 target: TargetParams::Hls {
223 path: format!(
224 "{}/{}.m3u8",
225 self.path,
226 self.stream_name.as_deref().unwrap_or(stream_name)
227 ),
228 max_entries: Some(self.segment_count),
229 segment_length: self.segment_duration,
230 },
231 }
232 }
233}
234
/// Returns the RTMP application name for the step with the given id: the id prefixed with
/// `ffmpeg-hls-`.
fn get_rtmp_app(id: String) -> String {
    let step_id = id.as_str();

    format!("ffmpeg-hls-{}", step_id)
}
238
239async fn notify_when_ffmpeg_endpoint_is_gone(
240 endpoint: UnboundedSender<FfmpegEndpointRequest>,
241) -> Box<dyn StepFutureResult> {
242 endpoint.closed().await;
243
244 Box::new(FutureResult::FfmpegEndpointGone)
245}
246
247async fn notify_when_path_created(path: String) -> Box<dyn StepFutureResult> {
248 let result = tokio::fs::create_dir_all(&path).await;
249 Box::new(FutureResult::HlsPathCreated(result))
250}