1mod execute_handler;
2
3use crate::{
4 IntermediateEvent, Process, Symbol,
5 error::{AT_LEAST_TWO_OUTGOING, Error},
6 model::{ActivityType, Bpmn, Event, EventType, Gateway, GatewayType, Id, With},
7};
8use execute_handler::ExecuteHandler;
9use log::{info, warn};
10use std::{borrow::Cow, collections::HashSet, sync::Arc};
11
12use super::{Run, handler::Data};
13
14#[derive(Debug)]
15enum Return<'a> {
16 Fork(Cow<'a, [usize]>),
17 Join(&'a Gateway),
18 End(&'a Event),
19}
20
/// Picks the next element for a token based on the number of outgoing flows.
///
/// * no output: propagates `Error::MissingOutput` built from `$ty` and `$noi` via `?`
/// * exactly one output: evaluates to that output
/// * several outputs: returns `Return::Fork` with all output ids from the enclosing function
///
/// `$self` and `$data` are accepted but not used by the expansion.
macro_rules! maybe_fork {
    ($self:expr, $outputs:expr, $data:expr, $ty:expr, $noi:expr) => {
        match $outputs.len() {
            0 | 1 => $outputs
                .first()
                .ok_or_else(|| Error::MissingOutput($ty.to_string(), $noi.to_string()))?,
            _ => return Ok(Return::Fork(Cow::Borrowed($outputs.ids()))),
        }
    };
}
32
33impl<T> Process<T, Run> {
34 pub(super) fn execute<'a>(&'a self, data: ExecuteData<'a, T>) -> ExecuteResult<'a>
35 where
36 T: Send,
37 {
38 let mut end_event = None;
39 let mut handler = ExecuteHandler::new(Cow::from(&[0]));
40 loop {
41 let all_tokens = handler.take();
42 if all_tokens.is_empty() {
43 return end_event.ok_or(Error::MissingEndEvent);
44 }
45
46 let result_iter = {
47 #[cfg(feature = "parallel")]
48 {
49 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
50 let results: Vec<Vec<_>> = all_tokens
51 .par_iter()
52 .map(|tokens| {
53 tokens
54 .par_iter()
55 .map(|token| self.flow(token, &data))
56 .collect()
57 })
58 .collect::<Vec<_>>();
59 results.into_iter()
60 }
61 #[cfg(not(feature = "parallel"))]
62 all_tokens
63 .iter()
64 .map(|tokens| tokens.iter().map(|token| self.flow(token, &data)))
65 };
66
67 for inner_iter in result_iter.rev() {
68 for result in inner_iter {
70 match result {
71 Ok(Return::Join(gateway)) => handler.consume_token(Some(gateway)),
72 Ok(Return::End(event)) => {
73 match event {
74 Event {
75 event_type: EventType::End,
76 symbol: Some(Symbol::Terminate),
77 ..
78 } => return Ok(event),
79 _ => {
80 end_event.replace(event);
81 }
82 }
83 handler.consume_token(None);
84 }
85 Ok(Return::Fork(item)) => handler.pending(item),
86 Err(value) => return Err(value),
87 }
88 }
89
90 if let Some(mut gateways) = handler.tokens_consumed()
94 && let Some(
95 gateway @ Gateway {
96 gateway_type,
97 outputs,
98 ..
99 },
100 ) = gateways.pop()
101 {
102 match gateway_type {
104 GatewayType::Parallel | GatewayType::Inclusive if outputs.len() == 1 => {
105 handler.immediate(Cow::Borrowed(outputs.ids()));
106 }
107 GatewayType::Parallel => {
108 handler.pending(Cow::Borrowed(outputs.ids()));
109 }
110 GatewayType::Inclusive => {
112 handler.pending(self.handle_inclusive_gateway(&data, gateway)?);
113 }
114 _ => {}
115 }
116 }
117 }
118 handler.commit();
120 }
121 }
122
123 fn flow<'a: 'b, 'b>(
125 &'a self,
126 mut current_id: &'b usize,
127 data: &ExecuteData<'a, T>,
128 ) -> Result<Return<'a>, Error>
129 where
130 T: Send,
131 {
132 loop {
133 current_id = match data
134 .process_data
135 .get(*current_id)
136 .ok_or_else(|| Error::MisssingBpmnData(current_id.to_string()))?
137 {
138 Bpmn::Event(
139 event @ Event {
140 event_type,
141 symbol,
142 id,
143 name,
144 outputs,
145 ..
146 },
147 ) => {
148 let name_or_id = name.as_deref().unwrap_or(id.bpmn());
149 info!("{event_type}: {name_or_id}");
150 match event_type {
151 EventType::Start | EventType::IntermediateCatch | EventType::Boundary => {
152 maybe_fork!(self, outputs, data, event_type, name_or_id)
153 }
154 EventType::IntermediateThrow => {
155 match (name.as_ref(), symbol.as_ref()) {
156 (Some(name), Some(symbol @ Symbol::Link)) => {
157 self.catch_link_lookup(name, symbol, data.process_id)?
158 }
159 (Some(_), _) => {
161 maybe_fork!(self, outputs, data, event_type, name_or_id)
162 }
163 _ => {
164 Err(Error::MissingIntermediateThrowEventName(id.bpmn().into()))?
165 }
166 }
167 }
168 EventType::End => {
169 return Ok(Return::End(event));
170 }
171 }
172 }
173 Bpmn::Activity {
174 activity_type,
175 id: id @ Id { bpmn_id, local_id },
176 func_idx,
177 name,
178 outputs,
179 ..
180 } => {
181 let name_or_id = name.as_ref().unwrap_or(bpmn_id);
182 info!("{activity_type}: {name_or_id}");
183 match activity_type {
184 ActivityType::Task
185 | ActivityType::ScriptTask
186 | ActivityType::UserTask
187 | ActivityType::ServiceTask
188 | ActivityType::CallActivity
189 | ActivityType::ReceiveTask
190 | ActivityType::SendTask
191 | ActivityType::ManualTask
192 | ActivityType::BusinessRuleTask => {
193 match func_idx
194 .and_then(|index| self.handler.run_task(index, data.user_data()))
195 .ok_or_else(|| {
196 Error::MissingImplementation(
197 activity_type.to_string(),
198 name_or_id.to_string(),
199 )
200 })? {
201 Some(boundary) => self
202 .boundary_lookup(
203 bpmn_id,
204 boundary.name(),
205 boundary.symbol(),
206 data.process_data,
207 )
208 .ok_or_else(|| {
209 Error::MissingBoundary(
210 boundary.to_string(),
211 name_or_id.into(),
212 )
213 })?,
214 None => maybe_fork!(self, outputs, data, activity_type, name_or_id),
215 }
216 }
217 ActivityType::SubProcess => {
218 let sp_data = self
219 .diagram
220 .get_process(*local_id)
221 .ok_or_else(|| Error::MissingProcessData(bpmn_id.into()))?;
222
223 if let Event {
224 event_type: EventType::End,
225 symbol:
226 Some(
227 symbol @ (Symbol::Cancel
228 | Symbol::Compensation
229 | Symbol::Conditional
230 | Symbol::Error
231 | Symbol::Escalation
232 | Symbol::Message
233 | Symbol::Signal
234 | Symbol::Timer),
235 ),
236 name,
237 ..
238 } = self.execute(ExecuteData::new(sp_data, id, data.user_data()))?
239 {
240 self.boundary_lookup(
241 bpmn_id,
242 name.as_deref(),
243 symbol,
244 data.process_data,
245 )
246 .ok_or_else(|| {
247 Error::MissingBoundary(symbol.to_string(), name_or_id.into())
248 })?
249 } else {
250 maybe_fork!(self, outputs, data, activity_type, name_or_id)
252 }
253 }
254 }
255 }
256
257 Bpmn::Gateway(
258 gateway @ Gateway {
259 gateway_type,
260 id,
261 func_idx,
262 name,
263 default,
264 outputs,
265 inputs,
266 },
267 ) => {
268 let name_or_id = name.as_deref().unwrap_or(id.bpmn());
269 info!("{gateway_type}: {name_or_id}");
270 match gateway_type {
271 _ if outputs.len() == 0 => {
272 return Err(Error::MissingOutput(
273 gateway_type.to_string(),
274 name_or_id.to_string(),
275 ));
276 }
277 _ if outputs.len() == 1 && inputs.len() == 1 => outputs.first().unwrap(),
279 GatewayType::Exclusive if outputs.len() == 1 => outputs.first().unwrap(),
280 GatewayType::Exclusive => {
281 match func_idx
282 .and_then(|index| {
283 self.handler.run_exclusive(index, data.user_data())
284 })
285 .ok_or_else(|| {
286 Error::MissingImplementation(
287 gateway_type.to_string(),
288 name_or_id.to_string(),
289 )
290 })? {
291 Some(value) => {
292 output_by_name_or_id(value, outputs.ids(), data.process_data)
293 .ok_or_else(|| {
294 Error::MissingOutput(
295 gateway_type.to_string(),
296 name_or_id.to_string(),
297 )
298 })?
299 }
300 None => default_path(default, gateway_type, name_or_id)?,
301 }
302 }
303 GatewayType::Parallel | GatewayType::Inclusive if inputs.len() > 1 => {
305 return Ok(Return::Join(gateway));
306 }
307 GatewayType::Parallel => {
308 return Ok(Return::Fork(Cow::Borrowed(outputs.ids())));
309 }
310 GatewayType::Inclusive => {
311 return Ok(Return::Fork(self.handle_inclusive_gateway(data, gateway)?));
312 }
313 GatewayType::EventBased if outputs.len() == 1 => {
314 return Err(Error::BpmnRequirement(AT_LEAST_TWO_OUTGOING.into()));
315 }
316 GatewayType::EventBased => {
317 let value = func_idx
318 .and_then(|index| {
319 self.handler.run_event_based(index, data.user_data())
320 })
321 .ok_or_else(|| {
322 Error::MissingImplementation(
323 gateway_type.to_string(),
324 name_or_id.to_string(),
325 )
326 })?;
327
328 output_by_symbol(&value, outputs.ids(), data.process_data).ok_or_else(
329 || {
330 Error::MissingIntermediateEvent(
331 gateway_type.to_string(),
332 name_or_id.to_string(),
333 value.to_string(),
334 )
335 },
336 )?
337 }
338 }
339 }
340 Bpmn::SequenceFlow {
341 id,
342 name,
343 target_ref,
344 ..
345 } => {
346 info!("SequenceFlow: {}", name.as_deref().unwrap_or(id.bpmn()));
347 target_ref.local()
348 }
349 bpmn => return Err(Error::TypeNotImplemented(format!("{bpmn:?}"))),
350 };
351 }
352 }
353
354 fn handle_inclusive_gateway<'a>(
355 &'a self,
356 data: &ExecuteData<'a, T>,
357 Gateway {
358 gateway_type,
359 id,
360 func_idx,
361 name,
362 default,
363 outputs,
364 ..
365 }: &'a Gateway,
366 ) -> Result<Cow<'a, [usize]>, Error> {
367 let name_or_id = name.as_deref().unwrap_or(id.bpmn());
368 let find_flow = |value| {
369 output_by_name_or_id(value, outputs.ids(), data.process_data).ok_or_else(|| {
370 Error::MissingOutput(gateway_type.to_string(), name_or_id.to_string())
371 })
372 };
373
374 let value = match func_idx
375 .and_then(|index| self.handler.run_inclusive(index, data.user_data()))
376 .ok_or_else(|| {
377 Error::MissingImplementation(gateway_type.to_string(), name_or_id.to_string())
378 })? {
379 With::Flow(value) => find_flow(value)?,
380 With::Fork(values) => match values.as_slice() {
381 [] => default_path(default, gateway_type, name_or_id)?,
382 [value] => find_flow(value)?,
383 [..] => {
384 let mut outputs = HashSet::with_capacity(values.len());
385 for &value in values.iter() {
386 if !outputs.insert(*find_flow(value)?) {
388 warn!(
390 "Inclusive Gateway {name_or_id} used flow {value} multiple times. Discarded the duplicates."
391 );
392 }
393 }
394 return Ok(Cow::Owned(outputs.into_iter().collect()));
395 }
396 },
397 With::Default => default_path(default, gateway_type, name_or_id)?,
398 };
399 Ok(Cow::Owned(vec![*value]))
400 }
401
402 fn boundary_lookup<'a>(
403 &'a self,
404 activity_id: &str,
405 search_name: Option<&str>,
406 search_symbol: &Symbol,
407 process_data: &'a [Bpmn],
408 ) -> Option<&'a usize> {
409 self.diagram
410 .boundaries()
411 .get(activity_id)?
412 .iter()
413 .filter_map(|index| process_data.get(*index))
414 .find_map(|bpmn| match bpmn {
415 Bpmn::Event(Event {
416 symbol: Some(symbol),
417 id,
418 name,
419 ..
420 }) if symbol == search_symbol && search_name == name.as_deref() => Some(id.local()),
421 _ => None,
422 })
423 }
424
425 fn catch_link_lookup(
427 &self,
428 throw_event_name: &str,
429 symbol: &Symbol,
430 process_id: &Id,
431 ) -> Result<&usize, Error> {
432 self.diagram
433 .catch_event_links()
434 .get(process_id.bpmn())
435 .and_then(|links| links.get(throw_event_name))
436 .ok_or_else(|| {
437 Error::MissingIntermediateCatchEvent(symbol.to_string(), throw_event_name.into())
438 })
439 }
440}
441
442fn default_path<'a>(
443 default: &'a Option<Id>,
444 gateway: &GatewayType,
445 name_or_id: &str,
446) -> Result<&'a usize, Error> {
447 default
448 .as_ref()
449 .map(Id::local)
450 .ok_or_else(|| Error::MissingDefault(gateway.to_string(), name_or_id.to_string()))
451}
452
453fn output_by_symbol<'a>(
454 search: &IntermediateEvent,
455 outputs: &'a [usize],
456 process_data: &'a [Bpmn],
457) -> Option<&'a usize> {
458 outputs.iter().find(|index| {
459 process_data
460 .get(**index)
461 .and_then(|bpmn| {
462 if let Bpmn::SequenceFlow { target_ref, .. } = bpmn {
463 return process_data.get(*target_ref.local());
464 }
465 None
466 })
467 .is_some_and(|bpmn| match bpmn {
468 Bpmn::Activity {
470 activity_type: ActivityType::ReceiveTask,
471 name: Some(name),
472 ..
473 } => search.1 == Symbol::Message && name.as_str() == search.0,
474 Bpmn::Event(Event {
475 symbol:
476 Some(
477 symbol @ (Symbol::Message
478 | Symbol::Signal
479 | Symbol::Timer
480 | Symbol::Conditional),
481 ),
482 name: Some(name),
483 ..
484 }) => *symbol == search.1 && name.as_str() == search.0,
485 _ => false,
486 })
487 })
488}
489
490fn output_by_name_or_id<'a>(
491 search: impl AsRef<str>,
492 outputs: &'a [usize],
493 process_data: &'a [Bpmn],
494) -> Option<&'a usize> {
495 outputs.iter().find(|index| {
496 if let Some(Bpmn::SequenceFlow { id, name, .. }) = process_data.get(**index) {
497 return name.as_deref().is_some_and(|name| name == search.as_ref())
498 || id.bpmn() == search.as_ref();
499 }
500 false
501 })
502}
503
504pub(super) type ExecuteResult<'a> = Result<&'a Event, Error>;
505
506pub(super) struct ExecuteData<'a, T> {
508 process_data: &'a Vec<Bpmn>,
509 process_id: &'a Id,
510 user_data: Data<T>,
511}
512
513impl<'a, T> ExecuteData<'a, T> {
514 pub(super) fn new(process_data: &'a Vec<Bpmn>, process_id: &'a Id, user_data: Data<T>) -> Self {
515 Self {
516 process_data,
517 process_id,
518 user_data,
519 }
520 }
521
522 fn user_data(&self) -> Data<T> {
523 Arc::clone(&self.user_data)
524 }
525}