execution_engine/
workflow.rs

1//! The workflow module contains the structs and functions relating to the Workflow object
2//! and job object.
3//!
4//! The Workflow struct is a 1:1 representation of the Workflow class we have defined currently
5//! within the system. The Job struct is a representation of a Workflow when it is being processed
6//! by the execution engine.
7//!
8//! The Job struct implements the Iterator trait and this contains the logic for moving from node
9//! to node during execution.
10use std::collections::HashMap;
11use std::fmt::Debug;
12use std::iter::Iterator;
13use std::sync::Arc;
14use std::time::Instant;
15
16use eval::Expr;
17use serde::Deserialize;
18use serde_json::Value;
19use tokio::sync::mpsc::{Receiver, Sender};
20use uuid::Uuid;
21
22use crate::node::{types::*, Node};
23use crate::reactor::Event;
24
25/// A pointer indicates which other nodes a node is pointing to.
26///
27/// This includes the ID of the node it is pointing to and an expression (if there is one)
28/// that must be evaluated for the pointer to be followed.
29///
30#[derive(Deserialize, Debug, Clone, PartialEq)]
31#[serde(rename_all = "camelCase")]
32pub struct Pointer {
33    /// ID of the node that is being pointed to
34    pub points_to: String,
35    /// The expression that must be evaluated to true if the pointer is to be followed
36    expression: Option<String>,
37}
38
39/// A generic Parameter which is a 1:1 representation of a Parameter in the existing system.
40///
41/// This is used to represent objects in context.
42#[derive(Deserialize, Debug, Clone, PartialEq)]
43#[serde(rename_all = "camelCase")]
44pub struct Parameter {
45    /// The key of the parameter
46    pub key: String,
47    /// The type of parameter
48    #[serde(rename = "type")]
49    kind: String,
50    /// The value of the parameter, represented as a JSON value as we never actually use this
51    pub value: Value,
52}
53
54/// A struct representing each Node within a Workflow where a node is simply an item to be executed
55/// or an instruction on how to execute the Workflow.
56#[derive(Deserialize, Debug, Clone)]
57pub struct WorkflowNode {
58    /// The type of Node
59    #[serde(rename = "type")]
60    kind: WorkflowNodeType,
61    /// Unique ID for the node
62    pub id: String,
63    /// The nodes which `self` points to in the Workflow
64    pointers: Vec<Pointer>,
65    /// Any inputs associated with `self`
66    pub parameters: Option<Vec<Parameter>>,
67}
68
69/// A 1:1 representation of the Workflow class in our existing codebase.
70#[derive(Deserialize, Debug)]
71#[serde(rename_all = "camelCase")]
72pub struct Workflow {
73    /// Database ID to the Workflow
74    id: String,
75    /// ID used internally to represent the Workflow
76    workflow_id: String,
77    /// User who "owns" the Workflow
78    associated_user_id: String,
79    /// The ID of the Project that the Workflow resides in
80    project_id: String,
81    /// The detail of the Workflow
82    workflow: Vec<WorkflowNode>,
83}
84
85/// The status representing the state any Node can find itself in
86#[derive(PartialEq, Deserialize, Debug)]
87pub enum WorkflowNodeStatus {
88    /// The default state - indicates the node has not been run yet
89    NotProcessed,
90    /// The node is queued for execution but something is blocking it from running
91    Pending,
92    /// The node is currently being executed
93    Processing,
94    /// The node was executed unsuccessfully as something caused an error to occur
95    Failed,
96    /// The node was executed successfully
97    Success,
98    /// An activity is unable to be executed as the Bot is currently busy executing another process
99    BotBusy,
100    /// The Bot is executing the process and we are waiting for a response
101    AwaitingBotResponse,
102    /// The node has been cancelled via external signal
103    Cancelled,
104}
105
106/// Definition for each type of node that can appear in a Workflow/Job
107#[derive(PartialEq, Deserialize, Debug, Clone)]
108pub enum WorkflowNodeType {
109    /// The node which indicates where to start execution from
110    Start,
111    /// Indicates the job must run all branches in a parallel manner until the closing parallel
112    /// gate
113    Parallel,
114    /// Indicates a conditional logic point where all paths where the expression == true must
115    /// be executed, even if this results in parallel behaviour if multiple branches are true
116    Exclusive,
117    /// Represents an activity to be executed on a Bot
118    Activity,
119    /// Indicates a RESTful HTTP call is to be made
120    Trigger,
121    /// Signals the End of the job, the job must stop when an End node is encountered
122    End,
123}
124
125/// A definition for the current running state of a Job struct.
126#[derive(PartialEq, Deserialize, Debug, Clone)]
127pub enum JobStatus {
128    /// The default state, indicates that the job has been created but not started yet
129    NotStarted,
130    /// This state indicates the job is currently being run by a process
131    Processing,
132    /// The job has encountered an End node and has run to completion or errored
133    Finished,
134}
135
/// Job represents the workflow whilst it is running.
///
/// When a job is created from a workflow, all of the nodes are converted from a generic Node
/// struct to specific structs which represent only the data required for each node and each
/// struct has it's own implementation of what it should do when it is run.
#[derive(Debug, Clone)]
pub struct Job {
    /// The unique id of the current job, must be unique as it is used to identify the specific job.
    /// This is a uuid v4 under the hood converted to a string for ease of serialization.
    id: String,
    /// Represents the user_id of the individual who executed the job
    owner_id: String,
    /// A container type to include all the data returned from each node as the job runs
    context: Vec<Parameter>,
    /// Current is a pointer into `self.nodes` to indicate which node we are currently on within
    /// the job. It will start as None indicating the job has not started
    current: Option<usize>,
    /// Cursor map is a set of (key, value) pairs where the key is the nodeId and the value is the
    /// list of pointers coming off that node
    cursor_map: HashMap<String, Vec<Pointer>>,
    /// A list of all the nodes within the job, each node shown in a workflow will appear
    /// exactly once
    /// These nodes are wrapped in `Arc` so they can be sent across thread boundaries safely
    /// NOTE(review): `Arc<Box<dyn Node>>` double-boxes the trait object; `Arc<dyn Node>` would
    /// drop one level of indirection — confirm no caller relies on the `Box` before changing
    /// this public field's type.
    pub nodes: Vec<Arc<Box<dyn Node>>>,
    /// The status represents the jobs running state
    status: JobStatus,
}
163
164impl Job {
165    /// Creates a new Job struct from a Workflow. Also requires the sender to the reactor for any
166    /// activity nodes that this will create as part of the Job struct. Will also own the Receiver
167    /// to the executor channel so nodes can send data to it.
168    /// Need to add position as property to each node
169    /// Flatten pointer map to quickly scan for a nodes dependencies
170    pub fn new(wf: &Workflow, reactor_tx: &Sender<Event>) -> (Self, Receiver<Message>) {
171        let (exec_tx, exec_rx) = tokio::sync::mpsc::channel(20);
172        let mut nodes: Vec<Arc<Box<dyn Node>>> = Vec::with_capacity(wf.workflow.len());
173        let mut cursor_map: HashMap<String, Vec<Pointer>> = HashMap::new();
174        for (i, node) in wf.workflow.iter().enumerate() {
175            match node.kind {
176                WorkflowNodeType::Start => {
177                    nodes.push(Arc::new(Box::new(Start::new(node, exec_tx.clone(), i))));
178                }
179                WorkflowNodeType::Parallel => {
180                    let dependencies = wf
181                        .workflow
182                        .iter()
183                        .map(|n| &n.pointers)
184                        .flatten()
185                        .filter(|p| p.points_to == node.id)
186                        .count();
187                    //println!("{dependencies}");
188                    let wrapped_deps = if dependencies == 1 {
189                        None
190                    } else {
191                        Some(dependencies)
192                    };
193                    nodes.push(Arc::new(Box::new(Parallel::new(
194                        node,
195                        exec_tx.clone(),
196                        i,
197                        wrapped_deps,
198                    ))));
199                }
200                WorkflowNodeType::Exclusive => {
201                    nodes.push(Arc::new(Box::new(Exclusive::new(node, exec_tx.clone(), i))));
202                }
203                WorkflowNodeType::Activity => {
204                    nodes.push(Arc::new(Box::new(Activity::new(
205                        node,
206                        reactor_tx,
207                        exec_tx.clone(),
208                        i,
209                    ))));
210                }
211                WorkflowNodeType::Trigger => {
212                    todo!()
213                }
214                WorkflowNodeType::End => {
215                    nodes.push(Arc::new(Box::new(End::new(node, exec_tx.clone(), i))));
216                }
217            }
218            cursor_map.insert(node.id.clone(), node.pointers.clone());
219        }
220        (
221            Job {
222                id: Uuid::new_v4().to_string(),
223                owner_id: wf.associated_user_id.clone(),
224                context: vec![],
225                current: None,
226                cursor_map,
227                nodes,
228                status: JobStatus::NotStarted,
229            },
230            exec_rx,
231        )
232    }
233
234    /// This will return the next nodes being pointed to by the one that has just completed
235    ///
236    /// This takes an optional pointer to the node which has just completed and then based off
237    /// that will return the nodes which are pointed to from the one that has just completed.
238    ///
239    /// If we are at the start of the job then we can pass in `None` to signify this and it will
240    /// return the Start node.
241    ///
242    /// This will return more than one node in the cases where multiple nodes are pointed to from the
243    /// node that just completed. This covers cases such as opening parallel nodes and exclusives which
244    /// can point to multiple nodes.
245    ///
246    /// This will return None when it has reached an End node to signify there are no more nodes to
247    /// be run.
248    pub fn next_node(&self, pointer: Option<usize>) -> Option<Vec<Arc<Box<dyn Node>>>> {
249        if let Some(ptr) = pointer {
250            let current = &**self.nodes.get(ptr)?;
251            let points_to = self.cursor_map.get(current.id())?;
252            let mut next_nodes: Vec<Arc<Box<dyn Node>>> = vec![];
253            for path in points_to {
254                if let Some(expression) = &path.expression {
255                    if !Expr::new(expression)
256                        .exec()
257                        .expect("Unable to evaluate expression")
258                        .is_boolean()
259                    {
260                        continue;
261                    }
262                }
263                next_nodes.push(
264                    self.nodes
265                        .iter()
266                        .find(|x| path.points_to == x.id())?
267                        .clone(),
268                )
269            }
270            if next_nodes.is_empty() && current.kind() == WorkflowNodeType::End {
271                None
272            } else {
273                Some(next_nodes)
274            }
275        } else {
276            Some(vec![
277                self.nodes
278                    .iter()
279                    .find(|x| x.kind() == WorkflowNodeType::Start)?
280                    .clone();
281                1
282            ])
283        }
284    }
285}
286
/// Indicates whether a node has succeeded or failed
// Public type previously had no derives at all; `Debug` is needed for logging
// and diagnostics, and a two-variant unit enum is trivially `Copy` and `Eq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    /// The node finished executing without error
    Success,
    /// The node's execution ended in an error
    Failed,
}
292
293/// Message is the struct transmitted to the executor to signal the job can make progress
294pub struct Message {
295    /// Index of node sending message
296    pub pointer: usize,
297    /// Status of the node
298    pub status: NodeStatus,
299    /// List of context sent back
300    pub context: Vec<Parameter>,
301}