execution_engine/workflow.rs
1//! The workflow module contains the structs and functions relating to the Workflow object
2//! and job object.
3//!
4//! The Workflow struct is a 1:1 representation of the Workflow class we have defined currently
5//! within the system. The Job struct is a representation of a Workflow when it is being processed
6//! by the execution engine.
7//!
8//! The Job struct implements the Iterator trait and this contains the logic for moving from node
9//! to node during execution.
10use std::collections::HashMap;
11use std::fmt::Debug;
12use std::iter::Iterator;
13use std::sync::Arc;
14use std::time::Instant;
15
16use eval::Expr;
17use serde::Deserialize;
18use serde_json::Value;
19use tokio::sync::mpsc::{Receiver, Sender};
20use uuid::Uuid;
21
22use crate::node::{types::*, Node};
23use crate::reactor::Event;
24
/// A pointer indicates which other nodes a node is pointing to.
///
/// This includes the ID of the node it is pointing to and an expression (if there is one)
/// that must be evaluated for the pointer to be followed.
#[derive(Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Pointer {
    /// ID of the node that is being pointed to
    pub points_to: String,
    /// The expression that must be evaluated to true if the pointer is to be followed.
    /// `None` marks an unconditional pointer that is always followed.
    expression: Option<String>,
}
38
/// A generic Parameter which is a 1:1 representation of a Parameter in the existing system.
///
/// This is used to represent objects in context (see `Job::context` and `Message::context`).
#[derive(Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Parameter {
    /// The key of the parameter
    pub key: String,
    /// The type of parameter. Serialized as `type`, which is a reserved word in Rust,
    /// hence the rename to `kind`.
    #[serde(rename = "type")]
    kind: String,
    /// The value of the parameter, represented as a raw JSON value as we never actually use this
    /// value directly within the engine.
    pub value: Value,
}
53
/// A struct representing each Node within a Workflow where a node is simply an item to be executed
/// or an instruction on how to execute the Workflow.
#[derive(Deserialize, Debug, Clone)]
pub struct WorkflowNode {
    /// The type of Node. Serialized as `type`, which is a reserved word in Rust,
    /// hence the rename to `kind`.
    #[serde(rename = "type")]
    kind: WorkflowNodeType,
    /// Unique ID for the node
    pub id: String,
    /// The nodes which `self` points to in the Workflow; an End node has no pointers
    pointers: Vec<Pointer>,
    /// Any inputs associated with `self`
    pub parameters: Option<Vec<Parameter>>,
}
68
/// A 1:1 representation of the Workflow class in our existing codebase.
///
/// Deserialized from camelCase JSON; converted into a runnable [`Job`] via [`Job::new`].
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Workflow {
    /// Database ID to the Workflow
    id: String,
    /// ID used internally to represent the Workflow
    workflow_id: String,
    /// User who "owns" the Workflow; copied into the job's `owner_id` on creation
    associated_user_id: String,
    /// The ID of the Project that the Workflow resides in
    project_id: String,
    /// The detail of the Workflow: the full list of nodes that make it up
    workflow: Vec<WorkflowNode>,
}
84
/// The status representing the state any Node can find itself in
#[derive(PartialEq, Deserialize, Debug)]
pub enum WorkflowNodeStatus {
    /// The default state - indicates the node has not been run yet
    NotProcessed,
    /// The node is queued for execution but something is blocking it from running
    Pending,
    /// The node is currently being executed
    Processing,
    /// The node was executed unsuccessfully as something caused an error to occur
    Failed,
    /// The node was executed successfully
    Success,
    /// An activity is unable to be executed as the Bot is currently busy executing another process
    BotBusy,
    /// The Bot is executing the process and we are waiting for a response
    AwaitingBotResponse,
    /// The node has been cancelled via external signal
    Cancelled,
}
105
/// Definition for each type of node that can appear in a Workflow/Job
#[derive(PartialEq, Deserialize, Debug, Clone)]
pub enum WorkflowNodeType {
    /// The node which indicates where to start execution from
    Start,
    /// Indicates the job must run all branches in a parallel manner until the closing parallel
    /// gate
    Parallel,
    /// Indicates a conditional logic point where all paths where the expression == true must
    /// be executed, even if this results in parallel behaviour if multiple branches are true
    Exclusive,
    /// Represents an activity to be executed on a Bot
    Activity,
    /// Indicates a RESTful HTTP call is to be made (not yet implemented — see `Job::new`)
    Trigger,
    /// Signals the End of the job, the job must stop when an End node is encountered
    End,
}
124
/// A definition for the current running state of a Job struct.
#[derive(PartialEq, Deserialize, Debug, Clone)]
pub enum JobStatus {
    /// The default state, indicates that the job has been created but not started yet
    NotStarted,
    /// This state indicates the job is currently being run by a process
    Processing,
    /// The job has encountered an End node and has run to completion or errored
    Finished,
}
135
/// Job represents the workflow whilst it is running.
///
/// When a job is created from a workflow, all of the nodes are converted from a generic Node
/// struct to specific structs which represent only the data required for each node and each
/// struct has it's own implementation of what it should do when it is run.
#[derive(Debug, Clone)]
pub struct Job {
    /// The unique id of the current job, must be unique as it is used to identify the specific job.
    /// This is a uuid v4 under the hood converted to a string for ease of serialization.
    id: String,
    /// Represents the user_id of the individual who executed the job
    owner_id: String,
    /// A container type to include all the data returned from each node as the job runs
    context: Vec<Parameter>,
    /// Current is a pointer into `self.nodes` to indicate which node we are currently on within
    /// the job. It will start as None indicating the job has not started
    current: Option<usize>,
    /// Cursor map is a set of (key, value) pairs where the key is the nodeId and the value is the
    /// list of pointers coming off that node. Built once in `Job::new` and used by `next_node`
    /// to walk the graph without rescanning the workflow.
    cursor_map: HashMap<String, Vec<Pointer>>,
    /// A list of all the nodes within the job, each node shown in a workflow will appear
    /// exactly once.
    /// These nodes are wrapped in `Arc` so they can be sent across thread boundaries safely
    pub nodes: Vec<Arc<Box<dyn Node>>>,
    /// The status represents the jobs running state
    status: JobStatus,
}
163
164impl Job {
165 /// Creates a new Job struct from a Workflow. Also requires the sender to the reactor for any
166 /// activity nodes that this will create as part of the Job struct. Will also own the Receiver
167 /// to the executor channel so nodes can send data to it.
168 /// Need to add position as property to each node
169 /// Flatten pointer map to quickly scan for a nodes dependencies
170 pub fn new(wf: &Workflow, reactor_tx: &Sender<Event>) -> (Self, Receiver<Message>) {
171 let (exec_tx, exec_rx) = tokio::sync::mpsc::channel(20);
172 let mut nodes: Vec<Arc<Box<dyn Node>>> = Vec::with_capacity(wf.workflow.len());
173 let mut cursor_map: HashMap<String, Vec<Pointer>> = HashMap::new();
174 for (i, node) in wf.workflow.iter().enumerate() {
175 match node.kind {
176 WorkflowNodeType::Start => {
177 nodes.push(Arc::new(Box::new(Start::new(node, exec_tx.clone(), i))));
178 }
179 WorkflowNodeType::Parallel => {
180 let dependencies = wf
181 .workflow
182 .iter()
183 .map(|n| &n.pointers)
184 .flatten()
185 .filter(|p| p.points_to == node.id)
186 .count();
187 //println!("{dependencies}");
188 let wrapped_deps = if dependencies == 1 {
189 None
190 } else {
191 Some(dependencies)
192 };
193 nodes.push(Arc::new(Box::new(Parallel::new(
194 node,
195 exec_tx.clone(),
196 i,
197 wrapped_deps,
198 ))));
199 }
200 WorkflowNodeType::Exclusive => {
201 nodes.push(Arc::new(Box::new(Exclusive::new(node, exec_tx.clone(), i))));
202 }
203 WorkflowNodeType::Activity => {
204 nodes.push(Arc::new(Box::new(Activity::new(
205 node,
206 reactor_tx,
207 exec_tx.clone(),
208 i,
209 ))));
210 }
211 WorkflowNodeType::Trigger => {
212 todo!()
213 }
214 WorkflowNodeType::End => {
215 nodes.push(Arc::new(Box::new(End::new(node, exec_tx.clone(), i))));
216 }
217 }
218 cursor_map.insert(node.id.clone(), node.pointers.clone());
219 }
220 (
221 Job {
222 id: Uuid::new_v4().to_string(),
223 owner_id: wf.associated_user_id.clone(),
224 context: vec![],
225 current: None,
226 cursor_map,
227 nodes,
228 status: JobStatus::NotStarted,
229 },
230 exec_rx,
231 )
232 }
233
234 /// This will return the next nodes being pointed to by the one that has just completed
235 ///
236 /// This takes an optional pointer to the node which has just completed and then based off
237 /// that will return the nodes which are pointed to from the one that has just completed.
238 ///
239 /// If we are at the start of the job then we can pass in `None` to signify this and it will
240 /// return the Start node.
241 ///
242 /// This will return more than one node in the cases where multiple nodes are pointed to from the
243 /// node that just completed. This covers cases such as opening parallel nodes and exclusives which
244 /// can point to multiple nodes.
245 ///
246 /// This will return None when it has reached an End node to signify there are no more nodes to
247 /// be run.
248 pub fn next_node(&self, pointer: Option<usize>) -> Option<Vec<Arc<Box<dyn Node>>>> {
249 if let Some(ptr) = pointer {
250 let current = &**self.nodes.get(ptr)?;
251 let points_to = self.cursor_map.get(current.id())?;
252 let mut next_nodes: Vec<Arc<Box<dyn Node>>> = vec![];
253 for path in points_to {
254 if let Some(expression) = &path.expression {
255 if !Expr::new(expression)
256 .exec()
257 .expect("Unable to evaluate expression")
258 .is_boolean()
259 {
260 continue;
261 }
262 }
263 next_nodes.push(
264 self.nodes
265 .iter()
266 .find(|x| path.points_to == x.id())?
267 .clone(),
268 )
269 }
270 if next_nodes.is_empty() && current.kind() == WorkflowNodeType::End {
271 None
272 } else {
273 Some(next_nodes)
274 }
275 } else {
276 Some(vec![
277 self.nodes
278 .iter()
279 .find(|x| x.kind() == WorkflowNodeType::Start)?
280 .clone();
281 1
282 ])
283 }
284 }
285}
286
/// Indicates whether a node has succeeded or failed
///
/// Derives mirror the other status enums in this module so callers can compare,
/// copy, and debug-log the status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    /// The node ran to completion without error
    Success,
    /// The node's execution raised an error
    Failed,
}
292
/// Message is the struct transmitted to the executor to signal the job can make progress
///
/// Sent by nodes over the executor channel created in `Job::new`.
pub struct Message {
    /// Index of node sending message (its position within `Job::nodes`)
    pub pointer: usize,
    /// Status of the node
    pub status: NodeStatus,
    /// List of context sent back to be merged into the job's context
    pub context: Vec<Parameter>,
}
301}