1use std::future::Future;
16use std::pin::Pin;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::task::{Context, Poll};
19
20use async_trait::async_trait;
21use tokio::sync::mpsc::Sender;
22
23use crate::reactor::Event;
24use crate::workflow;
25use crate::workflow::{Parameter, WorkflowNode, WorkflowNodeType};
26
27use super::{Node, NodeError};
28
29#[derive(Clone, Debug)]
31pub struct Start {
32 id: String,
33 job_channel: Sender<workflow::Message>,
34 position: usize,
35}
36
37#[derive(Clone, Debug)]
39pub struct End {
40 id: String,
41 job_channel: Sender<workflow::Message>,
42 position: usize,
43}
44
45#[derive(Clone, Debug)]
47pub struct Activity {
48 id: String,
49 activity_id: String,
50 description: String,
51 fail_on_error: bool,
52 waker_tx: Sender<Event>,
53 job_channel: Sender<workflow::Message>,
54 position: usize,
55}
56
57#[derive(Debug)]
63pub enum Parallel {
64 Opening {
66 id: String,
67 job_channel: Sender<workflow::Message>,
68 position: usize,
69 },
70 Closing {
75 id: String,
76 job_channel: Sender<workflow::Message>,
77 position: usize,
78 dependencies: AtomicUsize,
80 dependencies_met: AtomicUsize, },
83}
84
85impl Parallel {
86 pub(crate) fn new(
87 wf: &WorkflowNode,
88 job_channel: Sender<workflow::Message>,
89 position: usize,
90 dependencies: Option<usize>,
91 ) -> Self {
92 match dependencies {
93 None => Parallel::Opening {
94 id: wf.id.to_string(),
95 job_channel,
96 position,
97 },
98 Some(dependencies) => Parallel::Closing {
99 id: wf.id.to_string(),
100 job_channel,
101 position,
102 dependencies: AtomicUsize::new(dependencies),
103 dependencies_met: AtomicUsize::new(0),
104 },
105 }
106 }
107}
108
109#[derive(Clone, Debug)]
114pub struct Exclusive {
115 id: String,
116 job_channel: Sender<workflow::Message>,
117 position: usize,
118}
119
120enum FutureStatus {
122 Start,
123 RequestSent,
124}
125
126struct ActivityFuture {
128 activity: Activity,
129 state: FutureStatus,
130 outputs: Vec<Parameter>,
131}
132
133impl ActivityFuture {
134 fn new(activity: &Activity) -> Self {
135 ActivityFuture {
136 activity: activity.clone(),
137 state: FutureStatus::Start,
138 outputs: vec![],
139 }
140 }
141}
142
143impl Future for ActivityFuture {
148 type Output = Vec<Parameter>;
149
150 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151 let me = self.get_mut();
152 match me.state {
153 FutureStatus::Start => {
154 let waker = cx.waker().clone();
155 let tx = me.activity.waker_tx.clone();
156 let event = Event::Activity {
157 node_id: uuid::Uuid::new_v4().to_string(),
158 activity_id: me.activity.activity_id.clone(),
159 waker,
160 };
161 tx.try_send(event)
162 .expect("Unable to send error! Failing workflow");
163
164 me.state = FutureStatus::RequestSent;
165 Poll::Pending
166 }
167 FutureStatus::RequestSent => Poll::Ready(me.outputs.clone()),
168 }
169 }
170}
171
172impl Start {
173 pub fn new(wf: &WorkflowNode, job_channel: Sender<workflow::Message>, position: usize) -> Self {
174 Start {
175 id: wf.id.clone(),
176 job_channel,
177 position,
178 }
179 }
180}
181
182#[async_trait]
183impl Node for Start {
184 fn kind(&self) -> WorkflowNodeType {
185 WorkflowNodeType::Start
186 }
187 fn id(&self) -> &str {
188 &self.id
189 }
190
191 fn position(&self) -> usize {
192 self.position
193 }
194
195 async fn run(&self) -> Result<(), NodeError> {
196 self.job_channel.send(self.create_msg().await).await;
197 Ok(())
198 }
199}
200
201impl End {
202 pub fn new(wf: &WorkflowNode, job_channel: Sender<workflow::Message>, position: usize) -> Self {
203 End {
204 id: wf.id.clone(),
205 job_channel,
206 position,
207 }
208 }
209}
210
211#[async_trait]
212impl Node for End {
213 fn kind(&self) -> WorkflowNodeType {
214 WorkflowNodeType::End
215 }
216 fn id(&self) -> &str {
217 &self.id
218 }
219 fn position(&self) -> usize {
220 self.position
221 }
222 async fn run(&self) -> Result<(), NodeError> {
223 let _ = self.job_channel.send(self.create_msg().await).await;
224 Ok(())
225 }
226}
227
228#[async_trait]
229impl Node for Parallel {
230 fn kind(&self) -> WorkflowNodeType {
231 WorkflowNodeType::Start
232 }
233 fn id(&self) -> &str {
234 match &self {
235 Parallel::Opening { id, .. } => id,
236 Parallel::Closing { id, .. } => id,
237 }
238 }
239
240 fn position(&self) -> usize {
241 match &self {
242 Parallel::Opening { position, .. } => *position,
243 Parallel::Closing { position, .. } => *position,
244 }
245 }
246
247 async fn run(&self) -> Result<(), NodeError> {
248 match &self {
249 Parallel::Opening { job_channel, .. } => {
250 println!("Inside Opening");
251 let _ = job_channel.send(self.create_msg().await).await;
252 }
253 Parallel::Closing {
254 dependencies,
255 dependencies_met,
256 job_channel,
257 ..
258 } => {
259 println!("Inside closing");
260 println!("Dependencies: {:#?}", dependencies);
261 println!("Dependencies met: {:#?}", dependencies_met);
262 let _ = dependencies_met.fetch_add(1, Ordering::Acquire);
263 let new = dependencies_met.load(Ordering::Relaxed);
264 if new == dependencies.load(Ordering::Acquire) {
265 let _ = job_channel.send(self.create_msg().await).await;
266 dependencies_met.store(0, Ordering::Relaxed);
267 }
268 }
269 }
270 Ok(())
271 }
272}
273
274impl Activity {
275 pub fn new(
276 wf: &WorkflowNode,
277 waker_channel: &Sender<Event>,
278 job_channel: Sender<workflow::Message>,
279 position: usize,
280 ) -> Self {
281 let mut activity_id: Option<String> = None;
282 let mut description: Option<String> = None;
283 let mut fail_on_error: Option<bool> = None;
284 for param in wf.parameters.as_ref().unwrap().iter() {
285 match param.key.to_lowercase().as_ref() {
286 "activityid" => activity_id = Some(param.value.as_str().unwrap().to_string()),
287 "description" => description = Some(param.value.as_str().unwrap().to_string()),
288 "failonerror" => fail_on_error = Some(param.value.as_bool().unwrap()),
289 _ => continue,
290 }
291 }
292 Activity {
293 id: wf.id.clone(),
294 activity_id: activity_id.unwrap(),
295 description: description.unwrap(),
296 fail_on_error: fail_on_error.unwrap(),
297 waker_tx: waker_channel.clone(),
298 job_channel,
299 position,
300 }
301 }
302}
303
304#[async_trait]
305impl Node for Activity {
306 fn kind(&self) -> WorkflowNodeType {
307 WorkflowNodeType::Activity
308 }
309 fn id(&self) -> &str {
310 &self.id
311 }
312
313 fn position(&self) -> usize {
314 self.position
315 }
316
317 async fn execute(&self) -> Result<Vec<workflow::Parameter>, NodeError> {
318 let future = ActivityFuture::new(self);
319 let outputs = future.await;
320 Ok(outputs)
321 }
322
323 async fn run(&self) -> Result<(), NodeError> {
324 let _ = self.job_channel.send(self.create_msg().await).await;
325 Ok(())
326 }
327}
328
329impl Exclusive {
330 pub fn new(wf: &WorkflowNode, job_channel: Sender<workflow::Message>, position: usize) -> Self {
331 Exclusive {
332 id: wf.id.clone(),
333 job_channel,
334 position,
335 }
336 }
337}
338#[async_trait]
339impl Node for Exclusive {
340 fn kind(&self) -> WorkflowNodeType {
341 WorkflowNodeType::Exclusive
342 }
343
344 fn id(&self) -> &str {
345 &self.id
346 }
347
348 fn position(&self) -> usize {
349 self.position
350 }
351
352 async fn run(&self) -> Result<(), NodeError> {
353 let _ = self.job_channel.send(self.create_msg().await).await;
354 Ok(())
355 }
356}