rust_rule_engine/engine/
workflow.rs

1use crate::errors::Result;
2use crate::types::Value;
3use chrono::{DateTime, Utc};
4use std::collections::HashMap;
5use std::time::{Duration, Instant};
6
7/// Represents the status of a workflow
8#[derive(Debug, Clone, PartialEq)]
9pub enum WorkflowStatus {
10    /// Workflow is currently running
11    Running,
12    /// Workflow has completed successfully
13    Completed,
14    /// Workflow has failed
15    Failed,
16    /// Workflow is paused
17    Paused,
18    /// Workflow is waiting for a scheduled task
19    Waiting,
20}
21
22/// Represents a scheduled task in the workflow
23#[derive(Debug, Clone)]
24pub struct ScheduledTask {
25    /// Name of the rule to execute
26    pub rule_name: String,
27    /// When to execute the task
28    pub execute_at: Instant,
29    /// Associated workflow ID
30    pub workflow_id: Option<String>,
31}
32
33/// Tracks the state of a workflow execution
34#[derive(Debug, Clone)]
35pub struct WorkflowState {
36    /// Unique workflow identifier
37    pub workflow_id: String,
38    /// Current active step/agenda group
39    pub current_step: Option<String>,
40    /// List of completed steps
41    pub completed_steps: Vec<String>,
42    /// Workflow-specific data storage
43    pub workflow_data: HashMap<String, Value>,
44    /// Current status of the workflow
45    pub status: WorkflowStatus,
46    /// Workflow start time
47    pub started_at: Instant,
48    /// Workflow completion time
49    pub completed_at: Option<Instant>,
50}
51
52impl WorkflowState {
53    /// Create a new workflow state
54    pub fn new(workflow_id: String) -> Self {
55        Self {
56            workflow_id,
57            current_step: None,
58            completed_steps: Vec::new(),
59            workflow_data: HashMap::new(),
60            status: WorkflowStatus::Running,
61            started_at: Instant::now(),
62            completed_at: None,
63        }
64    }
65
66    /// Mark a step as completed
67    pub fn complete_step(&mut self, step: String) {
68        if let Some(current) = &self.current_step {
69            if current == &step {
70                self.completed_steps.push(step);
71                self.current_step = None;
72            }
73        }
74    }
75
76    /// Set the current active step
77    pub fn set_current_step(&mut self, step: String) {
78        self.current_step = Some(step);
79    }
80
81    /// Complete the workflow
82    pub fn complete(&mut self) {
83        self.status = WorkflowStatus::Completed;
84        self.completed_at = Some(Instant::now());
85        self.current_step = None;
86    }
87
88    /// Fail the workflow
89    pub fn fail(&mut self) {
90        self.status = WorkflowStatus::Failed;
91        self.completed_at = Some(Instant::now());
92        self.current_step = None;
93    }
94
95    /// Set workflow data
96    pub fn set_data(&mut self, key: String, value: Value) {
97        self.workflow_data.insert(key, value);
98    }
99
100    /// Get workflow data
101    pub fn get_data(&self, key: &str) -> Option<&Value> {
102        self.workflow_data.get(key)
103    }
104
105    /// Get workflow duration
106    pub fn duration(&self) -> Duration {
107        match self.completed_at {
108            Some(end) => end.duration_since(self.started_at),
109            None => Instant::now().duration_since(self.started_at),
110        }
111    }
112}
113
114/// Manages workflow execution and scheduling
115#[derive(Debug)]
116pub struct WorkflowEngine {
117    /// Active workflows by ID
118    workflows: HashMap<String, WorkflowState>,
119    /// Scheduled tasks queue
120    scheduled_tasks: Vec<ScheduledTask>,
121    /// Queue of agenda groups to activate
122    agenda_activation_queue: Vec<String>,
123    /// Workflow execution counter
124    workflow_counter: u64,
125}
126
127impl WorkflowEngine {
128    /// Create a new workflow engine
129    pub fn new() -> Self {
130        Self {
131            workflows: HashMap::new(),
132            scheduled_tasks: Vec::new(),
133            agenda_activation_queue: Vec::new(),
134            workflow_counter: 0,
135        }
136    }
137
138    /// Start a new workflow
139    pub fn start_workflow(&mut self, workflow_name: Option<String>) -> String {
140        self.workflow_counter += 1;
141        let workflow_id =
142            workflow_name.unwrap_or_else(|| format!("workflow_{}", self.workflow_counter));
143
144        let workflow_state = WorkflowState::new(workflow_id.clone());
145        self.workflows.insert(workflow_id.clone(), workflow_state);
146
147        println!("๐Ÿ”„ Started workflow: {}", workflow_id);
148        workflow_id
149    }
150
151    /// Activate an agenda group for workflow progression
152    pub fn activate_agenda_group(&mut self, group: String) {
153        self.agenda_activation_queue.push(group.clone());
154        println!("๐ŸŽฏ Queued agenda group activation: {}", group);
155    }
156
157    /// Schedule a rule to execute after a delay
158    pub fn schedule_rule(&mut self, rule_name: String, delay_ms: u64, workflow_id: Option<String>) {
159        let task = ScheduledTask {
160            rule_name: rule_name.clone(),
161            execute_at: Instant::now() + Duration::from_millis(delay_ms),
162            workflow_id,
163        };
164
165        self.scheduled_tasks.push(task);
166        println!(
167            "โฐ Scheduled rule '{}' to execute in {}ms",
168            rule_name, delay_ms
169        );
170    }
171
172    /// Complete a workflow
173    pub fn complete_workflow(&mut self, workflow_name: String) {
174        if let Some(workflow) = self.workflows.get_mut(&workflow_name) {
175            workflow.complete();
176            println!("โœ… Completed workflow: {}", workflow_name);
177        }
178    }
179
180    /// Set workflow data
181    pub fn set_workflow_data(&mut self, workflow_id: &str, key: String, value: Value) {
182        if let Some(workflow) = self.workflows.get_mut(workflow_id) {
183            workflow.set_data(key.clone(), value);
184            println!(
185                "๐Ÿ’พ Set workflow data: {} = {:?}",
186                key,
187                workflow.get_data(&key)
188            );
189        }
190    }
191
192    /// Get the next agenda group to activate
193    pub fn get_next_agenda_group(&mut self) -> Option<String> {
194        if !self.agenda_activation_queue.is_empty() {
195            Some(self.agenda_activation_queue.remove(0))
196        } else {
197            None
198        }
199    }
200
201    /// Get ready scheduled tasks
202    pub fn get_ready_tasks(&mut self) -> Vec<ScheduledTask> {
203        let now = Instant::now();
204        let mut ready_tasks = Vec::new();
205
206        self.scheduled_tasks.retain(|task| {
207            if task.execute_at <= now {
208                ready_tasks.push(task.clone());
209                false // Remove from queue
210            } else {
211                true // Keep in queue
212            }
213        });
214
215        if !ready_tasks.is_empty() {
216            println!(
217                "โšก {} scheduled tasks are ready for execution",
218                ready_tasks.len()
219            );
220        }
221
222        ready_tasks
223    }
224
225    /// Get the next pending agenda activation (for syncing with agenda manager)
226    pub fn get_next_pending_agenda_activation(&mut self) -> Option<String> {
227        if !self.agenda_activation_queue.is_empty() {
228            Some(self.agenda_activation_queue.remove(0))
229        } else {
230            None
231        }
232    }
233
234    /// Get workflow state by ID
235    pub fn get_workflow(&self, workflow_id: &str) -> Option<&WorkflowState> {
236        self.workflows.get(workflow_id)
237    }
238
239    /// Get all active workflows
240    pub fn get_active_workflows(&self) -> Vec<&WorkflowState> {
241        self.workflows
242            .values()
243            .filter(|w| w.status == WorkflowStatus::Running || w.status == WorkflowStatus::Waiting)
244            .collect()
245    }
246
247    /// Get workflow statistics
248    pub fn get_workflow_stats(&self) -> WorkflowStats {
249        let total = self.workflows.len();
250        let running = self
251            .workflows
252            .values()
253            .filter(|w| w.status == WorkflowStatus::Running)
254            .count();
255        let completed = self
256            .workflows
257            .values()
258            .filter(|w| w.status == WorkflowStatus::Completed)
259            .count();
260        let failed = self
261            .workflows
262            .values()
263            .filter(|w| w.status == WorkflowStatus::Failed)
264            .count();
265        let scheduled_tasks = self.scheduled_tasks.len();
266
267        WorkflowStats {
268            total_workflows: total,
269            running_workflows: running,
270            completed_workflows: completed,
271            failed_workflows: failed,
272            pending_scheduled_tasks: scheduled_tasks,
273            pending_agenda_activations: self.agenda_activation_queue.len(),
274        }
275    }
276
277    /// Clean up completed workflows older than specified duration
278    pub fn cleanup_completed_workflows(&mut self, older_than: Duration) {
279        let cutoff = Instant::now() - older_than;
280        let initial_count = self.workflows.len();
281
282        self.workflows.retain(|_, workflow| {
283            if workflow.status == WorkflowStatus::Completed
284                || workflow.status == WorkflowStatus::Failed
285            {
286                if let Some(completed_at) = workflow.completed_at {
287                    completed_at > cutoff
288                } else {
289                    true // Keep if no completion time
290                }
291            } else {
292                true // Keep active workflows
293            }
294        });
295
296        let cleaned = initial_count - self.workflows.len();
297        if cleaned > 0 {
298            println!("๐Ÿงน Cleaned up {} completed workflows", cleaned);
299        }
300    }
301}
302
303impl Default for WorkflowEngine {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
309/// Workflow execution statistics
310#[derive(Debug, Clone)]
311pub struct WorkflowStats {
312    /// Total number of workflows created
313    pub total_workflows: usize,
314    /// Number of currently running workflows
315    pub running_workflows: usize,
316    /// Number of completed workflows
317    pub completed_workflows: usize,
318    /// Number of failed workflows
319    pub failed_workflows: usize,
320    /// Number of pending scheduled tasks
321    pub pending_scheduled_tasks: usize,
322    /// Number of pending agenda group activations
323    pub pending_agenda_activations: usize,
324}
325
326/// Workflow execution result
327#[derive(Debug, Clone)]
328pub struct WorkflowResult {
329    /// Workflow execution was successful
330    pub success: bool,
331    /// Number of workflow steps executed
332    pub steps_executed: usize,
333    /// Total execution time
334    pub execution_time: Duration,
335    /// Final workflow status
336    pub final_status: WorkflowStatus,
337    /// Any error message if failed
338    pub error_message: Option<String>,
339}
340
341impl WorkflowResult {
342    /// Create a successful workflow result
343    pub fn success(steps_executed: usize, execution_time: Duration) -> Self {
344        Self {
345            success: true,
346            steps_executed,
347            execution_time,
348            final_status: WorkflowStatus::Completed,
349            error_message: None,
350        }
351    }
352
353    /// Create a failed workflow result
354    pub fn failure(error_message: String) -> Self {
355        Self {
356            success: false,
357            steps_executed: 0,
358            execution_time: Duration::from_millis(0),
359            final_status: WorkflowStatus::Failed,
360            error_message: Some(error_message),
361        }
362    }
363}
364
365#[cfg(test)]
366mod tests {
367    use super::*;
368
369    #[test]
370    fn test_workflow_state_creation() {
371        let workflow = WorkflowState::new("test_workflow".to_string());
372        assert_eq!(workflow.workflow_id, "test_workflow");
373        assert_eq!(workflow.status, WorkflowStatus::Running);
374        assert!(workflow.current_step.is_none());
375        assert!(workflow.completed_steps.is_empty());
376    }
377
378    #[test]
379    fn test_workflow_engine_creation() {
380        let engine = WorkflowEngine::new();
381        assert_eq!(engine.workflows.len(), 0);
382        assert_eq!(engine.scheduled_tasks.len(), 0);
383    }
384
385    #[test]
386    fn test_start_workflow() {
387        let mut engine = WorkflowEngine::new();
388        let workflow_id = engine.start_workflow(Some("test".to_string()));
389        assert_eq!(workflow_id, "test");
390        assert!(engine.get_workflow("test").is_some());
391    }
392
393    #[test]
394    fn test_schedule_rule() {
395        let mut engine = WorkflowEngine::new();
396        engine.schedule_rule("test_rule".to_string(), 1000, None);
397        assert_eq!(engine.scheduled_tasks.len(), 1);
398    }
399
400    #[test]
401    fn test_workflow_stats() {
402        let mut engine = WorkflowEngine::new();
403        engine.start_workflow(Some("test1".to_string()));
404        engine.start_workflow(Some("test2".to_string()));
405
406        let stats = engine.get_workflow_stats();
407        assert_eq!(stats.total_workflows, 2);
408        assert_eq!(stats.running_workflows, 2);
409    }
410}