mcai_models 0.9.1

Models for Media Cloud AI project
Documentation
use super::{NotificationHook, StartParameter, Step, WorkflowDefinition, WorkflowInstance};
use crate::common::Job;
use mcai_graph::{Graph, GraphConfiguration, ToGraph};
use semver::Version;
use std::collections::HashMap;

/// Workflow schema versions, as returned by `Workflow::schema_version`.
///
/// Each variant is named after the version number it stands for
/// (`_1_8` for 1.8, and so on).
///
/// The type is a plain fieldless enum, so it is `Copy` and can be compared,
/// printed with `{:?}` and used in `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaVersion {
  /// Version 1.8.
  _1_8,
  /// Version 1.9.
  _1_9,
  /// Version 1.10.
  _1_10,
  /// Version 1.11.
  _1_11,
}

/// A workflow in one of its two forms: a [`WorkflowDefinition`] or a
/// [`WorkflowInstance`].
///
/// The accessors implemented on this type dispatch to whichever form is
/// wrapped, so callers can read common properties without matching on the
/// variant themselves.
#[derive(Clone, PartialEq)]
pub enum Workflow {
  /// Wraps a [`WorkflowDefinition`].
  Definition(WorkflowDefinition),
  /// Wraps a [`WorkflowInstance`]; the only form for which `jobs()` and
  /// `reference()` can return a value.
  Instance(WorkflowInstance),
}

impl Workflow {
  /// Returns `true` when this workflow wraps a definition and `false` when it
  /// wraps an instance.
  pub fn is_definition(&self) -> bool {
    matches!(self, Self::Definition(_))
  }

  /// Schema version reported by the wrapped definition or instance.
  pub fn schema_version(&self) -> SchemaVersion {
    match self {
      Self::Definition(definition) => definition.schema_version(),
      Self::Instance(instance) => instance.schema_version(),
    }
  }

  /// Identifier of the workflow.
  pub fn identifier(&self) -> &str {
    match self {
      Self::Definition(definition) => definition.identifier(),
      Self::Instance(instance) => &instance.identifier,
    }
  }

  /// Label of the workflow.
  pub fn label(&self) -> &str {
    match self {
      Self::Definition(definition) => definition.label(),
      Self::Instance(instance) => &instance.label,
    }
  }

  /// Version of the workflow.
  ///
  /// For an instance the version is assembled from its `version_major`,
  /// `version_minor` and `version_micro` fields.
  pub fn version(&self) -> Version {
    match self {
      Self::Definition(definition) => definition.version(),
      Self::Instance(instance) => {
        let major = instance.version_major as u64;
        let minor = instance.version_minor as u64;
        let micro = instance.version_micro as u64;
        Version::new(major, minor, micro)
      }
    }
  }

  /// Live flag of the wrapped definition or instance.
  pub fn is_live(&self) -> bool {
    match self {
      Self::Definition(definition) => definition.is_live(),
      Self::Instance(instance) => instance.is_live,
    }
  }

  /// Tags attached to the workflow.
  pub fn tags(&self) -> &Vec<String> {
    match self {
      Self::Definition(definition) => definition.tags(),
      Self::Instance(instance) => &instance.tags,
    }
  }

  /// Copy of the instance's `reference` field; always `None` for a definition.
  pub fn reference(&self) -> Option<String> {
    match self {
      Self::Instance(instance) => instance.reference.clone(),
      Self::Definition(_) => None,
    }
  }

  /// Jobs of the instance; always `None` for a definition.
  pub fn jobs(&self) -> Option<&Vec<Job>> {
    match self {
      Self::Instance(instance) => Some(&instance.jobs),
      Self::Definition(_) => None,
    }
  }

  /// Steps of the workflow.
  pub fn steps(&self) -> &Vec<Step> {
    match self {
      Self::Definition(definition) => definition.steps(),
      Self::Instance(instance) => &instance.steps,
    }
  }

  /// Start parameters of the workflow.
  pub fn get_start_parameters(&self) -> &Vec<StartParameter> {
    match self {
      Self::Definition(definition) => definition.get_start_parameters(),
      Self::Instance(instance) => &instance.start_parameters,
    }
  }

  /// Notification hooks as reported by the definition; always `None` for an
  /// instance.
  pub fn get_notification_hooks(&self) -> Option<&Vec<NotificationHook>> {
    match self {
      Self::Definition(definition) => definition.get_notification_hooks(),
      Self::Instance(_) => None,
    }
  }

  /// Copies the coordinates of every graph node onto the definition step that
  /// `get_mut_step` finds for the node's id.
  ///
  /// Nodes without a matching step are skipped, and the call does nothing at
  /// all when the workflow is an instance.
  pub fn update_steps_coordinates_from_graph(&mut self, graph: &Graph) {
    let definition = match self {
      Self::Definition(definition) => definition,
      Self::Instance(_) => return,
    };

    for (step_id, node) in graph.nodes() {
      if let Some(step) = definition.get_mut_step(step_id) {
        step.coordinates = Some(node.borrow().coordinates().clone());
      }
    }
  }
}

impl ToGraph for Workflow {
  /// Builds a [`Graph`] holding one node per workflow step.
  ///
  /// Placement rules:
  /// - a step that carries explicit `coordinates` keeps them (it is recorded
  ///   on level 0 for the purpose of placing its children);
  /// - a step without parents goes on level 0, one column per such step;
  /// - any other step goes one level below the deepest of its parents that is
  ///   already placed; columns restart at 0 on every level and follow the
  ///   order of the steps.
  ///
  /// Once the nodes exist, the parent and "required to start" links of every
  /// step are set on its node. Ids that do not resolve to a node are ignored.
  ///
  /// Steps that can never be placed because none of their parents ever gets a
  /// node (unknown parent id, dependency cycle) are left out of the graph.
  /// The layout stops as soon as a full pass places nothing, so such
  /// workflows, and workflows with duplicated step ids, no longer make this
  /// function loop forever.
  fn to_graph(&self, configuration: GraphConfiguration) -> Graph {
    // Borrowing is enough: the steps are only read below.
    let steps = self.steps();

    let mut graph = Graph::new(configuration.clone());

    let mut nodes = HashMap::with_capacity(steps.len());
    let mut node_level = HashMap::with_capacity(steps.len());

    let mut level = 0;
    let mut column = 0;

    for step in steps.iter() {
      // Add node from step coordinates
      if let Some(coordinates) = &step.coordinates {
        let node = graph.add_node(step.id, coordinates.clone());
        nodes.insert(step.id, node);
        node_level.insert(step.id, level);
        continue;
      }

      // Add node from step without parents
      if step.parent_ids.is_empty() {
        let node = graph.add_node(
          step.id,
          configuration
            .node_configuration()
            .get_coordinates(level, column),
        );
        nodes.insert(step.id, node);
        node_level.insert(step.id, level);

        column += 1;
      }
    }

    // Add nodes from steps with parents, one level per pass.
    while nodes.len() < steps.len() {
      level += 1;
      column = 0;
      let mut placed_any = false;

      for step in steps.iter() {
        if nodes.contains_key(&step.id) {
          continue;
        }

        if let Some(latest_level) = step
          .parent_ids
          .iter()
          .filter_map(|parent_id| node_level.get(parent_id))
          .max()
        {
          let step_level = latest_level + 1;
          // A parent placed earlier in this very pass pushes the step to the
          // next pass.
          if step_level == level {
            let node = graph.add_node(
              step.id,
              configuration
                .node_configuration()
                .get_coordinates(step_level, column),
            );

            nodes.insert(step.id, node);
            node_level.insert(step.id, step_level);

            column += 1;
            placed_any = true;
          }
        }
      }

      // A pass that places nothing leaves `level` empty, so no later pass can
      // place anything either: the remaining steps are unreachable.
      if !placed_any {
        break;
      }
    }

    // Set step parents and required steps
    for step in steps.iter() {
      let parents = step
        .parent_ids
        .iter()
        .filter_map(|parent_id| nodes.get(parent_id).cloned())
        .collect();

      let required_to_start = step
        .required_to_start
        .iter()
        .filter_map(|required_step_id| nodes.get(required_step_id).cloned())
        .collect();

      if let Some(node) = nodes.get(&step.id) {
        node.borrow_mut().set_parents(parents);
        node.borrow_mut().set_required_nodes(required_to_start);
      }
    }

    graph
  }
}