1use crate::errors::Result;
2use crate::types::Value;
3use chrono::{DateTime, Utc};
4use std::collections::HashMap;
5use std::time::{Duration, Instant};
6
7#[derive(Debug, Clone, PartialEq)]
9pub enum WorkflowStatus {
10 Running,
12 Completed,
14 Failed,
16 Paused,
18 Waiting,
20}
21
22#[derive(Debug, Clone)]
24pub struct ScheduledTask {
25 pub rule_name: String,
27 pub execute_at: Instant,
29 pub workflow_id: Option<String>,
31}
32
33#[derive(Debug, Clone)]
35pub struct WorkflowState {
36 pub workflow_id: String,
38 pub current_step: Option<String>,
40 pub completed_steps: Vec<String>,
42 pub workflow_data: HashMap<String, Value>,
44 pub status: WorkflowStatus,
46 pub started_at: Instant,
48 pub completed_at: Option<Instant>,
50}
51
52impl WorkflowState {
53 pub fn new(workflow_id: String) -> Self {
55 Self {
56 workflow_id,
57 current_step: None,
58 completed_steps: Vec::new(),
59 workflow_data: HashMap::new(),
60 status: WorkflowStatus::Running,
61 started_at: Instant::now(),
62 completed_at: None,
63 }
64 }
65
66 pub fn complete_step(&mut self, step: String) {
68 if let Some(current) = &self.current_step {
69 if current == &step {
70 self.completed_steps.push(step);
71 self.current_step = None;
72 }
73 }
74 }
75
76 pub fn set_current_step(&mut self, step: String) {
78 self.current_step = Some(step);
79 }
80
81 pub fn complete(&mut self) {
83 self.status = WorkflowStatus::Completed;
84 self.completed_at = Some(Instant::now());
85 self.current_step = None;
86 }
87
88 pub fn fail(&mut self) {
90 self.status = WorkflowStatus::Failed;
91 self.completed_at = Some(Instant::now());
92 self.current_step = None;
93 }
94
95 pub fn set_data(&mut self, key: String, value: Value) {
97 self.workflow_data.insert(key, value);
98 }
99
100 pub fn get_data(&self, key: &str) -> Option<&Value> {
102 self.workflow_data.get(key)
103 }
104
105 pub fn duration(&self) -> Duration {
107 match self.completed_at {
108 Some(end) => end.duration_since(self.started_at),
109 None => Instant::now().duration_since(self.started_at),
110 }
111 }
112}
113
114#[derive(Debug)]
116pub struct WorkflowEngine {
117 workflows: HashMap<String, WorkflowState>,
119 scheduled_tasks: Vec<ScheduledTask>,
121 agenda_activation_queue: Vec<String>,
123 workflow_counter: u64,
125}
126
127impl WorkflowEngine {
128 pub fn new() -> Self {
130 Self {
131 workflows: HashMap::new(),
132 scheduled_tasks: Vec::new(),
133 agenda_activation_queue: Vec::new(),
134 workflow_counter: 0,
135 }
136 }
137
138 pub fn start_workflow(&mut self, workflow_name: Option<String>) -> String {
140 self.workflow_counter += 1;
141 let workflow_id =
142 workflow_name.unwrap_or_else(|| format!("workflow_{}", self.workflow_counter));
143
144 let workflow_state = WorkflowState::new(workflow_id.clone());
145 self.workflows.insert(workflow_id.clone(), workflow_state);
146
147 println!("๐ Started workflow: {}", workflow_id);
148 workflow_id
149 }
150
151 pub fn activate_agenda_group(&mut self, group: String) {
153 self.agenda_activation_queue.push(group.clone());
154 println!("๐ฏ Queued agenda group activation: {}", group);
155 }
156
157 pub fn schedule_rule(&mut self, rule_name: String, delay_ms: u64, workflow_id: Option<String>) {
159 let task = ScheduledTask {
160 rule_name: rule_name.clone(),
161 execute_at: Instant::now() + Duration::from_millis(delay_ms),
162 workflow_id,
163 };
164
165 self.scheduled_tasks.push(task);
166 println!(
167 "โฐ Scheduled rule '{}' to execute in {}ms",
168 rule_name, delay_ms
169 );
170 }
171
172 pub fn complete_workflow(&mut self, workflow_name: String) {
174 if let Some(workflow) = self.workflows.get_mut(&workflow_name) {
175 workflow.complete();
176 println!("โ
Completed workflow: {}", workflow_name);
177 }
178 }
179
180 pub fn set_workflow_data(&mut self, workflow_id: &str, key: String, value: Value) {
182 if let Some(workflow) = self.workflows.get_mut(workflow_id) {
183 workflow.set_data(key.clone(), value);
184 println!(
185 "๐พ Set workflow data: {} = {:?}",
186 key,
187 workflow.get_data(&key)
188 );
189 }
190 }
191
192 pub fn get_next_agenda_group(&mut self) -> Option<String> {
194 if !self.agenda_activation_queue.is_empty() {
195 Some(self.agenda_activation_queue.remove(0))
196 } else {
197 None
198 }
199 }
200
201 pub fn get_ready_tasks(&mut self) -> Vec<ScheduledTask> {
203 let now = Instant::now();
204 let mut ready_tasks = Vec::new();
205
206 self.scheduled_tasks.retain(|task| {
207 if task.execute_at <= now {
208 ready_tasks.push(task.clone());
209 false } else {
211 true }
213 });
214
215 if !ready_tasks.is_empty() {
216 println!(
217 "โก {} scheduled tasks are ready for execution",
218 ready_tasks.len()
219 );
220 }
221
222 ready_tasks
223 }
224
225 pub fn get_next_pending_agenda_activation(&mut self) -> Option<String> {
227 if !self.agenda_activation_queue.is_empty() {
228 Some(self.agenda_activation_queue.remove(0))
229 } else {
230 None
231 }
232 }
233
234 pub fn get_workflow(&self, workflow_id: &str) -> Option<&WorkflowState> {
236 self.workflows.get(workflow_id)
237 }
238
239 pub fn get_active_workflows(&self) -> Vec<&WorkflowState> {
241 self.workflows
242 .values()
243 .filter(|w| w.status == WorkflowStatus::Running || w.status == WorkflowStatus::Waiting)
244 .collect()
245 }
246
247 pub fn get_workflow_stats(&self) -> WorkflowStats {
249 let total = self.workflows.len();
250 let running = self
251 .workflows
252 .values()
253 .filter(|w| w.status == WorkflowStatus::Running)
254 .count();
255 let completed = self
256 .workflows
257 .values()
258 .filter(|w| w.status == WorkflowStatus::Completed)
259 .count();
260 let failed = self
261 .workflows
262 .values()
263 .filter(|w| w.status == WorkflowStatus::Failed)
264 .count();
265 let scheduled_tasks = self.scheduled_tasks.len();
266
267 WorkflowStats {
268 total_workflows: total,
269 running_workflows: running,
270 completed_workflows: completed,
271 failed_workflows: failed,
272 pending_scheduled_tasks: scheduled_tasks,
273 pending_agenda_activations: self.agenda_activation_queue.len(),
274 }
275 }
276
277 pub fn cleanup_completed_workflows(&mut self, older_than: Duration) {
279 let cutoff = Instant::now() - older_than;
280 let initial_count = self.workflows.len();
281
282 self.workflows.retain(|_, workflow| {
283 if workflow.status == WorkflowStatus::Completed
284 || workflow.status == WorkflowStatus::Failed
285 {
286 if let Some(completed_at) = workflow.completed_at {
287 completed_at > cutoff
288 } else {
289 true }
291 } else {
292 true }
294 });
295
296 let cleaned = initial_count - self.workflows.len();
297 if cleaned > 0 {
298 println!("๐งน Cleaned up {} completed workflows", cleaned);
299 }
300 }
301}
302
303impl Default for WorkflowEngine {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
309#[derive(Debug, Clone)]
311pub struct WorkflowStats {
312 pub total_workflows: usize,
314 pub running_workflows: usize,
316 pub completed_workflows: usize,
318 pub failed_workflows: usize,
320 pub pending_scheduled_tasks: usize,
322 pub pending_agenda_activations: usize,
324}
325
326#[derive(Debug, Clone)]
328pub struct WorkflowResult {
329 pub success: bool,
331 pub steps_executed: usize,
333 pub execution_time: Duration,
335 pub final_status: WorkflowStatus,
337 pub error_message: Option<String>,
339}
340
341impl WorkflowResult {
342 pub fn success(steps_executed: usize, execution_time: Duration) -> Self {
344 Self {
345 success: true,
346 steps_executed,
347 execution_time,
348 final_status: WorkflowStatus::Completed,
349 error_message: None,
350 }
351 }
352
353 pub fn failure(error_message: String) -> Self {
355 Self {
356 success: false,
357 steps_executed: 0,
358 execution_time: Duration::from_millis(0),
359 final_status: WorkflowStatus::Failed,
360 error_message: Some(error_message),
361 }
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use super::*;
368
369 #[test]
370 fn test_workflow_state_creation() {
371 let workflow = WorkflowState::new("test_workflow".to_string());
372 assert_eq!(workflow.workflow_id, "test_workflow");
373 assert_eq!(workflow.status, WorkflowStatus::Running);
374 assert!(workflow.current_step.is_none());
375 assert!(workflow.completed_steps.is_empty());
376 }
377
378 #[test]
379 fn test_workflow_engine_creation() {
380 let engine = WorkflowEngine::new();
381 assert_eq!(engine.workflows.len(), 0);
382 assert_eq!(engine.scheduled_tasks.len(), 0);
383 }
384
385 #[test]
386 fn test_start_workflow() {
387 let mut engine = WorkflowEngine::new();
388 let workflow_id = engine.start_workflow(Some("test".to_string()));
389 assert_eq!(workflow_id, "test");
390 assert!(engine.get_workflow("test").is_some());
391 }
392
393 #[test]
394 fn test_schedule_rule() {
395 let mut engine = WorkflowEngine::new();
396 engine.schedule_rule("test_rule".to_string(), 1000, None);
397 assert_eq!(engine.scheduled_tasks.len(), 1);
398 }
399
400 #[test]
401 fn test_workflow_stats() {
402 let mut engine = WorkflowEngine::new();
403 engine.start_workflow(Some("test1".to_string()));
404 engine.start_workflow(Some("test2".to_string()));
405
406 let stats = engine.get_workflow_stats();
407 assert_eq!(stats.total_workflows, 2);
408 assert_eq!(stats.running_workflows, 2);
409 }
410}