// rust_rule_engine/engine/workflow.rs

use crate::types::Value;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
4
/// Lifecycle status of a workflow.
///
/// `Eq` is derived alongside `PartialEq`: every variant is a plain unit
/// variant, so equality is total and the type can be used wherever a full
/// equivalence relation is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowStatus {
    /// The workflow is currently running.
    Running,
    /// The workflow has completed successfully.
    Completed,
    /// The workflow has failed.
    Failed,
    /// The workflow is paused.
    Paused,
    /// The workflow is waiting for a scheduled task.
    Waiting,
}
19
/// A rule execution that has been deferred to a later point in time.
#[derive(Debug, Clone)]
pub struct ScheduledTask {
    /// Name of the rule that should be executed.
    pub rule_name: String,
    /// Instant at which the task is to be executed.
    pub execute_at: Instant,
    /// ID of the workflow this task is associated with, if any.
    pub workflow_id: Option<String>,
}
30
31/// Tracks the state of a workflow execution
32#[derive(Debug, Clone)]
33pub struct WorkflowState {
34    /// Unique workflow identifier
35    pub workflow_id: String,
36    /// Current active step/agenda group
37    pub current_step: Option<String>,
38    /// List of completed steps
39    pub completed_steps: Vec<String>,
40    /// Workflow-specific data storage
41    pub workflow_data: HashMap<String, Value>,
42    /// Current status of the workflow
43    pub status: WorkflowStatus,
44    /// Workflow start time
45    pub started_at: Instant,
46    /// Workflow completion time
47    pub completed_at: Option<Instant>,
48}
49
50impl WorkflowState {
51    /// Create a new workflow state
52    pub fn new(workflow_id: String) -> Self {
53        Self {
54            workflow_id,
55            current_step: None,
56            completed_steps: Vec::new(),
57            workflow_data: HashMap::new(),
58            status: WorkflowStatus::Running,
59            started_at: Instant::now(),
60            completed_at: None,
61        }
62    }
63
64    /// Mark a step as completed
65    pub fn complete_step(&mut self, step: String) {
66        if let Some(current) = &self.current_step {
67            if current == &step {
68                self.completed_steps.push(step);
69                self.current_step = None;
70            }
71        }
72    }
73
74    /// Set the current active step
75    pub fn set_current_step(&mut self, step: String) {
76        self.current_step = Some(step);
77    }
78
79    /// Complete the workflow
80    pub fn complete(&mut self) {
81        self.status = WorkflowStatus::Completed;
82        self.completed_at = Some(Instant::now());
83        self.current_step = None;
84    }
85
86    /// Fail the workflow
87    pub fn fail(&mut self) {
88        self.status = WorkflowStatus::Failed;
89        self.completed_at = Some(Instant::now());
90        self.current_step = None;
91    }
92
93    /// Set workflow data
94    pub fn set_data(&mut self, key: String, value: Value) {
95        self.workflow_data.insert(key, value);
96    }
97
98    /// Get workflow data
99    pub fn get_data(&self, key: &str) -> Option<&Value> {
100        self.workflow_data.get(key)
101    }
102
103    /// Get workflow duration
104    pub fn duration(&self) -> Duration {
105        match self.completed_at {
106            Some(end) => end.duration_since(self.started_at),
107            None => Instant::now().duration_since(self.started_at),
108        }
109    }
110}
111
112/// Manages workflow execution and scheduling
113#[derive(Debug)]
114pub struct WorkflowEngine {
115    /// Active workflows by ID
116    workflows: HashMap<String, WorkflowState>,
117    /// Scheduled tasks queue
118    scheduled_tasks: Vec<ScheduledTask>,
119    /// Queue of agenda groups to activate
120    agenda_activation_queue: Vec<String>,
121    /// Workflow execution counter
122    workflow_counter: u64,
123}
124
125impl WorkflowEngine {
126    /// Create a new workflow engine
127    pub fn new() -> Self {
128        Self {
129            workflows: HashMap::new(),
130            scheduled_tasks: Vec::new(),
131            agenda_activation_queue: Vec::new(),
132            workflow_counter: 0,
133        }
134    }
135
136    /// Start a new workflow
137    pub fn start_workflow(&mut self, workflow_name: Option<String>) -> String {
138        self.workflow_counter += 1;
139        let workflow_id =
140            workflow_name.unwrap_or_else(|| format!("workflow_{}", self.workflow_counter));
141
142        let workflow_state = WorkflowState::new(workflow_id.clone());
143        self.workflows.insert(workflow_id.clone(), workflow_state);
144
145        println!("๐Ÿ”„ Started workflow: {}", workflow_id);
146        workflow_id
147    }
148
149    /// Activate an agenda group for workflow progression
150    pub fn activate_agenda_group(&mut self, group: String) {
151        self.agenda_activation_queue.push(group.clone());
152        println!("๐ŸŽฏ Queued agenda group activation: {}", group);
153    }
154
155    /// Schedule a rule to execute after a delay
156    pub fn schedule_rule(&mut self, rule_name: String, delay_ms: u64, workflow_id: Option<String>) {
157        let task = ScheduledTask {
158            rule_name: rule_name.clone(),
159            execute_at: Instant::now() + Duration::from_millis(delay_ms),
160            workflow_id,
161        };
162
163        self.scheduled_tasks.push(task);
164        println!(
165            "โฐ Scheduled rule '{}' to execute in {}ms",
166            rule_name, delay_ms
167        );
168    }
169
170    /// Complete a workflow
171    pub fn complete_workflow(&mut self, workflow_name: String) {
172        if let Some(workflow) = self.workflows.get_mut(&workflow_name) {
173            workflow.complete();
174            println!("โœ… Completed workflow: {}", workflow_name);
175        }
176    }
177
178    /// Set workflow data
179    pub fn set_workflow_data(&mut self, workflow_id: &str, key: String, value: Value) {
180        if let Some(workflow) = self.workflows.get_mut(workflow_id) {
181            workflow.set_data(key.clone(), value);
182            println!(
183                "๐Ÿ’พ Set workflow data: {} = {:?}",
184                key,
185                workflow.get_data(&key)
186            );
187        }
188    }
189
190    /// Get the next agenda group to activate
191    pub fn get_next_agenda_group(&mut self) -> Option<String> {
192        if !self.agenda_activation_queue.is_empty() {
193            Some(self.agenda_activation_queue.remove(0))
194        } else {
195            None
196        }
197    }
198
199    /// Get ready scheduled tasks
200    pub fn get_ready_tasks(&mut self) -> Vec<ScheduledTask> {
201        let now = Instant::now();
202        let mut ready_tasks = Vec::new();
203
204        self.scheduled_tasks.retain(|task| {
205            if task.execute_at <= now {
206                ready_tasks.push(task.clone());
207                false // Remove from queue
208            } else {
209                true // Keep in queue
210            }
211        });
212
213        if !ready_tasks.is_empty() {
214            println!(
215                "โšก {} scheduled tasks are ready for execution",
216                ready_tasks.len()
217            );
218        }
219
220        ready_tasks
221    }
222
223    /// Get the next pending agenda activation (for syncing with agenda manager)
224    pub fn get_next_pending_agenda_activation(&mut self) -> Option<String> {
225        if !self.agenda_activation_queue.is_empty() {
226            Some(self.agenda_activation_queue.remove(0))
227        } else {
228            None
229        }
230    }
231
232    /// Get workflow state by ID
233    pub fn get_workflow(&self, workflow_id: &str) -> Option<&WorkflowState> {
234        self.workflows.get(workflow_id)
235    }
236
237    /// Get all active workflows
238    pub fn get_active_workflows(&self) -> Vec<&WorkflowState> {
239        self.workflows
240            .values()
241            .filter(|w| w.status == WorkflowStatus::Running || w.status == WorkflowStatus::Waiting)
242            .collect()
243    }
244
245    /// Get workflow statistics
246    pub fn get_workflow_stats(&self) -> WorkflowStats {
247        let total = self.workflows.len();
248        let running = self
249            .workflows
250            .values()
251            .filter(|w| w.status == WorkflowStatus::Running)
252            .count();
253        let completed = self
254            .workflows
255            .values()
256            .filter(|w| w.status == WorkflowStatus::Completed)
257            .count();
258        let failed = self
259            .workflows
260            .values()
261            .filter(|w| w.status == WorkflowStatus::Failed)
262            .count();
263        let scheduled_tasks = self.scheduled_tasks.len();
264
265        WorkflowStats {
266            total_workflows: total,
267            running_workflows: running,
268            completed_workflows: completed,
269            failed_workflows: failed,
270            pending_scheduled_tasks: scheduled_tasks,
271            pending_agenda_activations: self.agenda_activation_queue.len(),
272        }
273    }
274
275    /// Clean up completed workflows older than specified duration
276    pub fn cleanup_completed_workflows(&mut self, older_than: Duration) {
277        let cutoff = Instant::now() - older_than;
278        let initial_count = self.workflows.len();
279
280        self.workflows.retain(|_, workflow| {
281            if workflow.status == WorkflowStatus::Completed
282                || workflow.status == WorkflowStatus::Failed
283            {
284                if let Some(completed_at) = workflow.completed_at {
285                    completed_at > cutoff
286                } else {
287                    true // Keep if no completion time
288                }
289            } else {
290                true // Keep active workflows
291            }
292        });
293
294        let cleaned = initial_count - self.workflows.len();
295        if cleaned > 0 {
296            println!("๐Ÿงน Cleaned up {} completed workflows", cleaned);
297        }
298    }
299}
300
301impl Default for WorkflowEngine {
302    fn default() -> Self {
303        Self::new()
304    }
305}
306
/// Snapshot of workflow execution counters.
#[derive(Debug, Clone)]
pub struct WorkflowStats {
    /// Total number of workflows.
    pub total_workflows: usize,
    /// How many workflows are currently running.
    pub running_workflows: usize,
    /// How many workflows have completed.
    pub completed_workflows: usize,
    /// How many workflows have failed.
    pub failed_workflows: usize,
    /// How many scheduled tasks are still pending.
    pub pending_scheduled_tasks: usize,
    /// How many agenda group activations are still pending.
    pub pending_agenda_activations: usize,
}
323
324/// Workflow execution result
325#[derive(Debug, Clone)]
326pub struct WorkflowResult {
327    /// Workflow execution was successful
328    pub success: bool,
329    /// Number of workflow steps executed
330    pub steps_executed: usize,
331    /// Total execution time
332    pub execution_time: Duration,
333    /// Final workflow status
334    pub final_status: WorkflowStatus,
335    /// Any error message if failed
336    pub error_message: Option<String>,
337}
338
339impl WorkflowResult {
340    /// Create a successful workflow result
341    pub fn success(steps_executed: usize, execution_time: Duration) -> Self {
342        Self {
343            success: true,
344            steps_executed,
345            execution_time,
346            final_status: WorkflowStatus::Completed,
347            error_message: None,
348        }
349    }
350
351    /// Create a failed workflow result
352    pub fn failure(error_message: String) -> Self {
353        Self {
354            success: false,
355            steps_executed: 0,
356            execution_time: Duration::from_millis(0),
357            final_status: WorkflowStatus::Failed,
358            error_message: Some(error_message),
359        }
360    }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// A new workflow state is running, with no current or completed steps.
    #[test]
    fn test_workflow_state_creation() {
        let state = WorkflowState::new(String::from("test_workflow"));
        assert_eq!(state.workflow_id, "test_workflow");
        assert_eq!(state.status, WorkflowStatus::Running);
        assert!(state.current_step.is_none());
        assert!(state.completed_steps.is_empty());
    }

    /// A new engine holds no workflows and no scheduled tasks.
    #[test]
    fn test_workflow_engine_creation() {
        let engine = WorkflowEngine::new();
        assert!(engine.workflows.is_empty());
        assert!(engine.scheduled_tasks.is_empty());
    }

    /// Starting a named workflow returns that name and registers the workflow.
    #[test]
    fn test_start_workflow() {
        let mut engine = WorkflowEngine::new();
        let id = engine.start_workflow(Some(String::from("test")));
        assert_eq!(id, "test");
        assert!(engine.get_workflow("test").is_some());
    }

    /// Scheduling one rule leaves exactly one pending task.
    #[test]
    fn test_schedule_rule() {
        let mut engine = WorkflowEngine::new();
        engine.schedule_rule(String::from("test_rule"), 1000, None);
        assert_eq!(engine.scheduled_tasks.len(), 1);
    }

    /// Two started workflows are both counted as total and running.
    #[test]
    fn test_workflow_stats() {
        let mut engine = WorkflowEngine::new();
        for name in ["test1", "test2"] {
            engine.start_workflow(Some(name.to_string()));
        }

        let stats = engine.get_workflow_stats();
        assert_eq!(stats.total_workflows, 2);
        assert_eq!(stats.running_workflows, 2);
    }
}