snurr/process/
engine.rs

1mod execute_handler;
2
3use crate::{
4    IntermediateEvent, Process, Symbol,
5    error::{AT_LEAST_TWO_OUTGOING, Error},
6    model::{ActivityType, Bpmn, Event, EventType, Gateway, GatewayType, Id, With},
7};
8use execute_handler::ExecuteHandler;
9use log::{info, warn};
10use std::{borrow::Cow, collections::HashSet, sync::Arc};
11
12use super::{Run, handler::Data};
13
/// Reason a single token stopped advancing in `flow`; `execute` uses it to decide what to do next.
#[derive(Debug)]
enum Return<'a> {
    /// The token splits: the slice holds the ids of the outputs to continue on.
    Fork(Cow<'a, [usize]>),
    /// The token arrived at a parallel or inclusive gateway that has more than one input.
    Join(&'a Gateway),
    /// The token reached an end event.
    End(&'a Event),
}
20
// Selects where a token goes next, based on how many outputs an element has.
//
// * more than one output: returns `Ok(Return::Fork(..))` from the *enclosing function*,
//   borrowing all output ids.
// * exactly one output: the macro evaluates to a reference to that output.
// * no output: the enclosing function is left through `?` with
//   `Error::MissingOutput($ty, $noi)`.
//
// `$self` and `$data` are part of the call syntax but are not used by the expansion.
macro_rules! maybe_fork {
    ($self:expr, $outputs:expr, $data:expr, $ty:expr, $noi:expr) => {
        if $outputs.len() > 1 {
            return Ok(Return::Fork(Cow::Borrowed($outputs.ids())));
        } else {
            $outputs
                .first()
                .ok_or_else(|| Error::MissingOutput($ty.to_string(), $noi.to_string()))?
        }
    };
}
32
33impl<T> Process<T, Run> {
34    pub(super) fn execute<'a>(&'a self, data: ExecuteData<'a, T>) -> ExecuteResult<'a>
35    where
36        T: Send,
37    {
38        let mut end_event = None;
39        let mut handler = ExecuteHandler::new(Cow::from(&[0]));
40        loop {
41            let all_tokens = handler.take();
42            if all_tokens.is_empty() {
43                return end_event.ok_or(Error::MissingEndEvent);
44            }
45
46            let result_iter = {
47                #[cfg(feature = "parallel")]
48                {
49                    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
50                    let results: Vec<Vec<_>> = all_tokens
51                        .par_iter()
52                        .map(|tokens| {
53                            tokens
54                                .par_iter()
55                                .map(|token| self.flow(token, &data))
56                                .collect()
57                        })
58                        .collect::<Vec<_>>();
59                    results.into_iter()
60                }
61                #[cfg(not(feature = "parallel"))]
62                all_tokens
63                    .iter()
64                    .map(|tokens| tokens.iter().map(|token| self.flow(token, &data)))
65            };
66
67            for inner_iter in result_iter.rev() {
68                // Correlate tokens that have arrived
69                for result in inner_iter {
70                    match result {
71                        Ok(Return::Join(gateway)) => handler.consume_token(Some(gateway)),
72                        Ok(Return::End(event)) => {
73                            match event {
74                                Event {
75                                    event_type: EventType::End,
76                                    symbol: Some(Symbol::Terminate),
77                                    ..
78                                } => return Ok(event),
79                                _ => {
80                                    end_event.replace(event);
81                                }
82                            }
83                            handler.consume_token(None);
84                        }
85                        Ok(Return::Fork(item)) => handler.pending(item),
86                        Err(value) => return Err(value),
87                    }
88                }
89
90                // Once all inputs have been merged for a gateway, then proceed with its outputs.
91                // The gateway vector contains all the gateways involved. Right now we are using balanced diagram
92                // and do not need to investigate further.
93                if let Some(mut gateways) = handler.tokens_consumed()
94                    && let Some(
95                        gateway @ Gateway {
96                            gateway_type,
97                            outputs,
98                            ..
99                        },
100                    ) = gateways.pop()
101                {
102                    // We cannot add new tokens until we have correlated all processed flows.
103                    match gateway_type {
104                        GatewayType::Parallel | GatewayType::Inclusive if outputs.len() == 1 => {
105                            handler.immediate(Cow::Borrowed(outputs.ids()));
106                        }
107                        GatewayType::Parallel => {
108                            handler.pending(Cow::Borrowed(outputs.ids()));
109                        }
110                        // Handle Fork, the user code determine next token(s) to run.
111                        GatewayType::Inclusive => {
112                            handler.pending(self.handle_inclusive_gateway(&data, gateway)?);
113                        }
114                        _ => {}
115                    }
116                }
117            }
118            // Commit pending forks
119            handler.commit();
120        }
121    }
122
123    // Each flow process one "token" and returns on a Fork, Join or End.
124    fn flow<'a: 'b, 'b>(
125        &'a self,
126        mut current_id: &'b usize,
127        data: &ExecuteData<'a, T>,
128    ) -> Result<Return<'a>, Error>
129    where
130        T: Send,
131    {
132        loop {
133            current_id = match data
134                .process_data
135                .get(*current_id)
136                .ok_or_else(|| Error::MisssingBpmnData(current_id.to_string()))?
137            {
138                Bpmn::Event(
139                    event @ Event {
140                        event_type,
141                        symbol,
142                        id,
143                        name,
144                        outputs,
145                        ..
146                    },
147                ) => {
148                    let name_or_id = name.as_deref().unwrap_or(id.bpmn());
149                    info!("{event_type}: {name_or_id}");
150                    match event_type {
151                        EventType::Start | EventType::IntermediateCatch | EventType::Boundary => {
152                            maybe_fork!(self, outputs, data, event_type, name_or_id)
153                        }
154                        EventType::IntermediateThrow => {
155                            match (name.as_ref(), symbol.as_ref()) {
156                                (Some(name), Some(symbol @ Symbol::Link)) => {
157                                    self.catch_link_lookup(name, symbol, data.process_id)?
158                                }
159                                // Follow outputs for other throw events
160                                (Some(_), _) => {
161                                    maybe_fork!(self, outputs, data, event_type, name_or_id)
162                                }
163                                _ => {
164                                    Err(Error::MissingIntermediateThrowEventName(id.bpmn().into()))?
165                                }
166                            }
167                        }
168                        EventType::End => {
169                            return Ok(Return::End(event));
170                        }
171                    }
172                }
173                Bpmn::Activity {
174                    activity_type,
175                    id: id @ Id { bpmn_id, local_id },
176                    func_idx,
177                    name,
178                    outputs,
179                    ..
180                } => {
181                    let name_or_id = name.as_ref().unwrap_or(bpmn_id);
182                    info!("{activity_type}: {name_or_id}");
183                    match activity_type {
184                        ActivityType::Task
185                        | ActivityType::ScriptTask
186                        | ActivityType::UserTask
187                        | ActivityType::ServiceTask
188                        | ActivityType::CallActivity
189                        | ActivityType::ReceiveTask
190                        | ActivityType::SendTask
191                        | ActivityType::ManualTask
192                        | ActivityType::BusinessRuleTask => {
193                            match func_idx
194                                .and_then(|index| self.handler.run_task(index, data.user_data()))
195                                .ok_or_else(|| {
196                                    Error::MissingImplementation(
197                                        activity_type.to_string(),
198                                        name_or_id.to_string(),
199                                    )
200                                })? {
201                                Some(boundary) => self
202                                    .boundary_lookup(
203                                        bpmn_id,
204                                        boundary.name(),
205                                        boundary.symbol(),
206                                        data.process_data,
207                                    )
208                                    .ok_or_else(|| {
209                                        Error::MissingBoundary(
210                                            boundary.to_string(),
211                                            name_or_id.into(),
212                                        )
213                                    })?,
214                                None => maybe_fork!(self, outputs, data, activity_type, name_or_id),
215                            }
216                        }
217                        ActivityType::SubProcess => {
218                            let sp_data = self
219                                .diagram
220                                .get_process(*local_id)
221                                .ok_or_else(|| Error::MissingProcessData(bpmn_id.into()))?;
222
223                            if let Event {
224                                event_type: EventType::End,
225                                symbol:
226                                    Some(
227                                        symbol @ (Symbol::Cancel
228                                        | Symbol::Compensation
229                                        | Symbol::Conditional
230                                        | Symbol::Error
231                                        | Symbol::Escalation
232                                        | Symbol::Message
233                                        | Symbol::Signal
234                                        | Symbol::Timer),
235                                    ),
236                                name,
237                                ..
238                            } = self.execute(ExecuteData::new(sp_data, id, data.user_data()))?
239                            {
240                                self.boundary_lookup(
241                                    bpmn_id,
242                                    name.as_deref(),
243                                    symbol,
244                                    data.process_data,
245                                )
246                                .ok_or_else(|| {
247                                    Error::MissingBoundary(symbol.to_string(), name_or_id.into())
248                                })?
249                            } else {
250                                // Continue from subprocess
251                                maybe_fork!(self, outputs, data, activity_type, name_or_id)
252                            }
253                        }
254                    }
255                }
256
257                Bpmn::Gateway(
258                    gateway @ Gateway {
259                        gateway_type,
260                        id,
261                        func_idx,
262                        name,
263                        default,
264                        outputs,
265                        inputs,
266                    },
267                ) => {
268                    let name_or_id = name.as_deref().unwrap_or(id.bpmn());
269                    info!("{gateway_type}: {name_or_id}");
270                    match gateway_type {
271                        _ if outputs.len() == 0 => {
272                            return Err(Error::MissingOutput(
273                                gateway_type.to_string(),
274                                name_or_id.to_string(),
275                            ));
276                        }
277                        // Handle 1 to 1, probably a temporary design or mistake
278                        _ if outputs.len() == 1 && inputs.len() == 1 => outputs.first().unwrap(),
279                        GatewayType::Exclusive if outputs.len() == 1 => outputs.first().unwrap(),
280                        GatewayType::Exclusive => {
281                            match func_idx
282                                .and_then(|index| {
283                                    self.handler.run_exclusive(index, data.user_data())
284                                })
285                                .ok_or_else(|| {
286                                    Error::MissingImplementation(
287                                        gateway_type.to_string(),
288                                        name_or_id.to_string(),
289                                    )
290                                })? {
291                                Some(value) => {
292                                    output_by_name_or_id(value, outputs.ids(), data.process_data)
293                                        .ok_or_else(|| {
294                                            Error::MissingOutput(
295                                                gateway_type.to_string(),
296                                                name_or_id.to_string(),
297                                            )
298                                        })?
299                                }
300                                None => default_path(default, gateway_type, name_or_id)?,
301                            }
302                        }
303                        // Handle a regular Join or a JoinFork. In both cases, we need to wait for all tokens.
304                        GatewayType::Parallel | GatewayType::Inclusive if inputs.len() > 1 => {
305                            return Ok(Return::Join(gateway));
306                        }
307                        GatewayType::Parallel => {
308                            return Ok(Return::Fork(Cow::Borrowed(outputs.ids())));
309                        }
310                        GatewayType::Inclusive => {
311                            return Ok(Return::Fork(self.handle_inclusive_gateway(data, gateway)?));
312                        }
313                        GatewayType::EventBased if outputs.len() == 1 => {
314                            return Err(Error::BpmnRequirement(AT_LEAST_TWO_OUTGOING.into()));
315                        }
316                        GatewayType::EventBased => {
317                            let value = func_idx
318                                .and_then(|index| {
319                                    self.handler.run_event_based(index, data.user_data())
320                                })
321                                .ok_or_else(|| {
322                                    Error::MissingImplementation(
323                                        gateway_type.to_string(),
324                                        name_or_id.to_string(),
325                                    )
326                                })?;
327
328                            output_by_symbol(&value, outputs.ids(), data.process_data).ok_or_else(
329                                || {
330                                    Error::MissingIntermediateEvent(
331                                        gateway_type.to_string(),
332                                        name_or_id.to_string(),
333                                        value.to_string(),
334                                    )
335                                },
336                            )?
337                        }
338                    }
339                }
340                Bpmn::SequenceFlow {
341                    id,
342                    name,
343                    target_ref,
344                    ..
345                } => {
346                    info!("SequenceFlow: {}", name.as_deref().unwrap_or(id.bpmn()));
347                    target_ref.local()
348                }
349                bpmn => return Err(Error::TypeNotImplemented(format!("{bpmn:?}"))),
350            };
351        }
352    }
353
354    fn handle_inclusive_gateway<'a>(
355        &'a self,
356        data: &ExecuteData<'a, T>,
357        Gateway {
358            gateway_type,
359            id,
360            func_idx,
361            name,
362            default,
363            outputs,
364            ..
365        }: &'a Gateway,
366    ) -> Result<Cow<'a, [usize]>, Error> {
367        let name_or_id = name.as_deref().unwrap_or(id.bpmn());
368        let find_flow = |value| {
369            output_by_name_or_id(value, outputs.ids(), data.process_data).ok_or_else(|| {
370                Error::MissingOutput(gateway_type.to_string(), name_or_id.to_string())
371            })
372        };
373
374        let value = match func_idx
375            .and_then(|index| self.handler.run_inclusive(index, data.user_data()))
376            .ok_or_else(|| {
377                Error::MissingImplementation(gateway_type.to_string(), name_or_id.to_string())
378            })? {
379            With::Flow(value) => find_flow(value)?,
380            With::Fork(values) => match values.as_slice() {
381                [] => default_path(default, gateway_type, name_or_id)?,
382                [value] => find_flow(value)?,
383                [..] => {
384                    let mut outputs = HashSet::with_capacity(values.len());
385                    for &value in values.iter() {
386                        // Breaks on first error
387                        if !outputs.insert(*find_flow(value)?) {
388                            // The flow has already been used, we just log an warning and continue.
389                            warn!(
390                                "Inclusive Gateway {name_or_id} used flow {value} multiple times. Discarded the duplicates."
391                            );
392                        }
393                    }
394                    return Ok(Cow::Owned(outputs.into_iter().collect()));
395                }
396            },
397            With::Default => default_path(default, gateway_type, name_or_id)?,
398        };
399        Ok(Cow::Owned(vec![*value]))
400    }
401
402    fn boundary_lookup<'a>(
403        &'a self,
404        activity_id: &str,
405        search_name: Option<&str>,
406        search_symbol: &Symbol,
407        process_data: &'a [Bpmn],
408    ) -> Option<&'a usize> {
409        self.diagram
410            .boundaries()
411            .get(activity_id)?
412            .iter()
413            .filter_map(|index| process_data.get(*index))
414            .find_map(|bpmn| match bpmn {
415                Bpmn::Event(Event {
416                    symbol: Some(symbol),
417                    id,
418                    name,
419                    ..
420                }) if symbol == search_symbol && search_name == name.as_deref() => Some(id.local()),
421                _ => None,
422            })
423    }
424
425    // Links in specified process.
426    fn catch_link_lookup(
427        &self,
428        throw_event_name: &str,
429        symbol: &Symbol,
430        process_id: &Id,
431    ) -> Result<&usize, Error> {
432        self.diagram
433            .catch_event_links()
434            .get(process_id.bpmn())
435            .and_then(|links| links.get(throw_event_name))
436            .ok_or_else(|| {
437                Error::MissingIntermediateCatchEvent(symbol.to_string(), throw_event_name.into())
438            })
439    }
440}
441
442fn default_path<'a>(
443    default: &'a Option<Id>,
444    gateway: &GatewayType,
445    name_or_id: &str,
446) -> Result<&'a usize, Error> {
447    default
448        .as_ref()
449        .map(Id::local)
450        .ok_or_else(|| Error::MissingDefault(gateway.to_string(), name_or_id.to_string()))
451}
452
453fn output_by_symbol<'a>(
454    search: &IntermediateEvent,
455    outputs: &'a [usize],
456    process_data: &'a [Bpmn],
457) -> Option<&'a usize> {
458    outputs.iter().find(|index| {
459        process_data
460            .get(**index)
461            .and_then(|bpmn| {
462                if let Bpmn::SequenceFlow { target_ref, .. } = bpmn {
463                    return process_data.get(*target_ref.local());
464                }
465                None
466            })
467            .is_some_and(|bpmn| match bpmn {
468                // We can target both ReceiveTask or Events.
469                Bpmn::Activity {
470                    activity_type: ActivityType::ReceiveTask,
471                    name: Some(name),
472                    ..
473                } => search.1 == Symbol::Message && name.as_str() == search.0,
474                Bpmn::Event(Event {
475                    symbol:
476                        Some(
477                            symbol @ (Symbol::Message
478                            | Symbol::Signal
479                            | Symbol::Timer
480                            | Symbol::Conditional),
481                        ),
482                    name: Some(name),
483                    ..
484                }) => *symbol == search.1 && name.as_str() == search.0,
485                _ => false,
486            })
487    })
488}
489
490fn output_by_name_or_id<'a>(
491    search: impl AsRef<str>,
492    outputs: &'a [usize],
493    process_data: &'a [Bpmn],
494) -> Option<&'a usize> {
495    outputs.iter().find(|index| {
496        if let Some(Bpmn::SequenceFlow { id, name, .. }) = process_data.get(**index) {
497            return name.as_deref().is_some_and(|name| name == search.as_ref())
498                || id.bpmn() == search.as_ref();
499        }
500        false
501    })
502}
503
/// Outcome of executing a process: a reference to an `Event` on success, otherwise an `Error`.
pub(super) type ExecuteResult<'a> = Result<&'a Event, Error>;
505
/// Data for the execution engine: everything one run of a process needs.
pub(super) struct ExecuteData<'a, T> {
    // Elements of the process being executed; tokens are indices into this list.
    process_data: &'a Vec<Bpmn>,
    // Id of the process being executed.
    process_id: &'a Id,
    // Shared user data passed on to the user handlers.
    user_data: Data<T>,
}
512
513impl<'a, T> ExecuteData<'a, T> {
514    pub(super) fn new(process_data: &'a Vec<Bpmn>, process_id: &'a Id, user_data: Data<T>) -> Self {
515        Self {
516            process_data,
517            process_id,
518            user_data,
519        }
520    }
521
522    fn user_data(&self) -> Data<T> {
523        Arc::clone(&self.user_data)
524    }
525}