bpmn_engine/elements/
gateway.rs1use crate::activity::{Activity, ActivityError, ActivityResult};
6use crate::capability::{Capability, CapabilityError, CapabilityResult, CapabilityProvider};
7use crate::engine::ExecutionContext;
8use crate::model::{ExclusiveGateway, ParallelGateway, InclusiveGateway};
9use async_trait::async_trait;
10use std::collections::HashMap;
11
12pub struct ExclusiveGatewayActivity {
17 gateway: ExclusiveGateway,
18}
19
20impl ExclusiveGatewayActivity {
21 pub fn new(gateway: ExclusiveGateway) -> Self {
22 Self { gateway }
23 }
24}
25
26#[async_trait]
27impl Activity for ExclusiveGatewayActivity {
28 async fn execute(&self, context: &mut ExecutionContext) -> Result<ActivityResult, ActivityError> {
29 let definition = &context.process_definition;
30 let outgoing_flows = definition.get_outgoing_flows(&self.gateway.base.id);
31
32 let mut selected_flow: Option<String> = None;
34
35 for flow in outgoing_flows {
36 if Some(&flow.id) == self.gateway.default_flow.as_ref() {
38 if selected_flow.is_none() {
40 selected_flow = Some(flow.target_ref.clone());
41 }
42 continue;
43 }
44
45 if let Some(condition) = &flow.condition_expression {
47 if selected_flow.is_none() {
50 selected_flow = Some(flow.target_ref.clone());
51 break;
52 }
53 } else {
54 if selected_flow.is_none() {
56 selected_flow = Some(flow.target_ref.clone());
57 }
58 }
59 }
60
61 match selected_flow {
62 Some(target) => Ok(ActivityResult::Continue {
63 next_elements: vec![target],
64 }),
65 None => Err(ActivityError::ExecutionFailed(
66 "No outgoing flow selected from exclusive gateway".to_string(),
67 )),
68 }
69 }
70
71 fn id(&self) -> &str {
72 &self.gateway.base.id
73 }
74
75 fn name(&self) -> Option<&str> {
76 self.gateway.base.name.as_deref()
77 }
78}
79
80pub struct ParallelGatewayActivity {
85 gateway: ParallelGateway,
86}
87
88impl ParallelGatewayActivity {
89 pub fn new(gateway: ParallelGateway) -> Self {
90 Self { gateway }
91 }
92}
93
94#[async_trait]
95impl Activity for ParallelGatewayActivity {
96 async fn execute(&self, context: &mut ExecutionContext) -> Result<ActivityResult, ActivityError> {
97 let definition = &context.process_definition;
98 let incoming_flows = definition.get_incoming_flows(&self.gateway.base.id);
99 let outgoing_flows = definition.get_outgoing_flows(&self.gateway.base.id);
100
101 if !outgoing_flows.is_empty() {
104 let next_elements: Vec<String> = outgoing_flows
105 .iter()
106 .map(|flow| flow.target_ref.clone())
107 .collect();
108 Ok(ActivityResult::Continue { next_elements })
109 } else {
110 Ok(ActivityResult::Completed { output_variables: None })
113 }
114 }
115
116 fn id(&self) -> &str {
117 &self.gateway.base.id
118 }
119
120 fn name(&self) -> Option<&str> {
121 self.gateway.base.name.as_deref()
122 }
123}
124
125pub struct InclusiveGatewayActivity {
130 gateway: InclusiveGateway,
131}
132
133impl InclusiveGatewayActivity {
134 pub fn new(gateway: InclusiveGateway) -> Self {
135 Self { gateway }
136 }
137}
138
139#[async_trait]
140impl Activity for InclusiveGatewayActivity {
141 async fn execute(&self, context: &mut ExecutionContext) -> Result<ActivityResult, ActivityError> {
142 let definition = &context.process_definition;
143 let outgoing_flows = definition.get_outgoing_flows(&self.gateway.base.id);
144
145 let mut selected_targets = Vec::new();
147
148 for flow in outgoing_flows {
149 if Some(&flow.id) == self.gateway.default_flow.as_ref() {
151 if selected_targets.is_empty() {
153 selected_targets.push(flow.target_ref.clone());
154 }
155 continue;
156 }
157
158 if let Some(condition) = &flow.condition_expression {
160 selected_targets.push(flow.target_ref.clone());
163 } else {
164 selected_targets.push(flow.target_ref.clone());
166 }
167 }
168
169 if selected_targets.is_empty() {
170 Err(ActivityError::ExecutionFailed(
171 "No outgoing flow selected from inclusive gateway".to_string(),
172 ))
173 } else {
174 Ok(ActivityResult::Continue {
175 next_elements: selected_targets,
176 })
177 }
178 }
179
180 fn id(&self) -> &str {
181 &self.gateway.base.id
182 }
183
184 fn name(&self) -> Option<&str> {
185 self.gateway.base.name.as_deref()
186 }
187}