mmids_core/workflows/steps/
mod.rs

1//! Workflow steps are individual actions that can be taken on media as part of a media pipeline.
2
3mod external_stream_handler;
4mod external_stream_reader;
5pub mod factory;
6mod ffmpeg_handler;
7pub mod ffmpeg_hls;
8pub mod ffmpeg_pull;
9pub mod ffmpeg_rtmp_push;
10pub mod ffmpeg_transcode;
11pub mod rtmp_receive;
12pub mod rtmp_watch;
13pub mod workflow_forwarder;
14
15use super::MediaNotification;
16use crate::workflows::definitions::WorkflowStepDefinition;
17use downcast_rs::{impl_downcast, Downcast};
18use futures::future::BoxFuture;
19
20pub use external_stream_handler::*;
21pub use external_stream_reader::*;
22
23/// Represents the result of a future for a workflow step.  It is expected that the workflow step
24/// will downcast this result into a struct that it owns.
25pub trait StepFutureResult: Downcast {}
26impl_downcast!(StepFutureResult);
27
28pub type FutureList = Vec<BoxFuture<'static, Box<dyn StepFutureResult>>>;
29pub type StepCreationResult = Result<
30    (Box<dyn WorkflowStep + Sync + Send>, FutureList),
31    Box<dyn std::error::Error + Sync + Send>,
32>;
33pub type CreateFactoryFnResult =
34    Box<dyn Fn(&WorkflowStepDefinition) -> StepCreationResult + Send + Sync>;
35
36/// Various statuses of an individual step
37#[derive(Clone, Debug, PartialEq)]
38pub enum StepStatus {
39    /// The step has been created but it is not yet ready to handle media
40    Created,
41
42    /// The step is fully active and ready for handling media
43    Active,
44
45    /// The step has encountered an unrecoverable error and can no longer handle media or
46    /// notifications.  It will likely have to be recreated.
47    Error { message: String },
48
49    /// The step has been shut down and is not expected to be invoked anymore. If it's wanted to be
50    /// used it will have to be recreated
51    Shutdown,
52}
53
54/// Inputs to be passed in for execution of a workflow step.
55pub struct StepInputs {
56    /// Media notifications that the step may be interested in
57    pub media: Vec<MediaNotification>,
58
59    /// Any resolved futures that are specific to this step
60    pub notifications: Vec<Box<dyn StepFutureResult>>,
61}
62
63impl StepInputs {
64    pub fn new() -> Self {
65        StepInputs {
66            media: Vec::new(),
67            notifications: Vec::new(),
68        }
69    }
70
71    pub fn clear(&mut self) {
72        self.media.clear();
73        self.notifications.clear();
74    }
75}
76
77/// Resulting outputs that come from executing a workflow step.
78pub struct StepOutputs {
79    /// Media notifications that the workflow step intends to pass to the next workflow step
80    pub media: Vec<MediaNotification>,
81
82    /// Any futures the workflow should track for this step
83    pub futures: Vec<BoxFuture<'static, Box<dyn StepFutureResult>>>,
84}
85
86impl StepOutputs {
87    pub fn new() -> Self {
88        StepOutputs {
89            media: Vec::new(),
90            futures: Vec::new(),
91        }
92    }
93
94    pub fn clear(&mut self) {
95        self.futures.clear();
96        self.media.clear();
97    }
98}
99
100/// Represents a workflow step that can be executed
101pub trait WorkflowStep {
102    /// Returns a reference to the status of the current workflow step
103    fn get_status(&self) -> &StepStatus;
104
105    /// Returns a reference to the definition this workflow step was created with
106    fn get_definition(&self) -> &WorkflowStepDefinition;
107
108    /// Executes the workflow step with the specified media and future resolution inputs.  Any outputs
109    /// that are generated as a result of this execution will be placed in the `outputs` parameter,
110    /// to allow vectors to be re-used.
111    ///
112    /// It is expected that `execute()` will not be called if the step is in an Error or Torn Down
113    /// state.
114    fn execute(&mut self, inputs: &mut StepInputs, outputs: &mut StepOutputs);
115
116    /// Notifies the step that it is no longer needed and that all streams its managing should be
117    /// closed.  All endpoints the step has interacted with should be proactively notified that it
118    /// is being removed, as it can not be guaranteed that all channels will be automatically
119    /// closed.
120    ///
121    /// After this is called it is expected that the workflow step is in a `TornDown` state.
122    fn shutdown(&mut self);
123}
124
125#[cfg(test)]
126use crate::workflows::steps::factory::StepGenerator;
127#[cfg(test)]
128use anyhow::{anyhow, Result};
129#[cfg(test)]
130use futures::stream::FuturesUnordered;
131#[cfg(test)]
132use futures::StreamExt;
133#[cfg(test)]
134use std::iter::FromIterator;
135#[cfg(test)]
136use std::time::Duration;
137
138#[cfg(test)]
139struct StepTestContext {
140    step: Box<dyn WorkflowStep>,
141    futures: FuturesUnordered<BoxFuture<'static, Box<dyn StepFutureResult>>>,
142    media_outputs: Vec<MediaNotification>,
143}
144
145#[cfg(test)]
146impl StepTestContext {
147    fn new(generator: Box<dyn StepGenerator>, definition: WorkflowStepDefinition) -> Result<Self> {
148        let (step, futures) = generator
149            .generate(definition)
150            .or_else(|error| Err(anyhow!("Failed to generate workflow step: {:?}", error)))?;
151
152        Ok(StepTestContext {
153            step,
154            futures: FuturesUnordered::from_iter(futures),
155            media_outputs: Vec::new(),
156        })
157    }
158
159    fn execute_with_media(&mut self, media: MediaNotification) {
160        let mut outputs = StepOutputs::new();
161        let mut inputs = StepInputs::new();
162        inputs.media.push(media);
163
164        self.step.execute(&mut inputs, &mut outputs);
165
166        self.futures.extend(outputs.futures.drain(..));
167        self.media_outputs = outputs.media;
168    }
169
170    async fn execute_notification(&mut self, notification: Box<dyn StepFutureResult>) {
171        let mut outputs = StepOutputs::new();
172        let mut inputs = StepInputs::new();
173        inputs.notifications.push(notification);
174
175        self.step.execute(&mut inputs, &mut outputs);
176
177        self.futures.extend(outputs.futures.drain(..));
178        self.media_outputs = outputs.media;
179
180        self.execute_pending_notifications().await;
181    }
182
183    async fn execute_pending_notifications(&mut self) {
184        loop {
185            let notification =
186                match tokio::time::timeout(Duration::from_millis(10), self.futures.next()).await {
187                    Ok(Some(notification)) => notification,
188                    _ => break,
189                };
190
191            let mut outputs = StepOutputs::new();
192            let mut inputs = StepInputs::new();
193            inputs.notifications.push(notification);
194
195            self.step.execute(&mut inputs, &mut outputs);
196
197            self.futures.extend(outputs.futures.drain(..));
198            self.media_outputs = outputs.media;
199        }
200    }
201
202    fn assert_media_passed_through(&mut self, media: MediaNotification) {
203        self.execute_with_media(media.clone());
204
205        assert_eq!(
206            self.media_outputs.len(),
207            1,
208            "Unexpected number of media outputs"
209        );
210        assert_eq!(self.media_outputs[0], media, "Unexpected media message");
211    }
212
213    fn assert_media_not_passed_through(&mut self, media: MediaNotification) {
214        self.execute_with_media(media.clone());
215
216        assert!(self.media_outputs.is_empty(), "Expected no media outputs");
217    }
218}