mmids_core/workflows/runner/
mod.rs

1#[cfg(test)]
2mod test_context;
3#[cfg(test)]
4mod test_steps;
5#[cfg(test)]
6mod tests;
7
8use crate::workflows::definitions::{WorkflowDefinition, WorkflowStepDefinition};
9use crate::workflows::steps::factory::WorkflowStepFactory;
10use crate::workflows::steps::{
11    StepFutureResult, StepInputs, StepOutputs, StepStatus, WorkflowStep,
12};
13use crate::workflows::{MediaNotification, MediaNotificationContent};
14use crate::StreamId;
15use futures::future::BoxFuture;
16use futures::stream::FuturesUnordered;
17use futures::{FutureExt, StreamExt};
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
21use tokio::sync::oneshot::Sender;
22use tracing::{error, info, instrument, span, warn, Level};
23
/// A request to the workflow to perform an action
#[derive(Debug)]
pub struct WorkflowRequest {
    /// An identifier for this request.  The workflow records it as the `request_id` field of the
    /// tracing span it opens while handling the request, so it can be used to correlate the
    /// request with the log output it produced.
    pub request_id: String,

    /// The action the workflow is being asked to perform
    pub operation: WorkflowRequestOperation,
}
31
/// Operations that can be made to an actively running workflow
#[derive(Debug)]
pub enum WorkflowRequestOperation {
    /// Requests the workflow update with a new definition. The workflow will take shape to look
    /// exactly as the specified definition has.  Any existing steps that aren't specified will
    /// be removed, any new steps will be created, and any steps that stay will reflect the order
    /// specified.
    UpdateDefinition { new_definition: WorkflowDefinition },

    /// Requests the workflow to return a snapshot of its current state
    GetState {
        /// Channel the snapshot is sent back on.  The runner in this file always answers with
        /// `Some(state)`.
        response_channel: Sender<Option<WorkflowState>>,
    },

    /// Requests the workflow stop operating
    StopWorkflow,

    /// Sends a media notification into this workflow.  It is handed to the first active step,
    /// and is dropped if the workflow currently has no active steps.
    MediaNotification { media: MediaNotification },
}
52
/// A snapshot of a workflow, as returned in response to
/// `WorkflowRequestOperation::GetState`.
#[derive(Debug)]
pub struct WorkflowState {
    /// Whether the workflow is running or has been put into an error state
    pub status: WorkflowStatus,

    /// The steps currently processing media, in execution order
    pub active_steps: Vec<WorkflowStepState>,

    /// The steps from the most recently applied definition that have not yet been swapped in as
    /// the active steps
    pub pending_steps: Vec<WorkflowStepState>,
}
59
/// A snapshot of a single step inside a workflow
#[derive(Debug)]
pub struct WorkflowStepState {
    /// The id of the step, as reported by its definition's `get_id()`
    pub step_id: u64,

    /// The definition the step was created from
    pub definition: WorkflowStepDefinition,

    /// The status the step reported at the time of the snapshot.  An `Error` status with the
    /// message "Step not instantiated" is reported when a definition is known for the step but
    /// no step instance exists for it.
    pub status: StepStatus,
}
66
/// The overall health of a workflow
#[derive(PartialEq, Clone, Debug)]
pub enum WorkflowStatus {
    /// The workflow is operating and will execute its steps
    Running,

    /// A step failed.  All active and pending steps are shut down when this status is set, and
    /// no step is executed again until a new definition is applied.
    Error {
        /// The id of the step that caused the failure
        failed_step_id: u64,

        /// A description of what went wrong
        message: String,
    },
}
75
76/// Starts the execution of a workflow with the specified definition
77pub fn start_workflow(
78    definition: WorkflowDefinition,
79    step_factory: Arc<WorkflowStepFactory>,
80) -> UnboundedSender<WorkflowRequest> {
81    let (sender, receiver) = unbounded_channel();
82    let actor = Actor::new(&definition, step_factory, receiver);
83    tokio::spawn(actor.run(definition));
84
85    sender
86}
87
/// The outcome of any of the futures the workflow actor waits on
enum FutureResult {
    /// The request channel yielded `None`, i.e. every request sender has been dropped
    AllConsumersGone,

    /// A request arrived.  The receiver is handed back alongside it so that a new wait can be
    /// queued for the next request.
    WorkflowRequestReceived(WorkflowRequest, UnboundedReceiver<WorkflowRequest>),

    /// A future that was registered on behalf of a step has completed
    StepFutureResolved {
        /// The id of the step the future was registered for
        step_id: u64,

        /// The value the future resolved to, which is passed to the step as a notification
        result: Box<dyn StepFutureResult>,
    },
}
97
/// Bookkeeping the workflow keeps for each stream it currently considers active
struct StreamDetails {
    /// The step that first sent a new stream media notification.  We know that if this step is
    /// removed, the stream no longer has a source of video and should be considered disconnected
    originating_step_id: u64,
}
103
/// The state owned by a single running workflow
struct Actor {
    /// Name of the workflow, copied from the definition it was created with.  Recorded as the
    /// `workflow_name` field on the execution tracing span.
    name: String,

    /// Every instantiated step, keyed by the id of the definition it was created from
    steps_by_definition_id: HashMap<u64, Box<dyn WorkflowStep>>,

    /// Ids of the steps media currently flows through, in execution order
    active_steps: Vec<u64>,

    /// Ids of the steps of the most recently applied definition, in order.  They replace
    /// `active_steps` once none of them is still in the `Created` status.
    pending_steps: Vec<u64>,

    /// All outstanding futures: the wait for the next workflow request, plus any futures that
    /// steps have asked the workflow to track
    futures: FuturesUnordered<BoxFuture<'static, FutureResult>>,

    /// Scratch buffer holding the inputs for the next step execution
    step_inputs: StepInputs,

    /// Scratch buffer a step writes its outputs into while executing
    step_outputs: StepOutputs,

    /// Per step and per stream, the notifications output by that step that have to be replayed
    /// to a step newly placed after it (new stream notifications and sequence headers)
    cached_step_media: HashMap<u64, HashMap<StreamId, Vec<MediaNotification>>>,

    /// The same kind of cache for media entering the workflow from the outside, which is
    /// replayed to a new first step
    cached_inbound_media: HashMap<StreamId, Vec<MediaNotification>>,

    /// Streams for which a new stream notification has been seen and that have not been
    /// disconnected by their originating step
    active_streams: HashMap<StreamId, StreamDetails>,

    /// Factory used to instantiate steps from their definitions
    step_factory: Arc<WorkflowStepFactory>,

    /// The definition of each known step, keyed by step id
    step_definitions: HashMap<u64, WorkflowStepDefinition>,

    /// Whether the workflow is running or in an error state
    status: WorkflowStatus,
}
119
120impl Actor {
121    #[instrument(skip(definition, step_factory, receiver), fields(workflow_name = %definition.name))]
122    fn new(
123        definition: &WorkflowDefinition,
124        step_factory: Arc<WorkflowStepFactory>,
125        receiver: UnboundedReceiver<WorkflowRequest>,
126    ) -> Self {
127        let futures = FuturesUnordered::new();
128        info!("Creating workflow");
129
130        futures.push(wait_for_workflow_request(receiver).boxed());
131
132        Actor {
133            name: definition.name.clone(),
134            futures,
135            steps_by_definition_id: HashMap::new(),
136            active_steps: Vec::new(),
137            pending_steps: Vec::new(),
138            step_inputs: StepInputs::new(),
139            step_outputs: StepOutputs::new(),
140            cached_step_media: HashMap::new(),
141            cached_inbound_media: HashMap::new(),
142            active_streams: HashMap::new(),
143            step_factory,
144            step_definitions: HashMap::new(),
145            status: WorkflowStatus::Running,
146        }
147    }
148
149    #[instrument(name = "Workflow Execution", skip(self, initial_definition), fields(workflow_name = %self.name))]
150    async fn run(mut self, initial_definition: WorkflowDefinition) {
151        info!("Starting workflow");
152
153        self.apply_new_definition(initial_definition);
154
155        while let Some(future) = self.futures.next().await {
156            match future {
157                FutureResult::AllConsumersGone => {
158                    warn!("All channel owners gone");
159                    break;
160                }
161
162                FutureResult::WorkflowRequestReceived(request, receiver) => {
163                    self.futures
164                        .push(wait_for_workflow_request(receiver).boxed());
165
166                    let mut stop_workflow = false;
167                    self.handle_workflow_request(request, &mut stop_workflow);
168
169                    if stop_workflow {
170                        break;
171                    }
172                }
173
174                FutureResult::StepFutureResolved { step_id, result } => {
175                    self.execute_steps(step_id, Some(result), false, true);
176                }
177            }
178        }
179
180        info!("Workflow closing");
181    }
182
183    #[instrument(skip(self, request, stop_workflow), fields(request_id = %request.request_id))]
184    fn handle_workflow_request(&mut self, request: WorkflowRequest, stop_workflow: &mut bool) {
185        match request.operation {
186            WorkflowRequestOperation::UpdateDefinition { new_definition } => {
187                self.apply_new_definition(new_definition);
188            }
189
190            WorkflowRequestOperation::GetState { response_channel } => {
191                info!("Workflow state requested by external caller");
192                let mut state = WorkflowState {
193                    status: self.status.clone(),
194                    pending_steps: Vec::new(),
195                    active_steps: Vec::new(),
196                };
197
198                for id in &self.pending_steps {
199                    if let Some(definition) = self.step_definitions.get(&id) {
200                        if let Some(step) = self.steps_by_definition_id.get(&id) {
201                            state.pending_steps.push(WorkflowStepState {
202                                step_id: *id,
203                                definition: definition.clone(),
204                                status: step.get_status().clone(),
205                            });
206                        } else {
207                            state.pending_steps.push(WorkflowStepState {
208                                step_id: *id,
209                                definition: definition.clone(),
210                                status: StepStatus::Error {
211                                    message: "Step not instantiated".to_string(),
212                                },
213                            });
214                        }
215                    } else {
216                        error!(step_id = %id, "No definition was found for step id {}", id);
217                    }
218                }
219
220                for id in &self.active_steps {
221                    if let Some(definition) = self.step_definitions.get(&id) {
222                        if let Some(step) = self.steps_by_definition_id.get(&id) {
223                            state.active_steps.push(WorkflowStepState {
224                                step_id: *id,
225                                definition: definition.clone(),
226                                status: step.get_status().clone(),
227                            });
228                        } else {
229                            state.active_steps.push(WorkflowStepState {
230                                step_id: *id,
231                                definition: definition.clone(),
232                                status: StepStatus::Error {
233                                    message: "Step not instantiated".to_string(),
234                                },
235                            });
236                        }
237                    } else {
238                        error!(step_id = %id, "No definition was found for step id {}", id);
239                    }
240                }
241
242                let _ = response_channel.send(Some(state));
243            }
244
245            WorkflowRequestOperation::StopWorkflow => {
246                info!("Closing workflow as requested");
247                *stop_workflow = true;
248
249                for id in &self.active_steps {
250                    if let Some(step) = self.steps_by_definition_id.get_mut(id) {
251                        step.shutdown();
252                    }
253                }
254
255                for id in &self.pending_steps {
256                    if let Some(step) = self.steps_by_definition_id.get_mut(id) {
257                        step.shutdown();
258                    }
259                }
260            }
261
262            WorkflowRequestOperation::MediaNotification { media } => {
263                self.update_inbound_media_cache(&media);
264                self.step_inputs.clear();
265                self.step_inputs.media.push(media);
266                if let Some(id) = self.active_steps.get(0) {
267                    let id = *id;
268                    self.execute_steps(id, None, true, true);
269                }
270            }
271        }
272    }
273
274    fn apply_new_definition(&mut self, definition: WorkflowDefinition) {
275        let new_step_ids = definition
276            .steps
277            .iter()
278            .map(|x| x.get_id())
279            .collect::<HashSet<_>>();
280
281        if self.status == WorkflowStatus::Running
282            && self.pending_steps.is_empty()
283            && self.active_steps.len() == new_step_ids.len()
284            && self.active_steps.iter().all(|x| new_step_ids.contains(x))
285        {
286            // No actual changes to this workflow
287            return;
288        }
289
290        info!(
291            "Applying a new workflow definition with {} steps",
292            definition.steps.len()
293        );
294
295        // If the workflow is in an errored state, clear out all the existing steps, as they've
296        // been shut down anyway. So start this from a clean state
297        if let WorkflowStatus::Error {
298            message: _,
299            failed_step_id: _,
300        } = &self.status
301        {
302            self.active_steps.clear();
303            self.steps_by_definition_id.clear();
304            self.status = WorkflowStatus::Running;
305        }
306
307        self.pending_steps.clear();
308        for step_definition in definition.steps {
309            let id = step_definition.get_id();
310            let step_type = step_definition.step_type.clone();
311            self.step_definitions
312                .insert(step_definition.get_id(), step_definition.clone());
313
314            self.pending_steps.push(id);
315
316            if !self.steps_by_definition_id.contains_key(&id) {
317                let span = span!(Level::INFO, "Step Creation", step_id = id);
318                let _enter = span.enter();
319
320                let mut details = format!("{}: ", step_definition.step_type.0);
321                for (key, value) in &step_definition.parameters {
322                    match value {
323                        Some(value) => details.push_str(&format!("{}={} ", key, value)),
324                        None => details.push_str(&format!("{} ", key)),
325                    };
326                }
327
328                info!("Creating step {}", details);
329
330                let step_result = match self.step_factory.create_step(step_definition) {
331                    Ok(step_result) => step_result,
332                    Err(error) => {
333                        error!("Step factory failed to generate step instance: {:?}", error);
334                        self.set_status_to_error(
335                            id,
336                            format!("Failed to generate step instance: {:?}", error),
337                        );
338
339                        return;
340                    }
341                };
342
343                let (step, futures) = match step_result {
344                    Ok((step, futures)) => (step, futures),
345                    Err(error) => {
346                        error!("Step could not be generated: {}", error);
347                        self.set_status_to_error(id, format!("Failed to generate step: {}", error));
348
349                        return;
350                    }
351                };
352
353                for future in futures {
354                    self.futures.push(wait_for_step_future(id, future).boxed());
355                }
356
357                self.steps_by_definition_id.insert(id, step);
358                info!("Step type '{}' created", step_type);
359            }
360        }
361
362        self.check_if_all_pending_steps_are_active(true);
363    }
364
365    fn execute_steps(
366        &mut self,
367        initial_step_id: u64,
368        future_result: Option<Box<dyn StepFutureResult>>,
369        preserve_current_step_inputs: bool,
370        perform_pending_check: bool,
371    ) {
372        if self.status != WorkflowStatus::Running {
373            return;
374        }
375
376        if !preserve_current_step_inputs {
377            self.step_inputs.clear();
378        }
379
380        self.step_outputs.clear();
381
382        if let Some(future_result) = future_result {
383            self.step_inputs.notifications.push(future_result);
384        }
385
386        let mut start_index = None;
387        for x in 0..self.active_steps.len() {
388            if self.active_steps[x] == initial_step_id {
389                start_index = Some(x);
390                break;
391            }
392        }
393
394        // If we have a start_index, that means the step we want to execute is an active step.  So
395        // execute that step and all active steps after it. If it's not an active step, then we
396        // only want to execute that one step and none others.
397        if let Some(start_index) = start_index {
398            for x in start_index..self.active_steps.len() {
399                self.execute_step(self.active_steps[x]);
400            }
401        } else {
402            self.execute_step(initial_step_id);
403        }
404
405        if perform_pending_check {
406            self.check_if_all_pending_steps_are_active(false);
407        }
408    }
409
410    fn execute_step(&mut self, step_id: u64) {
411        if self.status != WorkflowStatus::Running {
412            return;
413        }
414
415        let span = span!(Level::INFO, "Step Execution", step_id = step_id);
416        let _enter = span.enter();
417
418        let step = match self.steps_by_definition_id.get_mut(&step_id) {
419            Some(x) => x,
420            None => {
421                let is_active = self.active_steps.contains(&step_id);
422                error!(
423                    "Attempted to execute step id {} but we it has no definition (is active: {})",
424                    step_id, is_active
425                );
426
427                return;
428            }
429        };
430
431        step.execute(&mut self.step_inputs, &mut self.step_outputs);
432        if let StepStatus::Error { message } = step.get_status() {
433            let message = message.clone();
434            self.set_status_to_error(step_id, message);
435
436            return;
437        }
438
439        for future in self.step_outputs.futures.drain(..) {
440            self.futures
441                .push(wait_for_step_future(step.get_definition().get_id(), future).boxed());
442        }
443
444        self.update_stream_details(step_id);
445        self.update_media_cache_from_outputs(step_id);
446        self.step_inputs.clear();
447        self.step_inputs
448            .media
449            .extend(self.step_outputs.media.drain(..));
450
451        self.step_outputs.clear();
452    }
453
454    fn check_if_all_pending_steps_are_active(&mut self, swap_if_pending_is_empty: bool) {
455        let mut all_are_active = true;
456        for id in &self.pending_steps {
457            let step = match self.steps_by_definition_id.get(id) {
458                Some(x) => Some(x),
459                None => {
460                    error!(
461                        step_id = id,
462                        "Workflow had step id {} pending but this step was not defined", id
463                    );
464
465                    let id = *id;
466                    self.set_status_to_error(id, "workflow step not defined".to_string());
467                    return;
468                }
469            };
470
471            if let Some(step) = step {
472                match step.get_status() {
473                    StepStatus::Created => all_are_active = false,
474                    StepStatus::Active => (),
475
476                    StepStatus::Error { message } => {
477                        let id = *id;
478                        let message = message.clone();
479                        self.set_status_to_error(id, message);
480                        return;
481                    }
482                    StepStatus::Shutdown => return,
483                }
484            } else {
485                // the step is still waiting to be instantiated by the factory
486            }
487        }
488
489        if (self.pending_steps.len() > 0 && all_are_active)
490            || (self.pending_steps.len() == 0 && swap_if_pending_is_empty)
491        {
492            // Since we have pending steps and all are now ready to become active, we need to
493            // swap all active steps for pending steps to make them active.
494
495            // In the case of `swap_if_pending_is_empty`, this is usually the case if the user
496            // updates this workflow with a definition that contains no workflow steps, then that
497            // means the user specifically wants this workflow empty.  So we need to tear down all
498            // active steps.
499
500            // Note: there's a possibility that a pending swap can trigger a new set
501            // of sequence headers to fall through.  An example of this happening is if
502            // a transcoding step is placed in between an existing playback step.  This
503            // will probably cause playback issues unless the client supports changing
504            // decoding parameters mid-stream, which isn't certain.  We either need to
505            // leave this up to mmids operators to realize, or need to come up with a
506            // solution to remove the footgun (such as disconnecting playback clients
507            // upon a new sequence header being seen).  Unsure if that's the best
508            // approach though.
509            for index in (0..self.active_steps.len()).rev() {
510                let step_id = self.active_steps[index];
511                if !self.pending_steps.contains(&step_id) {
512                    // Since this step is currently active but not pending, the swap will make this
513                    // step go away for good.  Therefore, we need to clean up its definition and
514                    // raise disconnection notices for any streams originating from this step, so
515                    // that latter steps that will survive will know not to expect more media
516                    // from these streams.
517                    info!(step_id = step_id, "Removing now unused step id {}", step_id);
518                    self.step_definitions.remove(&step_id);
519                    if let Some(mut step) = self.steps_by_definition_id.remove(&step_id) {
520                        let span = span!(Level::INFO, "Step Shutdown", step_id = %step_id);
521                        let _enter = span.enter();
522                        step.shutdown();
523                    }
524
525                    if let Some(cache) = self.cached_step_media.remove(&step_id) {
526                        for key in cache.keys() {
527                            if let Some(stream) = self.active_streams.get(key) {
528                                if stream.originating_step_id == step_id {
529                                    for x in (index + 1)..self.active_steps.len() {
530                                        self.step_outputs.clear();
531                                        self.step_inputs.clear();
532                                        self.step_inputs.media.push(MediaNotification {
533                                            stream_id: key.clone(),
534                                            content: MediaNotificationContent::StreamDisconnected,
535                                        });
536
537                                        self.execute_step(self.active_steps[x]);
538                                    }
539
540                                    self.active_streams.remove(key);
541                                }
542                            }
543                        }
544                    }
545                }
546            }
547
548            // Since some pending steps may not have been around previously, they would not have
549            // gotten stream started notifications and missing sequence headers.  So we need to
550            // find its parent step's cache and replay any required media notifications
551            for index in 0..self.pending_steps.len() {
552                let current_step_id = self.pending_steps[index];
553                if !self.active_steps.contains(&current_step_id) {
554                    // This is a new step
555                    let notifications = if index == 0 {
556                        // The first step uses the inbound cache, not step based cache
557                        self.cached_inbound_media
558                            .values()
559                            .flatten()
560                            .map(|x| x.clone())
561                            .collect::<Vec<_>>()
562                    } else {
563                        let previous_step_id = self.pending_steps[index - 1];
564                        if let Some(cache) = self.cached_step_media.get(&previous_step_id) {
565                            cache
566                                .values()
567                                .flatten()
568                                .map(|x| x.clone())
569                                .collect::<Vec<_>>()
570                        } else {
571                            Vec::new()
572                        }
573                    };
574
575                    self.step_inputs.clear();
576                    self.step_inputs.media.extend(notifications);
577                    self.execute_steps(current_step_id, None, true, false);
578
579                    // TODO: This is probably going to cause duplicate stream started notifications.
580                    // Not sure a way around that and we probably need to remove those warnings.
581
582                    // TODO: The current code only handles notifications raised by parents of
583                    // new steps.  There's the possibility that a change of order of existing
584                    // steps could cause steps to be tracking streams that come in after the step,
585                    // or not know about steps that were created in steps that used to be after but
586                    // is now before.  It also means it may have outdated sequence headers if
587                    // a transcoding step was removed.
588                }
589            }
590
591            std::mem::swap(&mut self.pending_steps, &mut self.active_steps);
592            self.pending_steps.clear();
593
594            info!("All pending steps moved to active");
595        }
596    }
597
598    fn update_stream_details(&mut self, current_step_id: u64) {
599        for media in &self.step_outputs.media {
600            match &media.content {
601                MediaNotificationContent::Video { .. } => (),
602                MediaNotificationContent::Audio { .. } => (),
603                MediaNotificationContent::Metadata { .. } => (),
604                MediaNotificationContent::NewIncomingStream { .. } => {
605                    if !self.active_streams.contains_key(&media.stream_id) {
606                        // Since this is the first time we've gotten a new incoming stream
607                        // notification for this stream, assume this this stream originates from
608                        // the current step
609                        self.active_streams.insert(
610                            media.stream_id.clone(),
611                            StreamDetails {
612                                originating_step_id: current_step_id,
613                            },
614                        );
615                    }
616                }
617
618                MediaNotificationContent::StreamDisconnected => {
619                    if let Some(details) = self.active_streams.get(&media.stream_id) {
620                        if details.originating_step_id == current_step_id {
621                            self.active_streams.remove(&media.stream_id);
622                        }
623                    }
624                }
625            }
626        }
627    }
628
629    fn update_inbound_media_cache(&mut self, media: &MediaNotification) {
630        match media.content {
631            MediaNotificationContent::NewIncomingStream { .. } => {
632                let collection = vec![media.clone()];
633                self.cached_inbound_media
634                    .insert(media.stream_id.clone(), collection);
635            }
636
637            MediaNotificationContent::StreamDisconnected => {
638                self.cached_inbound_media.remove(&media.stream_id);
639            }
640
641            MediaNotificationContent::Audio {
642                is_sequence_header: true,
643                ..
644            } => {
645                if let Some(collection) = self.cached_inbound_media.get_mut(&media.stream_id) {
646                    collection.push(media.clone());
647                }
648            }
649
650            MediaNotificationContent::Video {
651                is_sequence_header: true,
652                ..
653            } => {
654                if let Some(collectoin) = self.cached_inbound_media.get_mut(&media.stream_id) {
655                    collectoin.push(media.clone());
656                }
657            }
658
659            _ => (),
660        }
661    }
662
663    fn update_media_cache_from_outputs(&mut self, step_id: u64) {
664        let step_cache = self
665            .cached_step_media
666            .entry(step_id)
667            .or_insert(HashMap::new());
668
669        for media in &self.step_outputs.media {
670            enum Operation {
671                Add,
672                Remove,
673                Ignore,
674            }
675            let operation = match &media.content {
676                MediaNotificationContent::StreamDisconnected => {
677                    // Stream has ended so no reason to keep the cache around
678                    Operation::Remove
679                }
680
681                MediaNotificationContent::NewIncomingStream { .. } => Operation::Add,
682
683                MediaNotificationContent::Metadata { .. } => {
684                    // I *think* we can ignore these, since the sequence headers are really
685                    // what's important to replay
686                    Operation::Ignore
687                }
688
689                MediaNotificationContent::Video {
690                    is_sequence_header, ..
691                } => {
692                    // We must cache sequence headers.  We *may* need to cache the latest key frame
693                    if *is_sequence_header {
694                        Operation::Add
695                    } else {
696                        Operation::Ignore
697                    }
698                }
699
700                MediaNotificationContent::Audio {
701                    is_sequence_header, ..
702                } => {
703                    if *is_sequence_header {
704                        Operation::Add
705                    } else {
706                        Operation::Ignore
707                    }
708                }
709            };
710
711            match operation {
712                Operation::Ignore => (),
713                Operation::Remove => {
714                    step_cache.remove(&media.stream_id);
715                }
716
717                Operation::Add => {
718                    let collection = step_cache
719                        .entry(media.stream_id.clone())
720                        .or_insert(Vec::new());
721
722                    collection.push(media.clone());
723                }
724            }
725        }
726    }
727
728    fn set_status_to_error(&mut self, step_id: u64, message: String) {
729        error!(
730            "Workflow set to error state due to step id {}: {}",
731            step_id, message
732        );
733        self.status = WorkflowStatus::Error {
734            failed_step_id: step_id,
735            message,
736        };
737
738        for step_id in &self.active_steps {
739            if let Some(step) = self.steps_by_definition_id.get_mut(step_id) {
740                step.shutdown();
741            }
742        }
743
744        for step_id in &self.pending_steps {
745            if let Some(step) = self.steps_by_definition_id.get_mut(step_id) {
746                step.shutdown();
747            }
748        }
749    }
750}
751
// `start_workflow` hands the future returned by `Actor::run` to `tokio::spawn`, which only
// accepts futures that are `Send`.  This impl asserts that by hand instead of letting the
// compiler derive it.
//
// SAFETY: NOTE(review) - nothing visible in this file justifies the assertion.  The actor
// owns `Box<dyn WorkflowStep>` and `Box<dyn StepFutureResult>` values that carry no `Send`
// bound here, so this is only sound if every implementation of those traits can be moved
// between threads (or if the traits themselves require `Send`).  Confirm this against the
// trait definitions; preferably add the bound there and remove this impl.
unsafe impl Send for Actor {}
753
754async fn wait_for_workflow_request(
755    mut receiver: UnboundedReceiver<WorkflowRequest>,
756) -> FutureResult {
757    match receiver.recv().await {
758        Some(x) => FutureResult::WorkflowRequestReceived(x, receiver),
759        None => FutureResult::AllConsumersGone,
760    }
761}
762
763async fn wait_for_step_future(
764    step_id: u64,
765    future: BoxFuture<'static, Box<dyn StepFutureResult>>,
766) -> FutureResult {
767    let result = future.await;
768    FutureResult::StepFutureResolved { step_id, result }
769}