mmids_core/workflows/steps/
mod.rs1mod external_stream_handler;
4mod external_stream_reader;
5pub mod factory;
6mod ffmpeg_handler;
7pub mod ffmpeg_hls;
8pub mod ffmpeg_pull;
9pub mod ffmpeg_rtmp_push;
10pub mod ffmpeg_transcode;
11pub mod rtmp_receive;
12pub mod rtmp_watch;
13pub mod workflow_forwarder;
14
15use super::MediaNotification;
16use crate::workflows::definitions::WorkflowStepDefinition;
17use downcast_rs::{impl_downcast, Downcast};
18use futures::future::BoxFuture;
19
20pub use external_stream_handler::*;
21pub use external_stream_reader::*;
22
23pub trait StepFutureResult: Downcast {}
26impl_downcast!(StepFutureResult);
27
28pub type FutureList = Vec<BoxFuture<'static, Box<dyn StepFutureResult>>>;
29pub type StepCreationResult = Result<
30 (Box<dyn WorkflowStep + Sync + Send>, FutureList),
31 Box<dyn std::error::Error + Sync + Send>,
32>;
33pub type CreateFactoryFnResult =
34 Box<dyn Fn(&WorkflowStepDefinition) -> StepCreationResult + Send + Sync>;
35
36#[derive(Clone, Debug, PartialEq)]
38pub enum StepStatus {
39 Created,
41
42 Active,
44
45 Error { message: String },
48
49 Shutdown,
52}
53
54pub struct StepInputs {
56 pub media: Vec<MediaNotification>,
58
59 pub notifications: Vec<Box<dyn StepFutureResult>>,
61}
62
63impl StepInputs {
64 pub fn new() -> Self {
65 StepInputs {
66 media: Vec::new(),
67 notifications: Vec::new(),
68 }
69 }
70
71 pub fn clear(&mut self) {
72 self.media.clear();
73 self.notifications.clear();
74 }
75}
76
77pub struct StepOutputs {
79 pub media: Vec<MediaNotification>,
81
82 pub futures: Vec<BoxFuture<'static, Box<dyn StepFutureResult>>>,
84}
85
86impl StepOutputs {
87 pub fn new() -> Self {
88 StepOutputs {
89 media: Vec::new(),
90 futures: Vec::new(),
91 }
92 }
93
94 pub fn clear(&mut self) {
95 self.futures.clear();
96 self.media.clear();
97 }
98}
99
100pub trait WorkflowStep {
102 fn get_status(&self) -> &StepStatus;
104
105 fn get_definition(&self) -> &WorkflowStepDefinition;
107
108 fn execute(&mut self, inputs: &mut StepInputs, outputs: &mut StepOutputs);
115
116 fn shutdown(&mut self);
123}
124
125#[cfg(test)]
126use crate::workflows::steps::factory::StepGenerator;
127#[cfg(test)]
128use anyhow::{anyhow, Result};
129#[cfg(test)]
130use futures::stream::FuturesUnordered;
131#[cfg(test)]
132use futures::StreamExt;
133#[cfg(test)]
134use std::iter::FromIterator;
135#[cfg(test)]
136use std::time::Duration;
137
138#[cfg(test)]
139struct StepTestContext {
140 step: Box<dyn WorkflowStep>,
141 futures: FuturesUnordered<BoxFuture<'static, Box<dyn StepFutureResult>>>,
142 media_outputs: Vec<MediaNotification>,
143}
144
145#[cfg(test)]
146impl StepTestContext {
147 fn new(generator: Box<dyn StepGenerator>, definition: WorkflowStepDefinition) -> Result<Self> {
148 let (step, futures) = generator
149 .generate(definition)
150 .or_else(|error| Err(anyhow!("Failed to generate workflow step: {:?}", error)))?;
151
152 Ok(StepTestContext {
153 step,
154 futures: FuturesUnordered::from_iter(futures),
155 media_outputs: Vec::new(),
156 })
157 }
158
159 fn execute_with_media(&mut self, media: MediaNotification) {
160 let mut outputs = StepOutputs::new();
161 let mut inputs = StepInputs::new();
162 inputs.media.push(media);
163
164 self.step.execute(&mut inputs, &mut outputs);
165
166 self.futures.extend(outputs.futures.drain(..));
167 self.media_outputs = outputs.media;
168 }
169
170 async fn execute_notification(&mut self, notification: Box<dyn StepFutureResult>) {
171 let mut outputs = StepOutputs::new();
172 let mut inputs = StepInputs::new();
173 inputs.notifications.push(notification);
174
175 self.step.execute(&mut inputs, &mut outputs);
176
177 self.futures.extend(outputs.futures.drain(..));
178 self.media_outputs = outputs.media;
179
180 self.execute_pending_notifications().await;
181 }
182
183 async fn execute_pending_notifications(&mut self) {
184 loop {
185 let notification =
186 match tokio::time::timeout(Duration::from_millis(10), self.futures.next()).await {
187 Ok(Some(notification)) => notification,
188 _ => break,
189 };
190
191 let mut outputs = StepOutputs::new();
192 let mut inputs = StepInputs::new();
193 inputs.notifications.push(notification);
194
195 self.step.execute(&mut inputs, &mut outputs);
196
197 self.futures.extend(outputs.futures.drain(..));
198 self.media_outputs = outputs.media;
199 }
200 }
201
202 fn assert_media_passed_through(&mut self, media: MediaNotification) {
203 self.execute_with_media(media.clone());
204
205 assert_eq!(
206 self.media_outputs.len(),
207 1,
208 "Unexpected number of media outputs"
209 );
210 assert_eq!(self.media_outputs[0], media, "Unexpected media message");
211 }
212
213 fn assert_media_not_passed_through(&mut self, media: MediaNotification) {
214 self.execute_with_media(media.clone());
215
216 assert!(self.media_outputs.is_empty(), "Expected no media outputs");
217 }
218}