1use crate::errors::Result;
2use crate::types::Value;
3use chrono::{DateTime, Utc};
4use std::collections::HashMap;
5use std::time::{Duration, Instant};
6
7#[derive(Debug, Clone, PartialEq)]
9pub enum WorkflowStatus {
10 Running,
12 Completed,
14 Failed,
16 Paused,
18 Waiting,
20}
21
22#[derive(Debug, Clone)]
24pub struct ScheduledTask {
25 pub rule_name: String,
27 pub execute_at: Instant,
29 pub workflow_id: Option<String>,
31}
32
33#[derive(Debug, Clone)]
35pub struct WorkflowState {
36 pub workflow_id: String,
38 pub current_step: Option<String>,
40 pub completed_steps: Vec<String>,
42 pub workflow_data: HashMap<String, Value>,
44 pub status: WorkflowStatus,
46 pub started_at: Instant,
48 pub completed_at: Option<Instant>,
50}
51
52impl WorkflowState {
53 pub fn new(workflow_id: String) -> Self {
55 Self {
56 workflow_id,
57 current_step: None,
58 completed_steps: Vec::new(),
59 workflow_data: HashMap::new(),
60 status: WorkflowStatus::Running,
61 started_at: Instant::now(),
62 completed_at: None,
63 }
64 }
65
66 pub fn complete_step(&mut self, step: String) {
68 if let Some(current) = &self.current_step {
69 if current == &step {
70 self.completed_steps.push(step);
71 self.current_step = None;
72 }
73 }
74 }
75
76 pub fn set_current_step(&mut self, step: String) {
78 self.current_step = Some(step);
79 }
80
81 pub fn complete(&mut self) {
83 self.status = WorkflowStatus::Completed;
84 self.completed_at = Some(Instant::now());
85 self.current_step = None;
86 }
87
88 pub fn fail(&mut self) {
90 self.status = WorkflowStatus::Failed;
91 self.completed_at = Some(Instant::now());
92 self.current_step = None;
93 }
94
95 pub fn set_data(&mut self, key: String, value: Value) {
97 self.workflow_data.insert(key, value);
98 }
99
100 pub fn get_data(&self, key: &str) -> Option<&Value> {
102 self.workflow_data.get(key)
103 }
104
105 pub fn duration(&self) -> Duration {
107 match self.completed_at {
108 Some(end) => end.duration_since(self.started_at),
109 None => Instant::now().duration_since(self.started_at),
110 }
111 }
112}
113
114#[derive(Debug)]
116pub struct WorkflowEngine {
117 workflows: HashMap<String, WorkflowState>,
119 scheduled_tasks: Vec<ScheduledTask>,
121 agenda_activation_queue: Vec<String>,
123 workflow_counter: u64,
125}
126
127impl WorkflowEngine {
128 pub fn new() -> Self {
130 Self {
131 workflows: HashMap::new(),
132 scheduled_tasks: Vec::new(),
133 agenda_activation_queue: Vec::new(),
134 workflow_counter: 0,
135 }
136 }
137
138 pub fn start_workflow(&mut self, workflow_name: Option<String>) -> String {
140 self.workflow_counter += 1;
141 let workflow_id = workflow_name.unwrap_or_else(|| format!("workflow_{}", self.workflow_counter));
142
143 let workflow_state = WorkflowState::new(workflow_id.clone());
144 self.workflows.insert(workflow_id.clone(), workflow_state);
145
146 println!("๐ Started workflow: {}", workflow_id);
147 workflow_id
148 }
149
150 pub fn activate_agenda_group(&mut self, group: String) {
152 self.agenda_activation_queue.push(group.clone());
153 println!("๐ฏ Queued agenda group activation: {}", group);
154 }
155
156 pub fn schedule_rule(&mut self, rule_name: String, delay_ms: u64, workflow_id: Option<String>) {
158 let task = ScheduledTask {
159 rule_name: rule_name.clone(),
160 execute_at: Instant::now() + Duration::from_millis(delay_ms),
161 workflow_id,
162 };
163
164 self.scheduled_tasks.push(task);
165 println!("โฐ Scheduled rule '{}' to execute in {}ms", rule_name, delay_ms);
166 }
167
168 pub fn complete_workflow(&mut self, workflow_name: String) {
170 if let Some(workflow) = self.workflows.get_mut(&workflow_name) {
171 workflow.complete();
172 println!("โ
Completed workflow: {}", workflow_name);
173 }
174 }
175
176 pub fn set_workflow_data(&mut self, workflow_id: &str, key: String, value: Value) {
178 if let Some(workflow) = self.workflows.get_mut(workflow_id) {
179 workflow.set_data(key.clone(), value);
180 println!("๐พ Set workflow data: {} = {:?}", key, workflow.get_data(&key));
181 }
182 }
183
184 pub fn get_next_agenda_group(&mut self) -> Option<String> {
186 if !self.agenda_activation_queue.is_empty() {
187 Some(self.agenda_activation_queue.remove(0))
188 } else {
189 None
190 }
191 }
192
193 pub fn get_ready_tasks(&mut self) -> Vec<ScheduledTask> {
195 let now = Instant::now();
196 let mut ready_tasks = Vec::new();
197
198 self.scheduled_tasks.retain(|task| {
199 if task.execute_at <= now {
200 ready_tasks.push(task.clone());
201 false } else {
203 true }
205 });
206
207 if !ready_tasks.is_empty() {
208 println!("โก {} scheduled tasks are ready for execution", ready_tasks.len());
209 }
210
211 ready_tasks
212 }
213
214 pub fn get_next_pending_agenda_activation(&mut self) -> Option<String> {
216 if !self.agenda_activation_queue.is_empty() {
217 Some(self.agenda_activation_queue.remove(0))
218 } else {
219 None
220 }
221 }
222
223 pub fn get_workflow(&self, workflow_id: &str) -> Option<&WorkflowState> {
225 self.workflows.get(workflow_id)
226 }
227
228 pub fn get_active_workflows(&self) -> Vec<&WorkflowState> {
230 self.workflows
231 .values()
232 .filter(|w| w.status == WorkflowStatus::Running || w.status == WorkflowStatus::Waiting)
233 .collect()
234 }
235
236 pub fn get_workflow_stats(&self) -> WorkflowStats {
238 let total = self.workflows.len();
239 let running = self.workflows.values().filter(|w| w.status == WorkflowStatus::Running).count();
240 let completed = self.workflows.values().filter(|w| w.status == WorkflowStatus::Completed).count();
241 let failed = self.workflows.values().filter(|w| w.status == WorkflowStatus::Failed).count();
242 let scheduled_tasks = self.scheduled_tasks.len();
243
244 WorkflowStats {
245 total_workflows: total,
246 running_workflows: running,
247 completed_workflows: completed,
248 failed_workflows: failed,
249 pending_scheduled_tasks: scheduled_tasks,
250 pending_agenda_activations: self.agenda_activation_queue.len(),
251 }
252 }
253
254 pub fn cleanup_completed_workflows(&mut self, older_than: Duration) {
256 let cutoff = Instant::now() - older_than;
257 let initial_count = self.workflows.len();
258
259 self.workflows.retain(|_, workflow| {
260 if workflow.status == WorkflowStatus::Completed || workflow.status == WorkflowStatus::Failed {
261 if let Some(completed_at) = workflow.completed_at {
262 completed_at > cutoff
263 } else {
264 true }
266 } else {
267 true }
269 });
270
271 let cleaned = initial_count - self.workflows.len();
272 if cleaned > 0 {
273 println!("๐งน Cleaned up {} completed workflows", cleaned);
274 }
275 }
276}
277
278impl Default for WorkflowEngine {
279 fn default() -> Self {
280 Self::new()
281 }
282}
283
284#[derive(Debug, Clone)]
286pub struct WorkflowStats {
287 pub total_workflows: usize,
289 pub running_workflows: usize,
291 pub completed_workflows: usize,
293 pub failed_workflows: usize,
295 pub pending_scheduled_tasks: usize,
297 pub pending_agenda_activations: usize,
299}
300
301#[derive(Debug, Clone)]
303pub struct WorkflowResult {
304 pub success: bool,
306 pub steps_executed: usize,
308 pub execution_time: Duration,
310 pub final_status: WorkflowStatus,
312 pub error_message: Option<String>,
314}
315
316impl WorkflowResult {
317 pub fn success(steps_executed: usize, execution_time: Duration) -> Self {
319 Self {
320 success: true,
321 steps_executed,
322 execution_time,
323 final_status: WorkflowStatus::Completed,
324 error_message: None,
325 }
326 }
327
328 pub fn failure(error_message: String) -> Self {
330 Self {
331 success: false,
332 steps_executed: 0,
333 execution_time: Duration::from_millis(0),
334 final_status: WorkflowStatus::Failed,
335 error_message: Some(error_message),
336 }
337 }
338}
339
340#[cfg(test)]
341mod tests {
342 use super::*;
343
344 #[test]
345 fn test_workflow_state_creation() {
346 let workflow = WorkflowState::new("test_workflow".to_string());
347 assert_eq!(workflow.workflow_id, "test_workflow");
348 assert_eq!(workflow.status, WorkflowStatus::Running);
349 assert!(workflow.current_step.is_none());
350 assert!(workflow.completed_steps.is_empty());
351 }
352
353 #[test]
354 fn test_workflow_engine_creation() {
355 let engine = WorkflowEngine::new();
356 assert_eq!(engine.workflows.len(), 0);
357 assert_eq!(engine.scheduled_tasks.len(), 0);
358 }
359
360 #[test]
361 fn test_start_workflow() {
362 let mut engine = WorkflowEngine::new();
363 let workflow_id = engine.start_workflow(Some("test".to_string()));
364 assert_eq!(workflow_id, "test");
365 assert!(engine.get_workflow("test").is_some());
366 }
367
368 #[test]
369 fn test_schedule_rule() {
370 let mut engine = WorkflowEngine::new();
371 engine.schedule_rule("test_rule".to_string(), 1000, None);
372 assert_eq!(engine.scheduled_tasks.len(), 1);
373 }
374
375 #[test]
376 fn test_workflow_stats() {
377 let mut engine = WorkflowEngine::new();
378 engine.start_workflow(Some("test1".to_string()));
379 engine.start_workflow(Some("test2".to_string()));
380
381 let stats = engine.get_workflow_stats();
382 assert_eq!(stats.total_workflows, 2);
383 assert_eq!(stats.running_workflows, 2);
384 }
385}