bpmn_engine/engine/
executor.rs1use crate::activity::{Activity, ActivityError, ActivityFactory, ActivityResult, DefaultActivityFactory};
6use crate::engine::context::{ExecutionContext, ProcessInstanceState};
7use crate::engine::instance::ProcessInstance;
8use crate::model::ProcessDefinition;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
13#[derive(Debug)]
17pub struct Engine {
18 instances: Arc<RwLock<HashMap<String, Arc<ProcessInstance>>>>,
20 activity_factory: Arc<dyn ActivityFactory>,
22}
23
24impl Engine {
25 pub fn new() -> Self {
27 Self {
28 instances: Arc::new(RwLock::new(HashMap::new())),
29 activity_factory: Arc::new(DefaultActivityFactory::new()),
30 }
31 }
32
33 pub fn with_activity_factory(factory: Arc<dyn ActivityFactory>) -> Self {
35 Self {
36 instances: Arc::new(RwLock::new(HashMap::new())),
37 activity_factory: factory,
38 }
39 }
40
41 pub async fn start_process(
51 &self,
52 definition: ProcessDefinition,
53 initial_variables: Option<HashMap<String, serde_json::Value>>,
54 ) -> Result<Arc<ProcessInstance>, EngineError> {
55 let instance_id = format!("instance_{}", uuid::Uuid::new_v4());
56 let definition = Arc::new(definition);
57 let instance = ProcessInstance::new(definition, instance_id.clone());
58
59 {
61 let mut context = instance.context_mut().await;
62 if let Some(vars) = initial_variables {
63 for (name, value) in vars {
64 context.set_variable(name, value);
65 }
66 }
67 }
68
69 {
71 let mut instances = self.instances.write().await;
72 instances.insert(instance_id.clone(), Arc::new(instance.clone()));
73 }
74
75 self.execute_process(Arc::new(instance.clone())).await?;
77
78 Ok(Arc::new(instance))
79 }
80
81 async fn execute_process(&self, instance: Arc<ProcessInstance>) -> Result<(), EngineError> {
85 let definition = instance.definition();
86
87 let start_events: Vec<String> = definition
89 .elements
90 .values()
91 .filter_map(|elem| {
92 match elem {
93 crate::model::ProcessElement::StartEvent(_) => Some(elem.id().to_string()),
94 _ => None,
95 }
96 })
97 .collect();
98
99 if start_events.is_empty() {
100 return Err(EngineError::NoStartEvent);
101 }
102
103 {
105 let mut context = instance.context_mut().await;
106 context.set_current_elements(start_events);
107 }
108
109 loop {
111 let should_continue = {
112 let mut context = instance.context_mut().await;
113
114 if context.state != ProcessInstanceState::Active {
115 break;
116 }
117
118 let current_elements = context.current_elements.clone();
119 context.clear_current_elements();
120
121 for element_id in current_elements {
123 let element = match definition.get_element(&element_id) {
125 Some(e) => e,
126 None => {
127 context.state = ProcessInstanceState::Failed;
128 return Err(EngineError::ElementNotFound(element_id));
129 }
130 };
131
132 let activity = match self.activity_factory.create_activity(element) {
134 Ok(a) => a,
135 Err(e) => {
136 context.state = ProcessInstanceState::Failed;
137 return Err(EngineError::ActivityExecutionError(e));
138 }
139 };
140
141 let activity_result = match activity.execute(&mut context).await {
143 Ok(result) => result,
144 Err(e) => {
145 context.state = ProcessInstanceState::Failed;
146 return Err(EngineError::ActivityExecutionError(e));
147 }
148 };
149
150 let step_result = match &activity_result {
152 ActivityResult::Completed { .. } => {
153 crate::engine::context::ExecutionStepResult::Completed
154 }
155 ActivityResult::Waiting { reason } => {
156 crate::engine::context::ExecutionStepResult::Waiting(reason.clone())
157 }
158 ActivityResult::Continue { .. } => {
159 crate::engine::context::ExecutionStepResult::Completed
160 }
161 };
162 context.add_execution_step(crate::engine::context::ExecutionStep {
163 element_id: element_id.clone(),
164 timestamp: std::time::SystemTime::now(),
165 result: step_result,
166 });
167
168 match activity_result {
170 ActivityResult::Completed { .. } => {
171 let outgoing_flows = definition.get_outgoing_flows(&element_id);
173 let mut next_elements = Vec::new();
174
175 for flow in outgoing_flows {
176 if let Some(_condition) = &flow.condition_expression {
178 }
181 next_elements.push(flow.target_ref.clone());
182 }
183
184 match element {
186 crate::model::ProcessElement::EndEvent(_) => {
187 context.state = ProcessInstanceState::Completed;
188 return Ok(());
189 }
190 _ => {
191 context.current_elements.extend(next_elements);
192 }
193 }
194 }
195 ActivityResult::Waiting { .. } => {
196 break;
198 }
199 ActivityResult::Continue { next_elements } => {
200 let mut has_end_event = false;
202 for next_id in &next_elements {
203 if let Some(next_elem) = definition.get_element(next_id) {
204 if matches!(next_elem, crate::model::ProcessElement::EndEvent(_)) {
205 has_end_event = true;
206 break;
207 }
208 }
209 }
210
211 if has_end_event {
212 context.state = ProcessInstanceState::Completed;
213 return Ok(());
214 }
215
216 context.current_elements.extend(next_elements);
217 }
218 }
219 }
220
221 !context.current_elements.is_empty()
222 };
223
224 if !should_continue {
225 break;
226 }
227 }
228
229 Ok(())
230 }
231
232 pub async fn get_instance(&self, instance_id: &str) -> Option<Arc<ProcessInstance>> {
234 let instances = self.instances.read().await;
235 instances.get(instance_id).cloned()
236 }
237}
238
239impl Default for Engine {
240 fn default() -> Self {
241 Self::new()
242 }
243}
244
245#[derive(Debug, Default)]
249pub struct EngineBuilder {
250 }
252
253impl EngineBuilder {
254 pub fn new() -> Self {
256 Self::default()
257 }
258
259 pub fn build(self) -> Engine {
261 Engine::new()
262 }
263}
264
265#[derive(Debug, thiserror::Error)]
267pub enum EngineError {
268 #[error("No start event found in process")]
269 NoStartEvent,
270 #[error("Element not found: {0}")]
271 ElementNotFound(String),
272 #[error("Activity execution error: {0}")]
273 ActivityExecutionError(#[from] ActivityError),
274 #[error("Process execution failed: {0}")]
275 ExecutionFailed(String),
276}
277