1use crate::types::Value;
2use std::collections::HashMap;
3use std::time::{Duration, Instant};
4
/// Lifecycle state of a workflow.
///
/// The enum carries no data, so it is `Copy`, fully comparable (`Eq`) and
/// hashable, which lets it be passed by value and used as a key in hashed
/// collections (for example when grouping workflows by status).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkflowStatus {
    /// The workflow is executing; newly created workflow states start here.
    Running,
    /// The workflow finished successfully.
    Completed,
    /// The workflow terminated with a failure.
    Failed,
    /// The workflow has been paused.
    Paused,
    /// The workflow is waiting; the engine still treats it as active.
    Waiting,
}
19
/// A rule that has been queued to run once a point in time is reached.
#[derive(Clone, Debug)]
pub struct ScheduledTask {
    /// Name of the rule to run.
    pub rule_name: String,
    /// Earliest instant at which the task counts as ready.
    pub execute_at: Instant,
    /// Id of the workflow the task was scheduled for, if one was given.
    pub workflow_id: Option<String>,
}
30
31#[derive(Debug, Clone)]
33pub struct WorkflowState {
34 pub workflow_id: String,
36 pub current_step: Option<String>,
38 pub completed_steps: Vec<String>,
40 pub workflow_data: HashMap<String, Value>,
42 pub status: WorkflowStatus,
44 pub started_at: Instant,
46 pub completed_at: Option<Instant>,
48}
49
50impl WorkflowState {
51 pub fn new(workflow_id: String) -> Self {
53 Self {
54 workflow_id,
55 current_step: None,
56 completed_steps: Vec::new(),
57 workflow_data: HashMap::new(),
58 status: WorkflowStatus::Running,
59 started_at: Instant::now(),
60 completed_at: None,
61 }
62 }
63
64 pub fn complete_step(&mut self, step: String) {
66 if let Some(current) = &self.current_step {
67 if current == &step {
68 self.completed_steps.push(step);
69 self.current_step = None;
70 }
71 }
72 }
73
74 pub fn set_current_step(&mut self, step: String) {
76 self.current_step = Some(step);
77 }
78
79 pub fn complete(&mut self) {
81 self.status = WorkflowStatus::Completed;
82 self.completed_at = Some(Instant::now());
83 self.current_step = None;
84 }
85
86 pub fn fail(&mut self) {
88 self.status = WorkflowStatus::Failed;
89 self.completed_at = Some(Instant::now());
90 self.current_step = None;
91 }
92
93 pub fn set_data(&mut self, key: String, value: Value) {
95 self.workflow_data.insert(key, value);
96 }
97
98 pub fn get_data(&self, key: &str) -> Option<&Value> {
100 self.workflow_data.get(key)
101 }
102
103 pub fn duration(&self) -> Duration {
105 match self.completed_at {
106 Some(end) => end.duration_since(self.started_at),
107 None => Instant::now().duration_since(self.started_at),
108 }
109 }
110}
111
112#[derive(Debug)]
114pub struct WorkflowEngine {
115 workflows: HashMap<String, WorkflowState>,
117 scheduled_tasks: Vec<ScheduledTask>,
119 agenda_activation_queue: Vec<String>,
121 workflow_counter: u64,
123}
124
125impl WorkflowEngine {
126 pub fn new() -> Self {
128 Self {
129 workflows: HashMap::new(),
130 scheduled_tasks: Vec::new(),
131 agenda_activation_queue: Vec::new(),
132 workflow_counter: 0,
133 }
134 }
135
136 pub fn start_workflow(&mut self, workflow_name: Option<String>) -> String {
138 self.workflow_counter += 1;
139 let workflow_id =
140 workflow_name.unwrap_or_else(|| format!("workflow_{}", self.workflow_counter));
141
142 let workflow_state = WorkflowState::new(workflow_id.clone());
143 self.workflows.insert(workflow_id.clone(), workflow_state);
144
145 println!("๐ Started workflow: {}", workflow_id);
146 workflow_id
147 }
148
149 pub fn activate_agenda_group(&mut self, group: String) {
151 self.agenda_activation_queue.push(group.clone());
152 println!("๐ฏ Queued agenda group activation: {}", group);
153 }
154
155 pub fn schedule_rule(&mut self, rule_name: String, delay_ms: u64, workflow_id: Option<String>) {
157 let task = ScheduledTask {
158 rule_name: rule_name.clone(),
159 execute_at: Instant::now() + Duration::from_millis(delay_ms),
160 workflow_id,
161 };
162
163 self.scheduled_tasks.push(task);
164 println!(
165 "โฐ Scheduled rule '{}' to execute in {}ms",
166 rule_name, delay_ms
167 );
168 }
169
170 pub fn complete_workflow(&mut self, workflow_name: String) {
172 if let Some(workflow) = self.workflows.get_mut(&workflow_name) {
173 workflow.complete();
174 println!("โ
Completed workflow: {}", workflow_name);
175 }
176 }
177
178 pub fn set_workflow_data(&mut self, workflow_id: &str, key: String, value: Value) {
180 if let Some(workflow) = self.workflows.get_mut(workflow_id) {
181 workflow.set_data(key.clone(), value);
182 println!(
183 "๐พ Set workflow data: {} = {:?}",
184 key,
185 workflow.get_data(&key)
186 );
187 }
188 }
189
190 pub fn get_next_agenda_group(&mut self) -> Option<String> {
192 if !self.agenda_activation_queue.is_empty() {
193 Some(self.agenda_activation_queue.remove(0))
194 } else {
195 None
196 }
197 }
198
199 pub fn get_ready_tasks(&mut self) -> Vec<ScheduledTask> {
201 let now = Instant::now();
202 let mut ready_tasks = Vec::new();
203
204 self.scheduled_tasks.retain(|task| {
205 if task.execute_at <= now {
206 ready_tasks.push(task.clone());
207 false } else {
209 true }
211 });
212
213 if !ready_tasks.is_empty() {
214 println!(
215 "โก {} scheduled tasks are ready for execution",
216 ready_tasks.len()
217 );
218 }
219
220 ready_tasks
221 }
222
223 pub fn get_next_pending_agenda_activation(&mut self) -> Option<String> {
225 if !self.agenda_activation_queue.is_empty() {
226 Some(self.agenda_activation_queue.remove(0))
227 } else {
228 None
229 }
230 }
231
232 pub fn get_workflow(&self, workflow_id: &str) -> Option<&WorkflowState> {
234 self.workflows.get(workflow_id)
235 }
236
237 pub fn get_active_workflows(&self) -> Vec<&WorkflowState> {
239 self.workflows
240 .values()
241 .filter(|w| w.status == WorkflowStatus::Running || w.status == WorkflowStatus::Waiting)
242 .collect()
243 }
244
245 pub fn get_workflow_stats(&self) -> WorkflowStats {
247 let total = self.workflows.len();
248 let running = self
249 .workflows
250 .values()
251 .filter(|w| w.status == WorkflowStatus::Running)
252 .count();
253 let completed = self
254 .workflows
255 .values()
256 .filter(|w| w.status == WorkflowStatus::Completed)
257 .count();
258 let failed = self
259 .workflows
260 .values()
261 .filter(|w| w.status == WorkflowStatus::Failed)
262 .count();
263 let scheduled_tasks = self.scheduled_tasks.len();
264
265 WorkflowStats {
266 total_workflows: total,
267 running_workflows: running,
268 completed_workflows: completed,
269 failed_workflows: failed,
270 pending_scheduled_tasks: scheduled_tasks,
271 pending_agenda_activations: self.agenda_activation_queue.len(),
272 }
273 }
274
275 pub fn cleanup_completed_workflows(&mut self, older_than: Duration) {
277 let cutoff = Instant::now() - older_than;
278 let initial_count = self.workflows.len();
279
280 self.workflows.retain(|_, workflow| {
281 if workflow.status == WorkflowStatus::Completed
282 || workflow.status == WorkflowStatus::Failed
283 {
284 if let Some(completed_at) = workflow.completed_at {
285 completed_at > cutoff
286 } else {
287 true }
289 } else {
290 true }
292 });
293
294 let cleaned = initial_count - self.workflows.len();
295 if cleaned > 0 {
296 println!("๐งน Cleaned up {} completed workflows", cleaned);
297 }
298 }
299}
300
301impl Default for WorkflowEngine {
302 fn default() -> Self {
303 Self::new()
304 }
305}
306
/// Point-in-time counters describing a workflow engine.
#[derive(Clone, Debug)]
pub struct WorkflowStats {
    /// Number of workflows tracked, in any status.
    pub total_workflows: usize,
    /// Number of workflows in the running status.
    pub running_workflows: usize,
    /// Number of workflows in the completed status.
    pub completed_workflows: usize,
    /// Number of workflows in the failed status.
    pub failed_workflows: usize,
    /// Number of scheduled tasks still pending.
    pub pending_scheduled_tasks: usize,
    /// Number of agenda group activations still queued.
    pub pending_agenda_activations: usize,
}
323
324#[derive(Debug, Clone)]
326pub struct WorkflowResult {
327 pub success: bool,
329 pub steps_executed: usize,
331 pub execution_time: Duration,
333 pub final_status: WorkflowStatus,
335 pub error_message: Option<String>,
337}
338
339impl WorkflowResult {
340 pub fn success(steps_executed: usize, execution_time: Duration) -> Self {
342 Self {
343 success: true,
344 steps_executed,
345 execution_time,
346 final_status: WorkflowStatus::Completed,
347 error_message: None,
348 }
349 }
350
351 pub fn failure(error_message: String) -> Self {
353 Self {
354 success: false,
355 steps_executed: 0,
356 execution_time: Duration::from_millis(0),
357 final_status: WorkflowStatus::Failed,
358 error_message: Some(error_message),
359 }
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// A new state is running, has the requested id and no step history.
    #[test]
    fn test_workflow_state_creation() {
        let state = WorkflowState::new(String::from("test_workflow"));
        assert_eq!(state.workflow_id, "test_workflow");
        assert_eq!(state.status, WorkflowStatus::Running);
        assert_eq!(state.current_step, None);
        assert_eq!(state.completed_steps.len(), 0);
    }

    /// A new engine tracks no workflows and no scheduled tasks.
    #[test]
    fn test_workflow_engine_creation() {
        let engine = WorkflowEngine::new();
        assert!(engine.workflows.is_empty());
        assert!(engine.scheduled_tasks.is_empty());
    }

    /// Starting a named workflow returns that name and registers its state.
    #[test]
    fn test_start_workflow() {
        let mut engine = WorkflowEngine::new();
        let id = engine.start_workflow(Some(String::from("test")));
        assert_eq!(id, "test");
        assert!(engine.get_workflow("test").is_some());
    }

    /// Scheduling a rule adds exactly one pending task.
    #[test]
    fn test_schedule_rule() {
        let mut engine = WorkflowEngine::new();
        engine.schedule_rule(String::from("test_rule"), 1000, None);
        assert_eq!(engine.scheduled_tasks.len(), 1);
    }

    /// Stats count every started workflow as both tracked and running.
    #[test]
    fn test_workflow_stats() {
        let mut engine = WorkflowEngine::new();
        for name in ["test1", "test2"] {
            engine.start_workflow(Some(name.to_string()));
        }

        let stats = engine.get_workflow_stats();
        assert_eq!(stats.total_workflows, 2);
        assert_eq!(stats.running_workflows, 2);
    }
}