1mod bpmn_queue;
2
3use bpmn_queue::BpmnQueue;
4use log::info;
5use std::{borrow::Cow, ops::ControlFlow, sync::Arc};
6
7#[cfg(feature = "trace")]
8use crate::process::replay;
9#[cfg(feature = "trace")]
10use std::sync::mpsc::Sender;
11
12use crate::{
13 Data, Eventhandler, Process, Symbol,
14 error::{
15 AT_LEAST_TWO_OUTGOING, Error, cannot_do_events, cannot_fork, cannot_use_cond_expr,
16 cannot_use_default,
17 },
18 model::{ActivityType, Bpmn, BpmnLocal, EventType, Gateway, GatewayType, With},
19};
20
21#[derive(Debug)]
22enum Return<'a> {
23 Fork(Cow<'a, [usize]>),
24 Join(&'a Gateway),
25 End(Option<&'a Bpmn>),
26}
27
/// Picks the next element to visit from `$outputs`.
///
/// * no output: propagates `Error::MissingOutput` built from `$ty` and `$noi` with `?`
/// * one output: evaluates to that output
/// * several outputs: returns `Return::Fork` with all output ids from the enclosing function
///
/// `$self` and `$data` are accepted to keep the call sites uniform but are not used.
macro_rules! maybe_fork {
    ($self:expr, $outputs:expr, $data:expr, $ty:expr, $noi:expr) => {
        match $outputs.len() {
            0 | 1 => $outputs
                .first()
                .ok_or_else(|| Error::MissingOutput($ty.to_string(), $noi.to_string()))?,
            _ => return Ok(Return::Fork(Cow::Borrowed($outputs.ids()))),
        }
    };
}
39
40impl Process {
41 pub(super) fn execute<'a, T>(&'a self, data: &ExecuteData<'a, T>) -> ExecuteResult<'a>
42 where
43 T: Send,
44 {
45 let mut queue = BpmnQueue::new(Cow::from(&[0]));
46 while let Some(tokens) = queue.pop() {
47 let num_tokens = tokens.len();
48
49 let results = {
51 #[cfg(feature = "parallel")]
52 {
53 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
54 let results: Vec<_> = tokens
56 .par_iter()
57 .map(|token| self.flow(token, data))
58 .collect::<Vec<_>>();
59 results.into_iter()
60 }
61 #[cfg(not(feature = "parallel"))]
62 tokens.iter().map(|token| self.flow(token, data))
63 };
64
65 for result in results {
66 match result {
67 Ok(Return::Join(gateway)) => queue.join_token(gateway),
68 Ok(Return::End(symbol @ Some(_))) if queue.is_empty() && num_tokens == 1 => {
70 return Ok(symbol);
71 }
72 Ok(Return::End(_)) => queue.end_token(),
73 Ok(Return::Fork(item)) => queue.add_pending_fork(item),
74 Err(value) => return Err(value),
75 }
76 }
77
78 if let Some(mut gateways) = queue.tokens_consumed() {
82 if let Some(
83 gw @ Gateway {
84 gateway, outputs, ..
85 },
86 ) = gateways.pop()
87 {
88 match gateway {
89 GatewayType::Parallel => {
90 queue.push(Cow::Borrowed(outputs.ids()));
91 }
92 GatewayType::Inclusive if outputs.len() == 1 => {
93 queue.push(Cow::Borrowed(outputs.ids()));
94 }
95 GatewayType::Inclusive if outputs.len() > 1 => {
97 match self.handle_inclusive_gateway(data, gw)? {
98 ControlFlow::Continue(value) => {
99 queue.push(Cow::Owned(vec![*value]));
100 }
101 ControlFlow::Break(Return::Fork(value)) => {
102 queue.push(value);
103 }
104 _ => {}
105 }
106 }
107 _ => {}
108 }
109 }
110 }
111
112 queue.commit_forks();
114 }
115 Ok(None)
116 }
117
118 fn flow<'a: 'b, 'b, T>(
120 &'a self,
121 mut current_id: &'b usize,
122 data: &ExecuteData<'a, T>,
123 ) -> Result<Return<'a>, Error>
124 where
125 T: Send,
126 {
127 loop {
128 current_id = match data
129 .process_data
130 .get(*current_id)
131 .ok_or_else(|| Error::MisssingBpmnData(current_id.to_string()))?
132 {
133 bpmn @ Bpmn::Event {
134 event,
135 symbol,
136 id: BpmnLocal(bid, _),
137 name,
138 outputs,
139 ..
140 } => {
141 let name_or_id = name.as_ref().unwrap_or(bid);
142 info!("{}: {}", event, name_or_id);
143 match event {
144 EventType::Start | EventType::IntermediateCatch | EventType::Boundary => {
145 maybe_fork!(self, outputs, data, event, name_or_id)
146 }
147 EventType::IntermediateThrow => {
148 match (name.as_ref(), symbol.as_ref()) {
149 (Some(name), Some(symbol @ Symbol::Link)) => {
150 self.catch_link_lookup(name, symbol, data.process_id)?
151 }
152 (Some(_), _) => {
154 maybe_fork!(self, outputs, data, event, name_or_id)
155 }
156 _ => Err(Error::MissingIntermediateThrowEventName(bid.into()))?,
157 }
158 }
159 EventType::End => {
160 if symbol.is_some() {
161 return Ok(Return::End(Some(bpmn)));
162 }
163 break;
164 }
165 }
166 }
167 Bpmn::Activity {
168 activity,
169 id,
170 name,
171 outputs,
172 ..
173 } => {
174 let name_or_id = name.as_ref().unwrap_or(id);
175 info!("{}: {}", activity, name_or_id);
176 match activity {
177 ActivityType::Task
178 | ActivityType::ScriptTask
179 | ActivityType::UserTask
180 | ActivityType::ServiceTask
181 | ActivityType::CallActivity
182 | ActivityType::ReceiveTask
183 | ActivityType::SendTask
184 | ActivityType::ManualTask
185 | ActivityType::BusinessRuleTask => {
186 #[cfg(feature = "trace")]
187 data.trace(replay::TASK, name_or_id)?;
188 if let Some(boundary) =
189 data.handler.run_task(name_or_id, data.user_data())
190 {
191 self.boundary_lookup(id, boundary.0, &boundary.1, data.process_data)
192 .ok_or_else(|| {
193 Error::MissingBoundary(
194 format!("{}", boundary),
195 name_or_id.into(),
196 )
197 })?
198 } else {
199 maybe_fork!(self, outputs, data, activity, name_or_id)
200 }
201 }
202 ActivityType::SubProcess => {
203 let sp_data = self
204 .data
205 .get(id)
206 .ok_or_else(|| Error::MissingProcessData(id.into()))?;
207
208 if let Some(Bpmn::Event {
209 event: EventType::End,
210 symbol: Some(symbol),
211 name,
212 ..
213 }) = self.execute(&data.update(id, sp_data))?
214 {
215 self.boundary_lookup(id, name.as_deref(), symbol, data.process_data)
216 .ok_or_else(|| {
217 Error::MissingBoundary(
218 symbol.to_string(),
219 name_or_id.into(),
220 )
221 })?
222 } else {
223 maybe_fork!(self, outputs, data, activity, name_or_id)
225 }
226 }
227 }
228 }
229
230 Bpmn::Gateway(
231 gw @ Gateway {
232 gateway,
233 id,
234 name,
235 default,
236 outputs,
237 inputs,
238 },
239 ) => {
240 let name_or_id = name.as_ref().unwrap_or(id);
241 info!("{}: {}", gateway, name_or_id);
242 #[cfg(feature = "trace")]
243 data.trace(replay::GATEWAY, name_or_id)?;
244
245 match gateway {
246 _ if outputs.len() == 0 => {
247 return Err(Error::MissingOutput(
248 gateway.to_string(),
249 name_or_id.to_string(),
250 ));
251 }
252 _ if outputs.len() == 1 && inputs.len() == 1 => outputs.first().unwrap(),
254 GatewayType::Exclusive if outputs.len() == 1 => outputs.first().unwrap(),
255 GatewayType::Exclusive => {
256 let response = data.handler.run_gateway(name_or_id, data.user_data());
257 match response {
258 With::Flow(value) => {
259 output_by_name_or_id(value, outputs.ids(), data.process_data)
260 .ok_or_else(|| {
261 Error::MissingOutput(
262 gateway.to_string(),
263 name_or_id.to_string(),
264 )
265 })?
266 }
267 With::Default => default_path(default, gateway, name_or_id)?,
268 With::Fork(_) => return Err(cannot_fork(gateway)),
269 With::Symbol(_, _) => return Err(cannot_do_events(gateway)),
270 }
271 }
272 GatewayType::Parallel | GatewayType::Inclusive if inputs.len() > 1 => {
274 return Ok(Return::Join(gw));
275 }
276 GatewayType::Parallel => {
277 return Ok(Return::Fork(Cow::Borrowed(outputs.ids())));
278 }
279 GatewayType::Inclusive => match self.handle_inclusive_gateway(data, gw)? {
280 ControlFlow::Continue(value) => value,
281 ControlFlow::Break(value) => return Ok(value),
282 },
283 GatewayType::EventBased if outputs.len() == 1 => {
284 return Err(Error::BpmnRequirement(AT_LEAST_TWO_OUTGOING.into()));
285 }
286 GatewayType::EventBased => {
287 let response = data.handler.run_gateway(name_or_id, data.user_data());
288 match response {
289 With::Symbol(_, _) => {
290 output_by_symbol(&response, outputs.ids(), data.process_data)
291 .ok_or_else(|| {
292 Error::MissingOutput(
293 gateway.to_string(),
294 name_or_id.to_string(),
295 )
296 })?
297 }
298 With::Default => return Err(cannot_use_default(gateway)),
299 With::Flow(_) => return Err(cannot_use_cond_expr(gateway)),
300 With::Fork(_) => return Err(cannot_fork(gateway)),
301 }
302 }
303 }
304 }
305 Bpmn::SequenceFlow {
306 id,
307 name,
308 target_ref,
309 ..
310 } => {
311 info!("SequenceFlow: {}", name.as_ref().unwrap_or(id));
312 target_ref.local()
313 }
314 bpmn => return Err(Error::TypeNotImplemented(format!("{bpmn:?}"))),
315 };
316 }
317 Ok(Return::End(None))
318 }
319
320 fn handle_inclusive_gateway<'a, T>(
321 &'a self,
322 data: &ExecuteData<'a, T>,
323 Gateway {
324 gateway,
325 id,
326 name,
327 default,
328 outputs,
329 ..
330 }: &'a Gateway,
331 ) -> Result<ControlFlow<Return<'a>, &'a usize>, Error> {
332 let name_or_id = name.as_ref().unwrap_or(id);
333 let response = data.handler.run_gateway(name_or_id, data.user_data());
334 let value = match response {
335 With::Flow(value) => output_by_name_or_id(value, outputs.ids(), data.process_data)
336 .ok_or_else(|| Error::MissingOutput(gateway.to_string(), name_or_id.to_string()))?,
337 With::Fork(value) => {
338 if value.is_empty() {
339 default_path(default, gateway, name_or_id)?
340 } else {
341 let outputs = outputs.ids();
342 let responses: Vec<_> = value
343 .iter()
344 .filter_map(|&response| {
345 output_by_name_or_id(response, outputs, data.process_data)
346 })
347 .collect();
348
349 if responses.len() <= 1 {
350 *responses.first().ok_or_else(|| {
351 Error::MissingOutput(gateway.to_string(), name_or_id.to_string())
352 })?
353 } else {
354 return Ok(ControlFlow::Break(Return::Fork(Cow::Owned(
355 responses.into_iter().cloned().collect(),
356 ))));
357 }
358 }
359 }
360 With::Default => default_path(default, gateway, name_or_id)?,
361 With::Symbol(_, _) => return Err(cannot_do_events(gateway)),
362 };
363 Ok(ControlFlow::Continue(value))
364 }
365
366 fn boundary_lookup<'a>(
367 &'a self,
368 activity_id: &str,
369 search_name: Option<&str>,
370 search_symbol: &Symbol,
371 process_data: &'a [Bpmn],
372 ) -> Option<&'a usize> {
373 self.boundaries
374 .get(activity_id)?
375 .iter()
376 .filter_map(|index| process_data.get(*index))
377 .filter_map(|bpmn| {
378 if let Bpmn::Event {
380 symbol, id, name, ..
381 } = bpmn
382 {
383 if symbol.as_ref() == Some(search_symbol) && search_name == name.as_deref() {
384 return Some(id.local());
385 }
386 }
387 None
388 })
389 .next()
390 }
391
392 fn catch_link_lookup(
394 &self,
395 throw_event_name: &str,
396 symbol: &Symbol,
397 process_id: &str,
398 ) -> Result<&usize, Error> {
399 self.catch_event_links
400 .get(process_id)
401 .and_then(|links| links.get(throw_event_name))
402 .ok_or_else(|| {
403 Error::MissingIntermediateCatchEvent(symbol.to_string(), throw_event_name.into())
404 })
405 }
406}
407
408fn default_path<'a>(
409 default: &'a Option<BpmnLocal>,
410 gateway: &GatewayType,
411 name_or_id: &String,
412) -> Result<&'a usize, Error> {
413 default
414 .as_ref()
415 .map(BpmnLocal::local)
416 .ok_or_else(|| Error::MissingDefault(gateway.to_string(), name_or_id.to_string()))
417}
418
419fn output_by_symbol<'a>(
420 search: &With,
421 outputs: &'a [usize],
422 process_data: &'a [Bpmn],
423) -> Option<&'a usize> {
424 outputs.iter().find(|index| match search {
425 With::Symbol(search, search_symbol) => process_data
426 .get(**index)
427 .and_then(|bpmn| {
428 if let Bpmn::SequenceFlow { target_ref, .. } = bpmn {
429 return process_data.get(*target_ref.local());
430 }
431 None
432 })
433 .filter(|bpmn| match bpmn {
434 Bpmn::Activity {
436 id, activity, name, ..
437 } => {
438 activity == &ActivityType::ReceiveTask
439 && search_symbol == &Symbol::Message
440 && (search.filter(|&sn| sn == id).is_some() || *search == name.as_deref())
441 }
442 Bpmn::Event {
443 id,
444 symbol:
445 Some(
446 symbol @ (Symbol::Message
447 | Symbol::Signal
448 | Symbol::Timer
449 | Symbol::Conditional),
450 ),
451 name,
452 ..
453 } => {
454 symbol == search_symbol
455 && (search.filter(|&sn| sn == id.bpmn()).is_some()
456 || *search == name.as_deref())
457 }
458 _ => false,
459 })
460 .is_some(),
461 _ => false,
462 })
463}
464
465fn output_by_name_or_id<'a>(
466 search: impl AsRef<str>,
467 outputs: &'a [usize],
468 process_data: &'a [Bpmn],
469) -> Option<&'a usize> {
470 outputs.iter().find(|index| {
471 if let Some(Bpmn::SequenceFlow { id, name, .. }) = process_data.get(**index) {
472 return name
473 .as_deref()
474 .filter(|&name| name == search.as_ref())
475 .is_some()
476 || id == search.as_ref();
477 }
478 false
479 })
480}
481
482pub(super) type ExecuteResult<'a> = Result<Option<&'a Bpmn>, Error>;
483
484pub(super) struct ExecuteData<'a, T> {
486 process_data: &'a Vec<Bpmn>,
487 process_id: &'a str,
488 handler: &'a Eventhandler<T>,
489 user_data: Data<T>,
490 #[cfg(feature = "trace")]
491 trace: Sender<(&'static str, String)>,
492}
493
494impl<'a, T> ExecuteData<'a, T> {
495 pub(super) fn new(
496 process_data: &'a Vec<Bpmn>,
497 process_id: &'a str,
498 handler: &'a Eventhandler<T>,
499 user_data: Data<T>,
500 #[cfg(feature = "trace")] trace: Sender<(&'static str, String)>,
501 ) -> Self {
502 Self {
503 process_data,
504 process_id,
505 handler,
506 user_data,
507 #[cfg(feature = "trace")]
508 trace,
509 }
510 }
511
512 fn update(&self, process_id: &'a str, process_data: &'a Vec<Bpmn>) -> Self {
514 Self {
515 process_data,
516 process_id,
517 handler: self.handler,
518 user_data: self.user_data(),
519 #[cfg(feature = "trace")]
520 trace: self.trace.clone(),
521 }
522 }
523
524 #[cfg(feature = "trace")]
525 fn trace(&self, bpmn_type: &'static str, value: impl Into<String>) -> Result<(), Error> {
526 Ok(self.trace.send((bpmn_type, value.into()))?)
527 }
528
529 fn user_data(&self) -> Data<T> {
530 Arc::clone(&self.user_data)
531 }
532}