Skip to main content

bpmn_engine/engine/
executor.rs

//! Process Executor
//!
//! Core execution engine for BPMN processes.
4
5use crate::activity::{Activity, ActivityError, ActivityFactory, ActivityResult, DefaultActivityFactory};
6use crate::engine::context::{ExecutionContext, ProcessInstanceState};
7use crate::engine::instance::ProcessInstance;
8use crate::model::ProcessDefinition;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
13/// Engine
14///
15/// Main BPMN execution engine.
16#[derive(Debug)]
17pub struct Engine {
18    /// Process instances (in-memory storage)
19    instances: Arc<RwLock<HashMap<String, Arc<ProcessInstance>>>>,
20    /// Activity factory
21    activity_factory: Arc<dyn ActivityFactory>,
22}
23
24impl Engine {
25    /// Create a new engine
26    pub fn new() -> Self {
27        Self {
28            instances: Arc::new(RwLock::new(HashMap::new())),
29            activity_factory: Arc::new(DefaultActivityFactory::new()),
30        }
31    }
32
33    /// Create a new engine with custom activity factory
34    pub fn with_activity_factory(factory: Arc<dyn ActivityFactory>) -> Self {
35        Self {
36            instances: Arc::new(RwLock::new(HashMap::new())),
37            activity_factory: factory,
38        }
39    }
40
41    /// Start a new process instance
42    ///
43    /// # Arguments
44    /// * `definition` - Process definition to execute
45    /// * `initial_variables` - Initial process variables
46    ///
47    /// # Returns
48    /// * `Ok(ProcessInstance)` - Created process instance
49    /// * `Err(EngineError)` - Engine error
50    pub async fn start_process(
51        &self,
52        definition: ProcessDefinition,
53        initial_variables: Option<HashMap<String, serde_json::Value>>,
54    ) -> Result<Arc<ProcessInstance>, EngineError> {
55        let instance_id = format!("instance_{}", uuid::Uuid::new_v4());
56        let definition = Arc::new(definition);
57        let instance = ProcessInstance::new(definition, instance_id.clone());
58
59        // Set initial variables
60        {
61            let mut context = instance.context_mut().await;
62            if let Some(vars) = initial_variables {
63                for (name, value) in vars {
64                    context.set_variable(name, value);
65                }
66            }
67        }
68
69        // Store instance
70        {
71            let mut instances = self.instances.write().await;
72            instances.insert(instance_id.clone(), Arc::new(instance.clone()));
73        }
74
75        // Start execution
76        self.execute_process(Arc::new(instance.clone())).await?;
77
78        Ok(Arc::new(instance))
79    }
80
81    /// Execute a process instance
82    ///
83    /// This is the main execution loop that processes the BPMN process.
84    async fn execute_process(&self, instance: Arc<ProcessInstance>) -> Result<(), EngineError> {
85        let definition = instance.definition();
86        
87        // Find start events
88        let start_events: Vec<String> = definition
89            .elements
90            .values()
91            .filter_map(|elem| {
92                match elem {
93                    crate::model::ProcessElement::StartEvent(_) => Some(elem.id().to_string()),
94                    _ => None,
95                }
96            })
97            .collect();
98
99        if start_events.is_empty() {
100            return Err(EngineError::NoStartEvent);
101        }
102
103        // Set current elements to start events
104        {
105            let mut context = instance.context_mut().await;
106            context.set_current_elements(start_events);
107        }
108
109        // Execute process loop
110        loop {
111            let should_continue = {
112                let mut context = instance.context_mut().await;
113                
114                if context.state != ProcessInstanceState::Active {
115                    break;
116                }
117
118                let current_elements = context.current_elements.clone();
119                context.clear_current_elements();
120
121                // Process each current element
122                for element_id in current_elements {
123                    // Get element from definition
124                    let element = match definition.get_element(&element_id) {
125                        Some(e) => e,
126                        None => {
127                            context.state = ProcessInstanceState::Failed;
128                            return Err(EngineError::ElementNotFound(element_id));
129                        }
130                    };
131
132                    // Create activity from element
133                    let activity = match self.activity_factory.create_activity(element) {
134                        Ok(a) => a,
135                        Err(e) => {
136                            context.state = ProcessInstanceState::Failed;
137                            return Err(EngineError::ActivityExecutionError(e));
138                        }
139                    };
140
141                    // Execute activity
142                    let activity_result = match activity.execute(&mut context).await {
143                        Ok(result) => result,
144                        Err(e) => {
145                            context.state = ProcessInstanceState::Failed;
146                            return Err(EngineError::ActivityExecutionError(e));
147                        }
148                    };
149
150                    // Record execution step
151                    let step_result = match &activity_result {
152                        ActivityResult::Completed { .. } => {
153                            crate::engine::context::ExecutionStepResult::Completed
154                        }
155                        ActivityResult::Waiting { reason } => {
156                            crate::engine::context::ExecutionStepResult::Waiting(reason.clone())
157                        }
158                        ActivityResult::Continue { .. } => {
159                            crate::engine::context::ExecutionStepResult::Completed
160                        }
161                    };
162                    context.add_execution_step(crate::engine::context::ExecutionStep {
163                        element_id: element_id.clone(),
164                        timestamp: std::time::SystemTime::now(),
165                        result: step_result,
166                    });
167
168                    // Handle activity result
169                    match activity_result {
170                        ActivityResult::Completed { .. } => {
171                            // Get outgoing flows
172                            let outgoing_flows = definition.get_outgoing_flows(&element_id);
173                            let mut next_elements = Vec::new();
174
175                            for flow in outgoing_flows {
176                                // Check condition if present
177                                if let Some(_condition) = &flow.condition_expression {
178                                    // TODO: Evaluate condition
179                                    // For now, assume condition is true
180                                }
181                                next_elements.push(flow.target_ref.clone());
182                            }
183
184                            // Check if this is an end event
185                            match element {
186                                crate::model::ProcessElement::EndEvent(_) => {
187                                    context.state = ProcessInstanceState::Completed;
188                                    return Ok(());
189                                }
190                                _ => {
191                                    context.current_elements.extend(next_elements);
192                                }
193                            }
194                        }
195                        ActivityResult::Waiting { .. } => {
196                            // Process is waiting, pause execution
197                            break;
198                        }
199                        ActivityResult::Continue { next_elements } => {
200                            // Check if any next element is an end event
201                            let mut has_end_event = false;
202                            for next_id in &next_elements {
203                                if let Some(next_elem) = definition.get_element(next_id) {
204                                    if matches!(next_elem, crate::model::ProcessElement::EndEvent(_)) {
205                                        has_end_event = true;
206                                        break;
207                                    }
208                                }
209                            }
210
211                            if has_end_event {
212                                context.state = ProcessInstanceState::Completed;
213                                return Ok(());
214                            }
215
216                            context.current_elements.extend(next_elements);
217                        }
218                    }
219                }
220
221                !context.current_elements.is_empty()
222            };
223
224            if !should_continue {
225                break;
226            }
227        }
228
229        Ok(())
230    }
231
232    /// Get a process instance by ID
233    pub async fn get_instance(&self, instance_id: &str) -> Option<Arc<ProcessInstance>> {
234        let instances = self.instances.read().await;
235        instances.get(instance_id).cloned()
236    }
237}
238
239impl Default for Engine {
240    fn default() -> Self {
241        Self::new()
242    }
243}
244
/// Builder for [`Engine`] instances.
///
/// Currently carries no settings; it exists as the place where custom
/// configuration options will be added.
#[derive(Debug, Default)]
pub struct EngineBuilder {
    // Future: Add configuration options
}
252
253impl EngineBuilder {
254    /// Create a new builder
255    pub fn new() -> Self {
256        Self::default()
257    }
258
259    /// Build the engine
260    pub fn build(self) -> Engine {
261        Engine::new()
262    }
263}
264
265/// Engine Error
266#[derive(Debug, thiserror::Error)]
267pub enum EngineError {
268    #[error("No start event found in process")]
269    NoStartEvent,
270    #[error("Element not found: {0}")]
271    ElementNotFound(String),
272    #[error("Activity execution error: {0}")]
273    ActivityExecutionError(#[from] ActivityError),
274    #[error("Process execution failed: {0}")]
275    ExecutionFailed(String),
276}
277