snurr/process/
engine.rs

1mod bpmn_queue;
2
3use bpmn_queue::BpmnQueue;
4use log::info;
5use std::{borrow::Cow, ops::ControlFlow, sync::Arc};
6
7#[cfg(feature = "trace")]
8use crate::process::replay;
9#[cfg(feature = "trace")]
10use std::sync::mpsc::Sender;
11
12use crate::{
13    Data, Eventhandler, Process, Symbol,
14    error::{
15        AT_LEAST_TWO_OUTGOING, Error, cannot_do_events, cannot_fork, cannot_use_cond_expr,
16        cannot_use_default,
17    },
18    model::{ActivityType, Bpmn, BpmnLocal, EventType, Gateway, GatewayType, With},
19};
20
21#[derive(Debug)]
22enum Return<'a> {
23    Fork(Cow<'a, [usize]>),
24    Join(&'a Gateway),
25    End(Option<&'a Bpmn>),
26}
27
28macro_rules! maybe_fork {
29    ($self:expr, $outputs:expr, $data:expr, $ty:expr, $noi:expr) => {
30        if $outputs.len() <= 1 {
31            $outputs
32                .first()
33                .ok_or_else(|| Error::MissingOutput($ty.to_string(), $noi.to_string()))?
34        } else {
35            return Ok(Return::Fork(Cow::Borrowed($outputs.ids())));
36        }
37    };
38}
39
40impl Process {
41    pub(super) fn execute<'a, T>(&'a self, data: &ExecuteData<'a, T>) -> ExecuteResult<'a>
42    where
43        T: Send,
44    {
45        let mut queue = BpmnQueue::new(Cow::from(&[0]));
46        while let Some(tokens) = queue.pop() {
47            let num_tokens = tokens.len();
48
49            // Run flow single or multi threaded
50            let results = {
51                #[cfg(feature = "parallel")]
52                {
53                    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
54                    // Need to collect the parallel iterator first.
55                    let results: Vec<_> = tokens
56                        .par_iter()
57                        .map(|token| self.flow(token, data))
58                        .collect::<Vec<_>>();
59                    results.into_iter()
60                }
61                #[cfg(not(feature = "parallel"))]
62                tokens.iter().map(|token| self.flow(token, data))
63            };
64
65            for result in results {
66                match result {
67                    Ok(Return::Join(gateway)) => queue.join_token(gateway),
68                    // Currently only a subprocess use the end symbol and should not end with multiple tokens.
69                    Ok(Return::End(symbol @ Some(_))) if queue.is_empty() && num_tokens == 1 => {
70                        return Ok(symbol);
71                    }
72                    Ok(Return::End(_)) => queue.end_token(),
73                    Ok(Return::Fork(item)) => queue.add_pending_fork(item),
74                    Err(value) => return Err(value),
75                }
76            }
77
78            // Once all inputs have been merged for a gateway, then proceed with its outputs.
79            // The gateway vector contains all the gateways involved. Right now we are using balanced diagram
80            // and do not need to investigate further.
81            if let Some(mut gateways) = queue.tokens_consumed() {
82                if let Some(
83                    gw @ Gateway {
84                        gateway, outputs, ..
85                    },
86                ) = gateways.pop()
87                {
88                    match gateway {
89                        GatewayType::Parallel => {
90                            queue.push(Cow::Borrowed(outputs.ids()));
91                        }
92                        GatewayType::Inclusive if outputs.len() == 1 => {
93                            queue.push(Cow::Borrowed(outputs.ids()));
94                        }
95                        // Handle Fork, the user code determine next token(s) to run.
96                        GatewayType::Inclusive if outputs.len() > 1 => {
97                            match self.handle_inclusive_gateway(data, gw)? {
98                                ControlFlow::Continue(value) => {
99                                    queue.push(Cow::Owned(vec![*value]));
100                                }
101                                ControlFlow::Break(Return::Fork(value)) => {
102                                    queue.push(value);
103                                }
104                                _ => {}
105                            }
106                        }
107                        _ => {}
108                    }
109                }
110            }
111
112            // Commit pending forks
113            queue.commit_forks();
114        }
115        Ok(None)
116    }
117
118    // Each flow process one "token" and returns on a Fork, Join or End.
119    fn flow<'a: 'b, 'b, T>(
120        &'a self,
121        mut current_id: &'b usize,
122        data: &ExecuteData<'a, T>,
123    ) -> Result<Return<'a>, Error>
124    where
125        T: Send,
126    {
127        loop {
128            current_id = match data
129                .process_data
130                .get(*current_id)
131                .ok_or_else(|| Error::MisssingBpmnData(current_id.to_string()))?
132            {
133                bpmn @ Bpmn::Event {
134                    event,
135                    symbol,
136                    id: BpmnLocal(bid, _),
137                    name,
138                    outputs,
139                    ..
140                } => {
141                    let name_or_id = name.as_ref().unwrap_or(bid);
142                    info!("{}: {}", event, name_or_id);
143                    match event {
144                        EventType::Start | EventType::IntermediateCatch | EventType::Boundary => {
145                            maybe_fork!(self, outputs, data, event, name_or_id)
146                        }
147                        EventType::IntermediateThrow => {
148                            match (name.as_ref(), symbol.as_ref()) {
149                                (Some(name), Some(symbol @ Symbol::Link)) => {
150                                    self.catch_link_lookup(name, symbol, data.process_id)?
151                                }
152                                // Follow outputs for other throw events
153                                (Some(_), _) => {
154                                    maybe_fork!(self, outputs, data, event, name_or_id)
155                                }
156                                _ => Err(Error::MissingIntermediateThrowEventName(bid.into()))?,
157                            }
158                        }
159                        EventType::End => {
160                            if symbol.is_some() {
161                                return Ok(Return::End(Some(bpmn)));
162                            }
163                            break;
164                        }
165                    }
166                }
167                Bpmn::Activity {
168                    activity,
169                    id,
170                    name,
171                    outputs,
172                    ..
173                } => {
174                    let name_or_id = name.as_ref().unwrap_or(id);
175                    info!("{}: {}", activity, name_or_id);
176                    match activity {
177                        ActivityType::Task
178                        | ActivityType::ScriptTask
179                        | ActivityType::UserTask
180                        | ActivityType::ServiceTask
181                        | ActivityType::CallActivity
182                        | ActivityType::ReceiveTask
183                        | ActivityType::SendTask
184                        | ActivityType::ManualTask
185                        | ActivityType::BusinessRuleTask => {
186                            #[cfg(feature = "trace")]
187                            data.trace(replay::TASK, name_or_id)?;
188                            if let Some(boundary) =
189                                data.handler.run_task(name_or_id, data.user_data())
190                            {
191                                self.boundary_lookup(id, boundary.0, &boundary.1, data.process_data)
192                                    .ok_or_else(|| {
193                                        Error::MissingBoundary(
194                                            format!("{}", boundary),
195                                            name_or_id.into(),
196                                        )
197                                    })?
198                            } else {
199                                maybe_fork!(self, outputs, data, activity, name_or_id)
200                            }
201                        }
202                        ActivityType::SubProcess => {
203                            let sp_data = self
204                                .data
205                                .get(id)
206                                .ok_or_else(|| Error::MissingProcessData(id.into()))?;
207
208                            if let Some(Bpmn::Event {
209                                event: EventType::End,
210                                symbol: Some(symbol),
211                                name,
212                                ..
213                            }) = self.execute(&data.update(id, sp_data))?
214                            {
215                                self.boundary_lookup(id, name.as_deref(), symbol, data.process_data)
216                                    .ok_or_else(|| {
217                                        Error::MissingBoundary(
218                                            symbol.to_string(),
219                                            name_or_id.into(),
220                                        )
221                                    })?
222                            } else {
223                                // Continue from subprocess
224                                maybe_fork!(self, outputs, data, activity, name_or_id)
225                            }
226                        }
227                    }
228                }
229
230                Bpmn::Gateway(
231                    gw @ Gateway {
232                        gateway,
233                        id,
234                        name,
235                        default,
236                        outputs,
237                        inputs,
238                    },
239                ) => {
240                    let name_or_id = name.as_ref().unwrap_or(id);
241                    info!("{}: {}", gateway, name_or_id);
242                    #[cfg(feature = "trace")]
243                    data.trace(replay::GATEWAY, name_or_id)?;
244
245                    match gateway {
246                        _ if outputs.len() == 0 => {
247                            return Err(Error::MissingOutput(
248                                gateway.to_string(),
249                                name_or_id.to_string(),
250                            ));
251                        }
252                        // Handle 1 to 1, probably a temporary design or mistake
253                        _ if outputs.len() == 1 && inputs.len() == 1 => outputs.first().unwrap(),
254                        GatewayType::Exclusive if outputs.len() == 1 => outputs.first().unwrap(),
255                        GatewayType::Exclusive => {
256                            let response = data.handler.run_gateway(name_or_id, data.user_data());
257                            match response {
258                                With::Flow(value) => {
259                                    output_by_name_or_id(value, outputs.ids(), data.process_data)
260                                        .ok_or_else(|| {
261                                            Error::MissingOutput(
262                                                gateway.to_string(),
263                                                name_or_id.to_string(),
264                                            )
265                                        })?
266                                }
267                                With::Default => default_path(default, gateway, name_or_id)?,
268                                With::Fork(_) => return Err(cannot_fork(gateway)),
269                                With::Symbol(_, _) => return Err(cannot_do_events(gateway)),
270                            }
271                        }
272                        // Handle a regular Join or a JoinFork. In both cases, we need to wait for all tokens.
273                        GatewayType::Parallel | GatewayType::Inclusive if inputs.len() > 1 => {
274                            return Ok(Return::Join(gw));
275                        }
276                        GatewayType::Parallel => {
277                            return Ok(Return::Fork(Cow::Borrowed(outputs.ids())));
278                        }
279                        GatewayType::Inclusive => match self.handle_inclusive_gateway(data, gw)? {
280                            ControlFlow::Continue(value) => value,
281                            ControlFlow::Break(value) => return Ok(value),
282                        },
283                        GatewayType::EventBased if outputs.len() == 1 => {
284                            return Err(Error::BpmnRequirement(AT_LEAST_TWO_OUTGOING.into()));
285                        }
286                        GatewayType::EventBased => {
287                            let response = data.handler.run_gateway(name_or_id, data.user_data());
288                            match response {
289                                With::Symbol(_, _) => {
290                                    output_by_symbol(&response, outputs.ids(), data.process_data)
291                                        .ok_or_else(|| {
292                                            Error::MissingOutput(
293                                                gateway.to_string(),
294                                                name_or_id.to_string(),
295                                            )
296                                        })?
297                                }
298                                With::Default => return Err(cannot_use_default(gateway)),
299                                With::Flow(_) => return Err(cannot_use_cond_expr(gateway)),
300                                With::Fork(_) => return Err(cannot_fork(gateway)),
301                            }
302                        }
303                    }
304                }
305                Bpmn::SequenceFlow {
306                    id,
307                    name,
308                    target_ref,
309                    ..
310                } => {
311                    info!("SequenceFlow: {}", name.as_ref().unwrap_or(id));
312                    target_ref.local()
313                }
314                bpmn => return Err(Error::TypeNotImplemented(format!("{bpmn:?}"))),
315            };
316        }
317        Ok(Return::End(None))
318    }
319
320    fn handle_inclusive_gateway<'a, T>(
321        &'a self,
322        data: &ExecuteData<'a, T>,
323        Gateway {
324            gateway,
325            id,
326            name,
327            default,
328            outputs,
329            ..
330        }: &'a Gateway,
331    ) -> Result<ControlFlow<Return<'a>, &'a usize>, Error> {
332        let name_or_id = name.as_ref().unwrap_or(id);
333        let response = data.handler.run_gateway(name_or_id, data.user_data());
334        let value = match response {
335            With::Flow(value) => output_by_name_or_id(value, outputs.ids(), data.process_data)
336                .ok_or_else(|| Error::MissingOutput(gateway.to_string(), name_or_id.to_string()))?,
337            With::Fork(value) => {
338                if value.is_empty() {
339                    default_path(default, gateway, name_or_id)?
340                } else {
341                    let outputs = outputs.ids();
342                    let responses: Vec<_> = value
343                        .iter()
344                        .filter_map(|&response| {
345                            output_by_name_or_id(response, outputs, data.process_data)
346                        })
347                        .collect();
348
349                    if responses.len() <= 1 {
350                        *responses.first().ok_or_else(|| {
351                            Error::MissingOutput(gateway.to_string(), name_or_id.to_string())
352                        })?
353                    } else {
354                        return Ok(ControlFlow::Break(Return::Fork(Cow::Owned(
355                            responses.into_iter().cloned().collect(),
356                        ))));
357                    }
358                }
359            }
360            With::Default => default_path(default, gateway, name_or_id)?,
361            With::Symbol(_, _) => return Err(cannot_do_events(gateway)),
362        };
363        Ok(ControlFlow::Continue(value))
364    }
365
366    fn boundary_lookup<'a>(
367        &'a self,
368        activity_id: &str,
369        search_name: Option<&str>,
370        search_symbol: &Symbol,
371        process_data: &'a [Bpmn],
372    ) -> Option<&'a usize> {
373        self.boundaries
374            .get(activity_id)?
375            .iter()
376            .filter_map(|index| process_data.get(*index))
377            .filter_map(|bpmn| {
378                // TODO rewrite with let chains when stable
379                if let Bpmn::Event {
380                    symbol, id, name, ..
381                } = bpmn
382                {
383                    if symbol.as_ref() == Some(search_symbol) && search_name == name.as_deref() {
384                        return Some(id.local());
385                    }
386                }
387                None
388            })
389            .next()
390    }
391
392    // Links in specified process.
393    fn catch_link_lookup(
394        &self,
395        throw_event_name: &str,
396        symbol: &Symbol,
397        process_id: &str,
398    ) -> Result<&usize, Error> {
399        self.catch_event_links
400            .get(process_id)
401            .and_then(|links| links.get(throw_event_name))
402            .ok_or_else(|| {
403                Error::MissingIntermediateCatchEvent(symbol.to_string(), throw_event_name.into())
404            })
405    }
406}
407
408fn default_path<'a>(
409    default: &'a Option<BpmnLocal>,
410    gateway: &GatewayType,
411    name_or_id: &String,
412) -> Result<&'a usize, Error> {
413    default
414        .as_ref()
415        .map(BpmnLocal::local)
416        .ok_or_else(|| Error::MissingDefault(gateway.to_string(), name_or_id.to_string()))
417}
418
419fn output_by_symbol<'a>(
420    search: &With,
421    outputs: &'a [usize],
422    process_data: &'a [Bpmn],
423) -> Option<&'a usize> {
424    outputs.iter().find(|index| match search {
425        With::Symbol(search, search_symbol) => process_data
426            .get(**index)
427            .and_then(|bpmn| {
428                if let Bpmn::SequenceFlow { target_ref, .. } = bpmn {
429                    return process_data.get(*target_ref.local());
430                }
431                None
432            })
433            .filter(|bpmn| match bpmn {
434                // We can target both ReceiveTask or Events.
435                Bpmn::Activity {
436                    id, activity, name, ..
437                } => {
438                    activity == &ActivityType::ReceiveTask
439                        && search_symbol == &Symbol::Message
440                        && (search.filter(|&sn| sn == id).is_some() || *search == name.as_deref())
441                }
442                Bpmn::Event {
443                    id,
444                    symbol:
445                        Some(
446                            symbol @ (Symbol::Message
447                            | Symbol::Signal
448                            | Symbol::Timer
449                            | Symbol::Conditional),
450                        ),
451                    name,
452                    ..
453                } => {
454                    symbol == search_symbol
455                        && (search.filter(|&sn| sn == id.bpmn()).is_some()
456                            || *search == name.as_deref())
457                }
458                _ => false,
459            })
460            .is_some(),
461        _ => false,
462    })
463}
464
465fn output_by_name_or_id<'a>(
466    search: impl AsRef<str>,
467    outputs: &'a [usize],
468    process_data: &'a [Bpmn],
469) -> Option<&'a usize> {
470    outputs.iter().find(|index| {
471        if let Some(Bpmn::SequenceFlow { id, name, .. }) = process_data.get(**index) {
472            return name
473                .as_deref()
474                .filter(|&name| name == search.as_ref())
475                .is_some()
476                || id == search.as_ref();
477        }
478        false
479    })
480}
481
482pub(super) type ExecuteResult<'a> = Result<Option<&'a Bpmn>, Error>;
483
484// Data for the execution engine.
485pub(super) struct ExecuteData<'a, T> {
486    process_data: &'a Vec<Bpmn>,
487    process_id: &'a str,
488    handler: &'a Eventhandler<T>,
489    user_data: Data<T>,
490    #[cfg(feature = "trace")]
491    trace: Sender<(&'static str, String)>,
492}
493
494impl<'a, T> ExecuteData<'a, T> {
495    pub(super) fn new(
496        process_data: &'a Vec<Bpmn>,
497        process_id: &'a str,
498        handler: &'a Eventhandler<T>,
499        user_data: Data<T>,
500        #[cfg(feature = "trace")] trace: Sender<(&'static str, String)>,
501    ) -> Self {
502        Self {
503            process_data,
504            process_id,
505            handler,
506            user_data,
507            #[cfg(feature = "trace")]
508            trace,
509        }
510    }
511
512    // When we change to a sub process we must change process id and data.
513    fn update(&self, process_id: &'a str, process_data: &'a Vec<Bpmn>) -> Self {
514        Self {
515            process_data,
516            process_id,
517            handler: self.handler,
518            user_data: self.user_data(),
519            #[cfg(feature = "trace")]
520            trace: self.trace.clone(),
521        }
522    }
523
524    #[cfg(feature = "trace")]
525    fn trace(&self, bpmn_type: &'static str, value: impl Into<String>) -> Result<(), Error> {
526        Ok(self.trace.send((bpmn_type, value.into()))?)
527    }
528
529    fn user_data(&self) -> Data<T> {
530        Arc::clone(&self.user_data)
531    }
532}