execution_engine/node/
types.rs

1//! Types are the concrete structs and enums that a Node is cast to
2//!
3//! Each type contains the relevant fields required for storing state during execution.
4//! Every type _must_ implement the [`Node`](super::Node) trait.
5//!
6//! As a minimum a Node type must include the following fields:
7//! ```
8//! struct NewNode {
9//!     id: String  // The unique identifier for the node
10//!     job_channel: Sender<workflow::Message> // The channel to notify the job of completion
11//!     position: usize // A pointer to the node's position in the Job.nodes collection
12//! }
13//! ```
14
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::task::{Context, Poll};
19
20use async_trait::async_trait;
21use tokio::sync::mpsc::Sender;
22
23use crate::reactor::Event;
24use crate::workflow;
25use crate::workflow::{Parameter, WorkflowNode, WorkflowNodeType};
26
27use super::{Node, NodeError};
28
29/// Start Node concrete type
30#[derive(Clone, Debug)]
31pub struct Start {
32    id: String,
33    job_channel: Sender<workflow::Message>,
34    position: usize,
35}
36
37/// End Node concrete type
38#[derive(Clone, Debug)]
39pub struct End {
40    id: String,
41    job_channel: Sender<workflow::Message>,
42    position: usize,
43}
44
45/// Activity Node concrete type
46#[derive(Clone, Debug)]
47pub struct Activity {
48    id: String,
49    activity_id: String,
50    description: String,
51    fail_on_error: bool,
52    waker_tx: Sender<Event>,
53    job_channel: Sender<workflow::Message>,
54    position: usize,
55}
56
57/// Parallel node concrete type
58///
59/// Contains two variants, `Opening` and `Closing`, this is because each one behaves in a different
60/// way when run. The `Closing` variant of a parallel must hold execution until all pointers into it
61/// have been resolved whereas an `Opening` variant does nothing.
62#[derive(Debug)]
63pub enum Parallel {
64    /// No special behaviour required for Opening, simply completes with no bespoke behaviour
65    Opening {
66        id: String,
67        job_channel: Sender<workflow::Message>,
68        position: usize,
69    },
70    /// Keeps track of how many times it is pointed to and how many times it has been run. We can
71    /// only proceed once it has been run as many times as it has pointers to it. The
72    /// `dependencies_met` count is reset every time the node completes, this is in case we loop
73    /// back to this node
74    Closing {
75        id: String,
76        job_channel: Sender<workflow::Message>,
77        position: usize,
78        /// How many times it is pointed to
79        dependencies: AtomicUsize,
80        /// How many times it has been run
81        dependencies_met: AtomicUsize, //todo: this must be set once dependencies == dependencies_met
82    },
83}
84
85impl Parallel {
86    pub(crate) fn new(
87        wf: &WorkflowNode,
88        job_channel: Sender<workflow::Message>,
89        position: usize,
90        dependencies: Option<usize>,
91    ) -> Self {
92        match dependencies {
93            None => Parallel::Opening {
94                id: wf.id.to_string(),
95                job_channel,
96                position,
97            },
98            Some(dependencies) => Parallel::Closing {
99                id: wf.id.to_string(),
100                job_channel,
101                position,
102                dependencies: AtomicUsize::new(dependencies),
103                dependencies_met: AtomicUsize::new(0),
104            },
105        }
106    }
107}
108
109/// Exclusive node concrete type
110///
111/// The exclusive node itself doesn't do any bespoke execution logic as it's the [`Job.next_node`](crate::workflow::Job.next_node) method
112/// which handles any expressions present on pointers.
113#[derive(Clone, Debug)]
114pub struct Exclusive {
115    id: String,
116    job_channel: Sender<workflow::Message>,
117    position: usize,
118}
119
120/// Used to store the state of the ActivityFuture
121enum FutureStatus {
122    Start,
123    RequestSent,
124}
125
126/// Type which implements Future for an Activity
127struct ActivityFuture {
128    activity: Activity,
129    state: FutureStatus,
130    outputs: Vec<Parameter>,
131}
132
133impl ActivityFuture {
134    fn new(activity: &Activity) -> Self {
135        ActivityFuture {
136            activity: activity.clone(),
137            state: FutureStatus::Start,
138            outputs: vec![],
139        }
140    }
141}
142
143/// We have hand coded a future here but this could easily be awaiting a channel instead
144/// This has simply been done as a learning exercise more than anything else.
145///
146/// todo: We should compare the performance of this vs awaiting a channel
147impl Future for ActivityFuture {
148    type Output = Vec<Parameter>;
149
150    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151        let me = self.get_mut();
152        match me.state {
153            FutureStatus::Start => {
154                let waker = cx.waker().clone();
155                let tx = me.activity.waker_tx.clone();
156                let event = Event::Activity {
157                    node_id: uuid::Uuid::new_v4().to_string(),
158                    activity_id: me.activity.activity_id.clone(),
159                    waker,
160                };
161                tx.try_send(event)
162                    .expect("Unable to send error! Failing workflow");
163
164                me.state = FutureStatus::RequestSent;
165                Poll::Pending
166            }
167            FutureStatus::RequestSent => Poll::Ready(me.outputs.clone()),
168        }
169    }
170}
171
172impl Start {
173    pub fn new(wf: &WorkflowNode, job_channel: Sender<workflow::Message>, position: usize) -> Self {
174        Start {
175            id: wf.id.clone(),
176            job_channel,
177            position,
178        }
179    }
180}
181
182#[async_trait]
183impl Node for Start {
184    fn kind(&self) -> WorkflowNodeType {
185        WorkflowNodeType::Start
186    }
187    fn id(&self) -> &str {
188        &self.id
189    }
190
191    fn position(&self) -> usize {
192        self.position
193    }
194
195    async fn run(&self) -> Result<(), NodeError> {
196        self.job_channel.send(self.create_msg().await).await;
197        Ok(())
198    }
199}
200
201impl End {
202    pub fn new(wf: &WorkflowNode, job_channel: Sender<workflow::Message>, position: usize) -> Self {
203        End {
204            id: wf.id.clone(),
205            job_channel,
206            position,
207        }
208    }
209}
210
211#[async_trait]
212impl Node for End {
213    fn kind(&self) -> WorkflowNodeType {
214        WorkflowNodeType::End
215    }
216    fn id(&self) -> &str {
217        &self.id
218    }
219    fn position(&self) -> usize {
220        self.position
221    }
222    async fn run(&self) -> Result<(), NodeError> {
223        let _ = self.job_channel.send(self.create_msg().await).await;
224        Ok(())
225    }
226}
227
228#[async_trait]
229impl Node for Parallel {
230    fn kind(&self) -> WorkflowNodeType {
231        WorkflowNodeType::Start
232    }
233    fn id(&self) -> &str {
234        match &self {
235            Parallel::Opening { id, .. } => id,
236            Parallel::Closing { id, .. } => id,
237        }
238    }
239
240    fn position(&self) -> usize {
241        match &self {
242            Parallel::Opening { position, .. } => *position,
243            Parallel::Closing { position, .. } => *position,
244        }
245    }
246
247    async fn run(&self) -> Result<(), NodeError> {
248        match &self {
249            Parallel::Opening { job_channel, .. } => {
250                println!("Inside Opening");
251                let _ = job_channel.send(self.create_msg().await).await;
252            }
253            Parallel::Closing {
254                dependencies,
255                dependencies_met,
256                job_channel,
257                ..
258            } => {
259                println!("Inside closing");
260                println!("Dependencies: {:#?}", dependencies);
261                println!("Dependencies met: {:#?}", dependencies_met);
262                let _ = dependencies_met.fetch_add(1, Ordering::Acquire);
263                let new = dependencies_met.load(Ordering::Relaxed);
264                if new == dependencies.load(Ordering::Acquire) {
265                    let _ = job_channel.send(self.create_msg().await).await;
266                    dependencies_met.store(0, Ordering::Relaxed);
267                }
268            }
269        }
270        Ok(())
271    }
272}
273
274impl Activity {
275    pub fn new(
276        wf: &WorkflowNode,
277        waker_channel: &Sender<Event>,
278        job_channel: Sender<workflow::Message>,
279        position: usize,
280    ) -> Self {
281        let mut activity_id: Option<String> = None;
282        let mut description: Option<String> = None;
283        let mut fail_on_error: Option<bool> = None;
284        for param in wf.parameters.as_ref().unwrap().iter() {
285            match param.key.to_lowercase().as_ref() {
286                "activityid" => activity_id = Some(param.value.as_str().unwrap().to_string()),
287                "description" => description = Some(param.value.as_str().unwrap().to_string()),
288                "failonerror" => fail_on_error = Some(param.value.as_bool().unwrap()),
289                _ => continue,
290            }
291        }
292        Activity {
293            id: wf.id.clone(),
294            activity_id: activity_id.unwrap(),
295            description: description.unwrap(),
296            fail_on_error: fail_on_error.unwrap(),
297            waker_tx: waker_channel.clone(),
298            job_channel,
299            position,
300        }
301    }
302}
303
304#[async_trait]
305impl Node for Activity {
306    fn kind(&self) -> WorkflowNodeType {
307        WorkflowNodeType::Activity
308    }
309    fn id(&self) -> &str {
310        &self.id
311    }
312
313    fn position(&self) -> usize {
314        self.position
315    }
316
317    async fn execute(&self) -> Result<Vec<workflow::Parameter>, NodeError> {
318        let future = ActivityFuture::new(self);
319        let outputs = future.await;
320        Ok(outputs)
321    }
322
323    async fn run(&self) -> Result<(), NodeError> {
324        let _ = self.job_channel.send(self.create_msg().await).await;
325        Ok(())
326    }
327}
328
329impl Exclusive {
330    pub fn new(wf: &WorkflowNode, job_channel: Sender<workflow::Message>, position: usize) -> Self {
331        Exclusive {
332            id: wf.id.clone(),
333            job_channel,
334            position,
335        }
336    }
337}
338#[async_trait]
339impl Node for Exclusive {
340    fn kind(&self) -> WorkflowNodeType {
341        WorkflowNodeType::Exclusive
342    }
343
344    fn id(&self) -> &str {
345        &self.id
346    }
347
348    fn position(&self) -> usize {
349        self.position
350    }
351
352    async fn run(&self) -> Result<(), NodeError> {
353        let _ = self.job_channel.send(self.create_msg().await).await;
354        Ok(())
355    }
356}