rust_rule_engine/engine/
workflow.rs

use crate::errors::Result;
use crate::types::Value;
use chrono::{DateTime, Utc};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
6
/// Represents the status of a workflow.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so a status can be
/// used as a set member or map key (e.g. when grouping workflows by status).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum WorkflowStatus {
    /// Workflow is currently running
    Running,
    /// Workflow has completed successfully
    Completed,
    /// Workflow has failed
    Failed,
    /// Workflow is paused
    Paused,
    /// Workflow is waiting for a scheduled task
    Waiting,
}
21
/// Represents a scheduled task in the workflow: a rule to run once a point
/// in time has been reached.
///
/// Equality is field-wise (rule name, deadline and workflow ID), which makes
/// tasks directly comparable in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledTask {
    /// Name of the rule to execute
    pub rule_name: String,
    /// When to execute the task (monotonic clock)
    pub execute_at: Instant,
    /// Associated workflow ID, if any
    pub workflow_id: Option<String>,
}
32
33/// Tracks the state of a workflow execution
34#[derive(Debug, Clone)]
35pub struct WorkflowState {
36    /// Unique workflow identifier
37    pub workflow_id: String,
38    /// Current active step/agenda group
39    pub current_step: Option<String>,
40    /// List of completed steps
41    pub completed_steps: Vec<String>,
42    /// Workflow-specific data storage
43    pub workflow_data: HashMap<String, Value>,
44    /// Current status of the workflow
45    pub status: WorkflowStatus,
46    /// Workflow start time
47    pub started_at: Instant,
48    /// Workflow completion time
49    pub completed_at: Option<Instant>,
50}
51
52impl WorkflowState {
53    /// Create a new workflow state
54    pub fn new(workflow_id: String) -> Self {
55        Self {
56            workflow_id,
57            current_step: None,
58            completed_steps: Vec::new(),
59            workflow_data: HashMap::new(),
60            status: WorkflowStatus::Running,
61            started_at: Instant::now(),
62            completed_at: None,
63        }
64    }
65
66    /// Mark a step as completed
67    pub fn complete_step(&mut self, step: String) {
68        if let Some(current) = &self.current_step {
69            if current == &step {
70                self.completed_steps.push(step);
71                self.current_step = None;
72            }
73        }
74    }
75
76    /// Set the current active step
77    pub fn set_current_step(&mut self, step: String) {
78        self.current_step = Some(step);
79    }
80
81    /// Complete the workflow
82    pub fn complete(&mut self) {
83        self.status = WorkflowStatus::Completed;
84        self.completed_at = Some(Instant::now());
85        self.current_step = None;
86    }
87
88    /// Fail the workflow
89    pub fn fail(&mut self) {
90        self.status = WorkflowStatus::Failed;
91        self.completed_at = Some(Instant::now());
92        self.current_step = None;
93    }
94
95    /// Set workflow data
96    pub fn set_data(&mut self, key: String, value: Value) {
97        self.workflow_data.insert(key, value);
98    }
99
100    /// Get workflow data
101    pub fn get_data(&self, key: &str) -> Option<&Value> {
102        self.workflow_data.get(key)
103    }
104
105    /// Get workflow duration
106    pub fn duration(&self) -> Duration {
107        match self.completed_at {
108            Some(end) => end.duration_since(self.started_at),
109            None => Instant::now().duration_since(self.started_at),
110        }
111    }
112}
113
114/// Manages workflow execution and scheduling
115#[derive(Debug)]
116pub struct WorkflowEngine {
117    /// Active workflows by ID
118    workflows: HashMap<String, WorkflowState>,
119    /// Scheduled tasks queue
120    scheduled_tasks: Vec<ScheduledTask>,
121    /// Queue of agenda groups to activate
122    agenda_activation_queue: Vec<String>,
123    /// Workflow execution counter
124    workflow_counter: u64,
125}
126
127impl WorkflowEngine {
128    /// Create a new workflow engine
129    pub fn new() -> Self {
130        Self {
131            workflows: HashMap::new(),
132            scheduled_tasks: Vec::new(),
133            agenda_activation_queue: Vec::new(),
134            workflow_counter: 0,
135        }
136    }
137
138    /// Start a new workflow
139    pub fn start_workflow(&mut self, workflow_name: Option<String>) -> String {
140        self.workflow_counter += 1;
141        let workflow_id = workflow_name.unwrap_or_else(|| format!("workflow_{}", self.workflow_counter));
142        
143        let workflow_state = WorkflowState::new(workflow_id.clone());
144        self.workflows.insert(workflow_id.clone(), workflow_state);
145        
146        println!("๐Ÿ”„ Started workflow: {}", workflow_id);
147        workflow_id
148    }
149
150    /// Activate an agenda group for workflow progression
151    pub fn activate_agenda_group(&mut self, group: String) {
152        self.agenda_activation_queue.push(group.clone());
153        println!("๐ŸŽฏ Queued agenda group activation: {}", group);
154    }
155
156    /// Schedule a rule to execute after a delay
157    pub fn schedule_rule(&mut self, rule_name: String, delay_ms: u64, workflow_id: Option<String>) {
158        let task = ScheduledTask {
159            rule_name: rule_name.clone(),
160            execute_at: Instant::now() + Duration::from_millis(delay_ms),
161            workflow_id,
162        };
163        
164        self.scheduled_tasks.push(task);
165        println!("โฐ Scheduled rule '{}' to execute in {}ms", rule_name, delay_ms);
166    }
167
168    /// Complete a workflow
169    pub fn complete_workflow(&mut self, workflow_name: String) {
170        if let Some(workflow) = self.workflows.get_mut(&workflow_name) {
171            workflow.complete();
172            println!("โœ… Completed workflow: {}", workflow_name);
173        }
174    }
175
176    /// Set workflow data
177    pub fn set_workflow_data(&mut self, workflow_id: &str, key: String, value: Value) {
178        if let Some(workflow) = self.workflows.get_mut(workflow_id) {
179            workflow.set_data(key.clone(), value);
180            println!("๐Ÿ’พ Set workflow data: {} = {:?}", key, workflow.get_data(&key));
181        }
182    }
183
184    /// Get the next agenda group to activate
185    pub fn get_next_agenda_group(&mut self) -> Option<String> {
186        if !self.agenda_activation_queue.is_empty() {
187            Some(self.agenda_activation_queue.remove(0))
188        } else {
189            None
190        }
191    }
192
193    /// Get ready scheduled tasks
194    pub fn get_ready_tasks(&mut self) -> Vec<ScheduledTask> {
195        let now = Instant::now();
196        let mut ready_tasks = Vec::new();
197        
198        self.scheduled_tasks.retain(|task| {
199            if task.execute_at <= now {
200                ready_tasks.push(task.clone());
201                false // Remove from queue
202            } else {
203                true // Keep in queue
204            }
205        });
206        
207        if !ready_tasks.is_empty() {
208            println!("โšก {} scheduled tasks are ready for execution", ready_tasks.len());
209        }
210        
211        ready_tasks
212    }
213
214    /// Get the next pending agenda activation (for syncing with agenda manager)
215    pub fn get_next_pending_agenda_activation(&mut self) -> Option<String> {
216        if !self.agenda_activation_queue.is_empty() {
217            Some(self.agenda_activation_queue.remove(0))
218        } else {
219            None
220        }
221    }
222
223    /// Get workflow state by ID
224    pub fn get_workflow(&self, workflow_id: &str) -> Option<&WorkflowState> {
225        self.workflows.get(workflow_id)
226    }
227
228    /// Get all active workflows
229    pub fn get_active_workflows(&self) -> Vec<&WorkflowState> {
230        self.workflows
231            .values()
232            .filter(|w| w.status == WorkflowStatus::Running || w.status == WorkflowStatus::Waiting)
233            .collect()
234    }
235
236    /// Get workflow statistics
237    pub fn get_workflow_stats(&self) -> WorkflowStats {
238        let total = self.workflows.len();
239        let running = self.workflows.values().filter(|w| w.status == WorkflowStatus::Running).count();
240        let completed = self.workflows.values().filter(|w| w.status == WorkflowStatus::Completed).count();
241        let failed = self.workflows.values().filter(|w| w.status == WorkflowStatus::Failed).count();
242        let scheduled_tasks = self.scheduled_tasks.len();
243
244        WorkflowStats {
245            total_workflows: total,
246            running_workflows: running,
247            completed_workflows: completed,
248            failed_workflows: failed,
249            pending_scheduled_tasks: scheduled_tasks,
250            pending_agenda_activations: self.agenda_activation_queue.len(),
251        }
252    }
253
254    /// Clean up completed workflows older than specified duration
255    pub fn cleanup_completed_workflows(&mut self, older_than: Duration) {
256        let cutoff = Instant::now() - older_than;
257        let initial_count = self.workflows.len();
258        
259        self.workflows.retain(|_, workflow| {
260            if workflow.status == WorkflowStatus::Completed || workflow.status == WorkflowStatus::Failed {
261                if let Some(completed_at) = workflow.completed_at {
262                    completed_at > cutoff
263                } else {
264                    true // Keep if no completion time
265                }
266            } else {
267                true // Keep active workflows
268            }
269        });
270        
271        let cleaned = initial_count - self.workflows.len();
272        if cleaned > 0 {
273            println!("๐Ÿงน Cleaned up {} completed workflows", cleaned);
274        }
275    }
276}
277
278impl Default for WorkflowEngine {
279    fn default() -> Self {
280        Self::new()
281    }
282}
283
/// Workflow execution statistics: a point-in-time snapshot of an engine's
/// counters.
///
/// `Default` is the all-zero snapshot; equality is field-wise, so two
/// snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WorkflowStats {
    /// Number of workflows currently tracked by the engine, in any status
    pub total_workflows: usize,
    /// Number of workflows whose status is `Running`
    pub running_workflows: usize,
    /// Number of completed workflows
    pub completed_workflows: usize,
    /// Number of failed workflows
    pub failed_workflows: usize,
    /// Number of pending scheduled tasks
    pub pending_scheduled_tasks: usize,
    /// Number of pending agenda group activations
    pub pending_agenda_activations: usize,
}
300
301/// Workflow execution result
302#[derive(Debug, Clone)]
303pub struct WorkflowResult {
304    /// Workflow execution was successful
305    pub success: bool,
306    /// Number of workflow steps executed
307    pub steps_executed: usize,
308    /// Total execution time
309    pub execution_time: Duration,
310    /// Final workflow status
311    pub final_status: WorkflowStatus,
312    /// Any error message if failed
313    pub error_message: Option<String>,
314}
315
316impl WorkflowResult {
317    /// Create a successful workflow result
318    pub fn success(steps_executed: usize, execution_time: Duration) -> Self {
319        Self {
320            success: true,
321            steps_executed,
322            execution_time,
323            final_status: WorkflowStatus::Completed,
324            error_message: None,
325        }
326    }
327
328    /// Create a failed workflow result
329    pub fn failure(error_message: String) -> Self {
330        Self {
331            success: false,
332            steps_executed: 0,
333            execution_time: Duration::from_millis(0),
334            final_status: WorkflowStatus::Failed,
335            error_message: Some(error_message),
336        }
337    }
338}
339
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_workflow_state_creation() {
        let workflow = WorkflowState::new("test_workflow".to_string());
        assert_eq!(workflow.workflow_id, "test_workflow");
        assert_eq!(workflow.status, WorkflowStatus::Running);
        assert!(workflow.current_step.is_none());
        assert!(workflow.completed_steps.is_empty());
    }

    #[test]
    fn test_complete_step_requires_current_step() {
        let mut workflow = WorkflowState::new("wf".to_string());

        // No active step: nothing is recorded.
        workflow.complete_step("a".to_string());
        assert!(workflow.completed_steps.is_empty());

        // A step other than the active one is ignored.
        workflow.set_current_step("a".to_string());
        workflow.complete_step("b".to_string());
        assert_eq!(workflow.current_step.as_deref(), Some("a"));

        workflow.complete_step("a".to_string());
        assert_eq!(workflow.completed_steps, vec!["a".to_string()]);
        assert!(workflow.current_step.is_none());
    }

    #[test]
    fn test_workflow_engine_creation() {
        let engine = WorkflowEngine::new();
        assert_eq!(engine.workflows.len(), 0);
        assert_eq!(engine.scheduled_tasks.len(), 0);
    }

    #[test]
    fn test_start_workflow() {
        let mut engine = WorkflowEngine::new();
        let workflow_id = engine.start_workflow(Some("test".to_string()));
        assert_eq!(workflow_id, "test");
        assert!(engine.get_workflow("test").is_some());
    }

    #[test]
    fn test_schedule_rule() {
        let mut engine = WorkflowEngine::new();
        engine.schedule_rule("test_rule".to_string(), 1000, None);
        assert_eq!(engine.scheduled_tasks.len(), 1);
    }

    #[test]
    fn test_ready_tasks_are_removed_from_queue() {
        let mut engine = WorkflowEngine::new();
        engine.schedule_rule("now".to_string(), 0, None);
        engine.schedule_rule("later".to_string(), 3_600_000, None);

        let ready = engine.get_ready_tasks();
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].rule_name, "now");
        assert_eq!(engine.scheduled_tasks.len(), 1);
        assert!(engine.get_ready_tasks().is_empty());
    }

    #[test]
    fn test_agenda_groups_are_fifo() {
        let mut engine = WorkflowEngine::new();
        engine.activate_agenda_group("first".to_string());
        engine.activate_agenda_group("second".to_string());

        assert_eq!(engine.get_next_agenda_group(), Some("first".to_string()));
        assert_eq!(
            engine.get_next_pending_agenda_activation(),
            Some("second".to_string())
        );
        assert_eq!(engine.get_next_agenda_group(), None);
    }

    #[test]
    fn test_workflow_stats() {
        let mut engine = WorkflowEngine::new();
        engine.start_workflow(Some("test1".to_string()));
        engine.start_workflow(Some("test2".to_string()));

        let stats = engine.get_workflow_stats();
        assert_eq!(stats.total_workflows, 2);
        assert_eq!(stats.running_workflows, 2);
    }

    #[test]
    fn test_workflow_stats_after_completion() {
        let mut engine = WorkflowEngine::new();
        engine.start_workflow(Some("done".to_string()));
        engine.start_workflow(Some("live".to_string()));
        engine.complete_workflow("done".to_string());

        let stats = engine.get_workflow_stats();
        assert_eq!(stats.total_workflows, 2);
        assert_eq!(stats.running_workflows, 1);
        assert_eq!(stats.completed_workflows, 1);
        assert_eq!(stats.failed_workflows, 0);
        assert_eq!(engine.get_active_workflows().len(), 1);
    }

    #[test]
    fn test_cleanup_removes_only_finished_workflows() {
        let mut engine = WorkflowEngine::new();
        engine.start_workflow(Some("done".to_string()));
        engine.start_workflow(Some("live".to_string()));
        engine.complete_workflow("done".to_string());

        // A zero retention window removes every finished workflow.
        engine.cleanup_completed_workflows(Duration::from_millis(0));
        assert!(engine.get_workflow("done").is_none());
        assert!(engine.get_workflow("live").is_some());
    }
}