1#[cfg(test)]
2mod test_context;
3#[cfg(test)]
4mod test_steps;
5#[cfg(test)]
6mod tests;
7
8use crate::workflows::definitions::{WorkflowDefinition, WorkflowStepDefinition};
9use crate::workflows::steps::factory::WorkflowStepFactory;
10use crate::workflows::steps::{
11 StepFutureResult, StepInputs, StepOutputs, StepStatus, WorkflowStep,
12};
13use crate::workflows::{MediaNotification, MediaNotificationContent};
14use crate::StreamId;
15use futures::future::BoxFuture;
16use futures::stream::FuturesUnordered;
17use futures::{FutureExt, StreamExt};
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
21use tokio::sync::oneshot::Sender;
22use tracing::{error, info, instrument, span, warn, Level};
23
24#[derive(Debug)]
26pub struct WorkflowRequest {
27 pub request_id: String,
29 pub operation: WorkflowRequestOperation,
30}
31
32#[derive(Debug)]
34pub enum WorkflowRequestOperation {
35 UpdateDefinition { new_definition: WorkflowDefinition },
40
41 GetState {
43 response_channel: Sender<Option<WorkflowState>>,
44 },
45
46 StopWorkflow,
48
49 MediaNotification { media: MediaNotification },
51}
52
53#[derive(Debug)]
54pub struct WorkflowState {
55 pub status: WorkflowStatus,
56 pub active_steps: Vec<WorkflowStepState>,
57 pub pending_steps: Vec<WorkflowStepState>,
58}
59
60#[derive(Debug)]
61pub struct WorkflowStepState {
62 pub step_id: u64,
63 pub definition: WorkflowStepDefinition,
64 pub status: StepStatus,
65}
66
67#[derive(PartialEq, Clone, Debug)]
68pub enum WorkflowStatus {
69 Running,
70 Error {
71 failed_step_id: u64,
72 message: String,
73 },
74}
75
76pub fn start_workflow(
78 definition: WorkflowDefinition,
79 step_factory: Arc<WorkflowStepFactory>,
80) -> UnboundedSender<WorkflowRequest> {
81 let (sender, receiver) = unbounded_channel();
82 let actor = Actor::new(&definition, step_factory, receiver);
83 tokio::spawn(actor.run(definition));
84
85 sender
86}
87
88enum FutureResult {
89 AllConsumersGone,
90 WorkflowRequestReceived(WorkflowRequest, UnboundedReceiver<WorkflowRequest>),
91
92 StepFutureResolved {
93 step_id: u64,
94 result: Box<dyn StepFutureResult>,
95 },
96}
97
98struct StreamDetails {
99 originating_step_id: u64,
102}
103
104struct Actor {
105 name: String,
106 steps_by_definition_id: HashMap<u64, Box<dyn WorkflowStep>>,
107 active_steps: Vec<u64>,
108 pending_steps: Vec<u64>,
109 futures: FuturesUnordered<BoxFuture<'static, FutureResult>>,
110 step_inputs: StepInputs,
111 step_outputs: StepOutputs,
112 cached_step_media: HashMap<u64, HashMap<StreamId, Vec<MediaNotification>>>,
113 cached_inbound_media: HashMap<StreamId, Vec<MediaNotification>>,
114 active_streams: HashMap<StreamId, StreamDetails>,
115 step_factory: Arc<WorkflowStepFactory>,
116 step_definitions: HashMap<u64, WorkflowStepDefinition>,
117 status: WorkflowStatus,
118}
119
120impl Actor {
121 #[instrument(skip(definition, step_factory, receiver), fields(workflow_name = %definition.name))]
122 fn new(
123 definition: &WorkflowDefinition,
124 step_factory: Arc<WorkflowStepFactory>,
125 receiver: UnboundedReceiver<WorkflowRequest>,
126 ) -> Self {
127 let futures = FuturesUnordered::new();
128 info!("Creating workflow");
129
130 futures.push(wait_for_workflow_request(receiver).boxed());
131
132 Actor {
133 name: definition.name.clone(),
134 futures,
135 steps_by_definition_id: HashMap::new(),
136 active_steps: Vec::new(),
137 pending_steps: Vec::new(),
138 step_inputs: StepInputs::new(),
139 step_outputs: StepOutputs::new(),
140 cached_step_media: HashMap::new(),
141 cached_inbound_media: HashMap::new(),
142 active_streams: HashMap::new(),
143 step_factory,
144 step_definitions: HashMap::new(),
145 status: WorkflowStatus::Running,
146 }
147 }
148
149 #[instrument(name = "Workflow Execution", skip(self, initial_definition), fields(workflow_name = %self.name))]
150 async fn run(mut self, initial_definition: WorkflowDefinition) {
151 info!("Starting workflow");
152
153 self.apply_new_definition(initial_definition);
154
155 while let Some(future) = self.futures.next().await {
156 match future {
157 FutureResult::AllConsumersGone => {
158 warn!("All channel owners gone");
159 break;
160 }
161
162 FutureResult::WorkflowRequestReceived(request, receiver) => {
163 self.futures
164 .push(wait_for_workflow_request(receiver).boxed());
165
166 let mut stop_workflow = false;
167 self.handle_workflow_request(request, &mut stop_workflow);
168
169 if stop_workflow {
170 break;
171 }
172 }
173
174 FutureResult::StepFutureResolved { step_id, result } => {
175 self.execute_steps(step_id, Some(result), false, true);
176 }
177 }
178 }
179
180 info!("Workflow closing");
181 }
182
183 #[instrument(skip(self, request, stop_workflow), fields(request_id = %request.request_id))]
184 fn handle_workflow_request(&mut self, request: WorkflowRequest, stop_workflow: &mut bool) {
185 match request.operation {
186 WorkflowRequestOperation::UpdateDefinition { new_definition } => {
187 self.apply_new_definition(new_definition);
188 }
189
190 WorkflowRequestOperation::GetState { response_channel } => {
191 info!("Workflow state requested by external caller");
192 let mut state = WorkflowState {
193 status: self.status.clone(),
194 pending_steps: Vec::new(),
195 active_steps: Vec::new(),
196 };
197
198 for id in &self.pending_steps {
199 if let Some(definition) = self.step_definitions.get(&id) {
200 if let Some(step) = self.steps_by_definition_id.get(&id) {
201 state.pending_steps.push(WorkflowStepState {
202 step_id: *id,
203 definition: definition.clone(),
204 status: step.get_status().clone(),
205 });
206 } else {
207 state.pending_steps.push(WorkflowStepState {
208 step_id: *id,
209 definition: definition.clone(),
210 status: StepStatus::Error {
211 message: "Step not instantiated".to_string(),
212 },
213 });
214 }
215 } else {
216 error!(step_id = %id, "No definition was found for step id {}", id);
217 }
218 }
219
220 for id in &self.active_steps {
221 if let Some(definition) = self.step_definitions.get(&id) {
222 if let Some(step) = self.steps_by_definition_id.get(&id) {
223 state.active_steps.push(WorkflowStepState {
224 step_id: *id,
225 definition: definition.clone(),
226 status: step.get_status().clone(),
227 });
228 } else {
229 state.active_steps.push(WorkflowStepState {
230 step_id: *id,
231 definition: definition.clone(),
232 status: StepStatus::Error {
233 message: "Step not instantiated".to_string(),
234 },
235 });
236 }
237 } else {
238 error!(step_id = %id, "No definition was found for step id {}", id);
239 }
240 }
241
242 let _ = response_channel.send(Some(state));
243 }
244
245 WorkflowRequestOperation::StopWorkflow => {
246 info!("Closing workflow as requested");
247 *stop_workflow = true;
248
249 for id in &self.active_steps {
250 if let Some(step) = self.steps_by_definition_id.get_mut(id) {
251 step.shutdown();
252 }
253 }
254
255 for id in &self.pending_steps {
256 if let Some(step) = self.steps_by_definition_id.get_mut(id) {
257 step.shutdown();
258 }
259 }
260 }
261
262 WorkflowRequestOperation::MediaNotification { media } => {
263 self.update_inbound_media_cache(&media);
264 self.step_inputs.clear();
265 self.step_inputs.media.push(media);
266 if let Some(id) = self.active_steps.get(0) {
267 let id = *id;
268 self.execute_steps(id, None, true, true);
269 }
270 }
271 }
272 }
273
274 fn apply_new_definition(&mut self, definition: WorkflowDefinition) {
275 let new_step_ids = definition
276 .steps
277 .iter()
278 .map(|x| x.get_id())
279 .collect::<HashSet<_>>();
280
281 if self.status == WorkflowStatus::Running
282 && self.pending_steps.is_empty()
283 && self.active_steps.len() == new_step_ids.len()
284 && self.active_steps.iter().all(|x| new_step_ids.contains(x))
285 {
286 return;
288 }
289
290 info!(
291 "Applying a new workflow definition with {} steps",
292 definition.steps.len()
293 );
294
295 if let WorkflowStatus::Error {
298 message: _,
299 failed_step_id: _,
300 } = &self.status
301 {
302 self.active_steps.clear();
303 self.steps_by_definition_id.clear();
304 self.status = WorkflowStatus::Running;
305 }
306
307 self.pending_steps.clear();
308 for step_definition in definition.steps {
309 let id = step_definition.get_id();
310 let step_type = step_definition.step_type.clone();
311 self.step_definitions
312 .insert(step_definition.get_id(), step_definition.clone());
313
314 self.pending_steps.push(id);
315
316 if !self.steps_by_definition_id.contains_key(&id) {
317 let span = span!(Level::INFO, "Step Creation", step_id = id);
318 let _enter = span.enter();
319
320 let mut details = format!("{}: ", step_definition.step_type.0);
321 for (key, value) in &step_definition.parameters {
322 match value {
323 Some(value) => details.push_str(&format!("{}={} ", key, value)),
324 None => details.push_str(&format!("{} ", key)),
325 };
326 }
327
328 info!("Creating step {}", details);
329
330 let step_result = match self.step_factory.create_step(step_definition) {
331 Ok(step_result) => step_result,
332 Err(error) => {
333 error!("Step factory failed to generate step instance: {:?}", error);
334 self.set_status_to_error(
335 id,
336 format!("Failed to generate step instance: {:?}", error),
337 );
338
339 return;
340 }
341 };
342
343 let (step, futures) = match step_result {
344 Ok((step, futures)) => (step, futures),
345 Err(error) => {
346 error!("Step could not be generated: {}", error);
347 self.set_status_to_error(id, format!("Failed to generate step: {}", error));
348
349 return;
350 }
351 };
352
353 for future in futures {
354 self.futures.push(wait_for_step_future(id, future).boxed());
355 }
356
357 self.steps_by_definition_id.insert(id, step);
358 info!("Step type '{}' created", step_type);
359 }
360 }
361
362 self.check_if_all_pending_steps_are_active(true);
363 }
364
365 fn execute_steps(
366 &mut self,
367 initial_step_id: u64,
368 future_result: Option<Box<dyn StepFutureResult>>,
369 preserve_current_step_inputs: bool,
370 perform_pending_check: bool,
371 ) {
372 if self.status != WorkflowStatus::Running {
373 return;
374 }
375
376 if !preserve_current_step_inputs {
377 self.step_inputs.clear();
378 }
379
380 self.step_outputs.clear();
381
382 if let Some(future_result) = future_result {
383 self.step_inputs.notifications.push(future_result);
384 }
385
386 let mut start_index = None;
387 for x in 0..self.active_steps.len() {
388 if self.active_steps[x] == initial_step_id {
389 start_index = Some(x);
390 break;
391 }
392 }
393
394 if let Some(start_index) = start_index {
398 for x in start_index..self.active_steps.len() {
399 self.execute_step(self.active_steps[x]);
400 }
401 } else {
402 self.execute_step(initial_step_id);
403 }
404
405 if perform_pending_check {
406 self.check_if_all_pending_steps_are_active(false);
407 }
408 }
409
410 fn execute_step(&mut self, step_id: u64) {
411 if self.status != WorkflowStatus::Running {
412 return;
413 }
414
415 let span = span!(Level::INFO, "Step Execution", step_id = step_id);
416 let _enter = span.enter();
417
418 let step = match self.steps_by_definition_id.get_mut(&step_id) {
419 Some(x) => x,
420 None => {
421 let is_active = self.active_steps.contains(&step_id);
422 error!(
423 "Attempted to execute step id {} but we it has no definition (is active: {})",
424 step_id, is_active
425 );
426
427 return;
428 }
429 };
430
431 step.execute(&mut self.step_inputs, &mut self.step_outputs);
432 if let StepStatus::Error { message } = step.get_status() {
433 let message = message.clone();
434 self.set_status_to_error(step_id, message);
435
436 return;
437 }
438
439 for future in self.step_outputs.futures.drain(..) {
440 self.futures
441 .push(wait_for_step_future(step.get_definition().get_id(), future).boxed());
442 }
443
444 self.update_stream_details(step_id);
445 self.update_media_cache_from_outputs(step_id);
446 self.step_inputs.clear();
447 self.step_inputs
448 .media
449 .extend(self.step_outputs.media.drain(..));
450
451 self.step_outputs.clear();
452 }
453
454 fn check_if_all_pending_steps_are_active(&mut self, swap_if_pending_is_empty: bool) {
455 let mut all_are_active = true;
456 for id in &self.pending_steps {
457 let step = match self.steps_by_definition_id.get(id) {
458 Some(x) => Some(x),
459 None => {
460 error!(
461 step_id = id,
462 "Workflow had step id {} pending but this step was not defined", id
463 );
464
465 let id = *id;
466 self.set_status_to_error(id, "workflow step not defined".to_string());
467 return;
468 }
469 };
470
471 if let Some(step) = step {
472 match step.get_status() {
473 StepStatus::Created => all_are_active = false,
474 StepStatus::Active => (),
475
476 StepStatus::Error { message } => {
477 let id = *id;
478 let message = message.clone();
479 self.set_status_to_error(id, message);
480 return;
481 }
482 StepStatus::Shutdown => return,
483 }
484 } else {
485 }
487 }
488
489 if (self.pending_steps.len() > 0 && all_are_active)
490 || (self.pending_steps.len() == 0 && swap_if_pending_is_empty)
491 {
492 for index in (0..self.active_steps.len()).rev() {
510 let step_id = self.active_steps[index];
511 if !self.pending_steps.contains(&step_id) {
512 info!(step_id = step_id, "Removing now unused step id {}", step_id);
518 self.step_definitions.remove(&step_id);
519 if let Some(mut step) = self.steps_by_definition_id.remove(&step_id) {
520 let span = span!(Level::INFO, "Step Shutdown", step_id = %step_id);
521 let _enter = span.enter();
522 step.shutdown();
523 }
524
525 if let Some(cache) = self.cached_step_media.remove(&step_id) {
526 for key in cache.keys() {
527 if let Some(stream) = self.active_streams.get(key) {
528 if stream.originating_step_id == step_id {
529 for x in (index + 1)..self.active_steps.len() {
530 self.step_outputs.clear();
531 self.step_inputs.clear();
532 self.step_inputs.media.push(MediaNotification {
533 stream_id: key.clone(),
534 content: MediaNotificationContent::StreamDisconnected,
535 });
536
537 self.execute_step(self.active_steps[x]);
538 }
539
540 self.active_streams.remove(key);
541 }
542 }
543 }
544 }
545 }
546 }
547
548 for index in 0..self.pending_steps.len() {
552 let current_step_id = self.pending_steps[index];
553 if !self.active_steps.contains(¤t_step_id) {
554 let notifications = if index == 0 {
556 self.cached_inbound_media
558 .values()
559 .flatten()
560 .map(|x| x.clone())
561 .collect::<Vec<_>>()
562 } else {
563 let previous_step_id = self.pending_steps[index - 1];
564 if let Some(cache) = self.cached_step_media.get(&previous_step_id) {
565 cache
566 .values()
567 .flatten()
568 .map(|x| x.clone())
569 .collect::<Vec<_>>()
570 } else {
571 Vec::new()
572 }
573 };
574
575 self.step_inputs.clear();
576 self.step_inputs.media.extend(notifications);
577 self.execute_steps(current_step_id, None, true, false);
578
579 }
589 }
590
591 std::mem::swap(&mut self.pending_steps, &mut self.active_steps);
592 self.pending_steps.clear();
593
594 info!("All pending steps moved to active");
595 }
596 }
597
598 fn update_stream_details(&mut self, current_step_id: u64) {
599 for media in &self.step_outputs.media {
600 match &media.content {
601 MediaNotificationContent::Video { .. } => (),
602 MediaNotificationContent::Audio { .. } => (),
603 MediaNotificationContent::Metadata { .. } => (),
604 MediaNotificationContent::NewIncomingStream { .. } => {
605 if !self.active_streams.contains_key(&media.stream_id) {
606 self.active_streams.insert(
610 media.stream_id.clone(),
611 StreamDetails {
612 originating_step_id: current_step_id,
613 },
614 );
615 }
616 }
617
618 MediaNotificationContent::StreamDisconnected => {
619 if let Some(details) = self.active_streams.get(&media.stream_id) {
620 if details.originating_step_id == current_step_id {
621 self.active_streams.remove(&media.stream_id);
622 }
623 }
624 }
625 }
626 }
627 }
628
629 fn update_inbound_media_cache(&mut self, media: &MediaNotification) {
630 match media.content {
631 MediaNotificationContent::NewIncomingStream { .. } => {
632 let collection = vec![media.clone()];
633 self.cached_inbound_media
634 .insert(media.stream_id.clone(), collection);
635 }
636
637 MediaNotificationContent::StreamDisconnected => {
638 self.cached_inbound_media.remove(&media.stream_id);
639 }
640
641 MediaNotificationContent::Audio {
642 is_sequence_header: true,
643 ..
644 } => {
645 if let Some(collection) = self.cached_inbound_media.get_mut(&media.stream_id) {
646 collection.push(media.clone());
647 }
648 }
649
650 MediaNotificationContent::Video {
651 is_sequence_header: true,
652 ..
653 } => {
654 if let Some(collectoin) = self.cached_inbound_media.get_mut(&media.stream_id) {
655 collectoin.push(media.clone());
656 }
657 }
658
659 _ => (),
660 }
661 }
662
663 fn update_media_cache_from_outputs(&mut self, step_id: u64) {
664 let step_cache = self
665 .cached_step_media
666 .entry(step_id)
667 .or_insert(HashMap::new());
668
669 for media in &self.step_outputs.media {
670 enum Operation {
671 Add,
672 Remove,
673 Ignore,
674 }
675 let operation = match &media.content {
676 MediaNotificationContent::StreamDisconnected => {
677 Operation::Remove
679 }
680
681 MediaNotificationContent::NewIncomingStream { .. } => Operation::Add,
682
683 MediaNotificationContent::Metadata { .. } => {
684 Operation::Ignore
687 }
688
689 MediaNotificationContent::Video {
690 is_sequence_header, ..
691 } => {
692 if *is_sequence_header {
694 Operation::Add
695 } else {
696 Operation::Ignore
697 }
698 }
699
700 MediaNotificationContent::Audio {
701 is_sequence_header, ..
702 } => {
703 if *is_sequence_header {
704 Operation::Add
705 } else {
706 Operation::Ignore
707 }
708 }
709 };
710
711 match operation {
712 Operation::Ignore => (),
713 Operation::Remove => {
714 step_cache.remove(&media.stream_id);
715 }
716
717 Operation::Add => {
718 let collection = step_cache
719 .entry(media.stream_id.clone())
720 .or_insert(Vec::new());
721
722 collection.push(media.clone());
723 }
724 }
725 }
726 }
727
728 fn set_status_to_error(&mut self, step_id: u64, message: String) {
729 error!(
730 "Workflow set to error state due to step id {}: {}",
731 step_id, message
732 );
733 self.status = WorkflowStatus::Error {
734 failed_step_id: step_id,
735 message,
736 };
737
738 for step_id in &self.active_steps {
739 if let Some(step) = self.steps_by_definition_id.get_mut(step_id) {
740 step.shutdown();
741 }
742 }
743
744 for step_id in &self.pending_steps {
745 if let Some(step) = self.steps_by_definition_id.get_mut(step_id) {
746 step.shutdown();
747 }
748 }
749 }
750}
751
752unsafe impl Send for Actor {}
753
754async fn wait_for_workflow_request(
755 mut receiver: UnboundedReceiver<WorkflowRequest>,
756) -> FutureResult {
757 match receiver.recv().await {
758 Some(x) => FutureResult::WorkflowRequestReceived(x, receiver),
759 None => FutureResult::AllConsumersGone,
760 }
761}
762
763async fn wait_for_step_future(
764 step_id: u64,
765 future: BoxFuture<'static, Box<dyn StepFutureResult>>,
766) -> FutureResult {
767 let result = future.await;
768 FutureResult::StepFutureResolved { step_id, result }
769}