use crate::types::Value;
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Lifecycle state of a workflow tracked by the engine.
///
/// The enum is fieldless, so it derives `Eq` and `Hash` in addition to
/// `PartialEq`; this lets a status be used as a map/set key and in APIs that
/// require total equality.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum WorkflowStatus {
    /// The workflow is in progress; this is the status assigned by `WorkflowState::new`.
    Running,
    /// The workflow finished successfully (set by `WorkflowState::complete`).
    Completed,
    /// The workflow ended in failure (set by `WorkflowState::fail`).
    Failed,
    /// The workflow is paused. NOTE(review): nothing in this file sets this status.
    Paused,
    /// The workflow is waiting; `WorkflowEngine::get_active_workflows` still counts it as active.
    Waiting,
}
/// A rule queued for deferred execution.
///
/// Instances are created by `WorkflowEngine::schedule_rule` and handed back by
/// `WorkflowEngine::get_ready_tasks` once `execute_at` has been reached.
#[derive(Debug, Clone)]
pub struct ScheduledTask {
/// Name of the rule to execute.
pub rule_name: String,
/// Earliest instant at which the task is considered ready.
pub execute_at: Instant,
/// Identifier of the workflow the task was scheduled for, if any.
pub workflow_id: Option<String>,
}
/// Mutable bookkeeping for a single workflow: its progress, data and timing.
#[derive(Debug, Clone)]
pub struct WorkflowState {
/// Identifier of the workflow this state belongs to.
pub workflow_id: String,
/// Step currently in progress, or `None` when no step is active.
pub current_step: Option<String>,
/// Steps finished so far, in the order they were completed.
pub completed_steps: Vec<String>,
/// Arbitrary key/value data attached to the workflow.
pub workflow_data: HashMap<String, Value>,
/// Current lifecycle status.
pub status: WorkflowStatus,
/// Instant at which the state was created.
pub started_at: Instant,
/// Instant at which the workflow was completed or failed; `None` until then.
pub completed_at: Option<Instant>,
}
impl WorkflowState {
pub fn new(workflow_id: String) -> Self {
Self {
workflow_id,
current_step: None,
completed_steps: Vec::new(),
workflow_data: HashMap::new(),
status: WorkflowStatus::Running,
started_at: Instant::now(),
completed_at: None,
}
}
pub fn complete_step(&mut self, step: String) {
if let Some(current) = &self.current_step {
if current == &step {
self.completed_steps.push(step);
self.current_step = None;
}
}
}
pub fn set_current_step(&mut self, step: String) {
self.current_step = Some(step);
}
pub fn complete(&mut self) {
self.status = WorkflowStatus::Completed;
self.completed_at = Some(Instant::now());
self.current_step = None;
}
pub fn fail(&mut self) {
self.status = WorkflowStatus::Failed;
self.completed_at = Some(Instant::now());
self.current_step = None;
}
pub fn set_data(&mut self, key: String, value: Value) {
self.workflow_data.insert(key, value);
}
pub fn get_data(&self, key: &str) -> Option<&Value> {
self.workflow_data.get(key)
}
pub fn duration(&self) -> Duration {
match self.completed_at {
Some(end) => end.duration_since(self.started_at),
None => Instant::now().duration_since(self.started_at),
}
}
}
/// In-memory registry of workflows plus the queues that drive them:
/// delayed rule executions and pending agenda-group activations.
#[derive(Debug)]
pub struct WorkflowEngine {
/// All known workflows, keyed by workflow id.
workflows: HashMap<String, WorkflowState>,
/// Rules waiting for their `execute_at` instant to be reached.
scheduled_tasks: Vec<ScheduledTask>,
/// Agenda groups waiting to be activated; new entries are pushed at the back
/// and consumed from the front.
agenda_activation_queue: Vec<String>,
/// Incremented on every `start_workflow` call and used to build default ids.
workflow_counter: u64,
}
impl WorkflowEngine {
    /// Creates an engine with no workflows, scheduled tasks or queued agenda activations.
    pub fn new() -> Self {
        Self {
            workflows: HashMap::new(),
            scheduled_tasks: Vec::new(),
            agenda_activation_queue: Vec::new(),
            workflow_counter: 0,
        }
    }

    /// Registers a new running workflow and returns its id.
    ///
    /// When `workflow_name` is `None` the id is `workflow_<n>`, where `<n>` is
    /// the internal counter. The counter advances on every call, so generated
    /// ids may skip numbers when named workflows are started in between.
    /// Starting a workflow under an id that already exists replaces the
    /// previous state for that id.
    pub fn start_workflow(&mut self, workflow_name: Option<String>) -> String {
        self.workflow_counter += 1;
        let workflow_id =
            workflow_name.unwrap_or_else(|| format!("workflow_{}", self.workflow_counter));
        let workflow_state = WorkflowState::new(workflow_id.clone());
        self.workflows.insert(workflow_id.clone(), workflow_state);
        println!("🔄 Started workflow: {}", workflow_id);
        workflow_id
    }

    /// Appends `group` to the queue of agenda groups awaiting activation.
    pub fn activate_agenda_group(&mut self, group: String) {
        // Log before moving `group` into the queue so no clone is needed.
        println!("🎯 Queued agenda group activation: {}", group);
        self.agenda_activation_queue.push(group);
    }

    /// Schedules `rule_name` to become ready `delay_ms` milliseconds from now.
    ///
    /// `workflow_id` optionally ties the task to a workflow; it is stored as-is.
    pub fn schedule_rule(&mut self, rule_name: String, delay_ms: u64, workflow_id: Option<String>) {
        let execute_at = Instant::now() + Duration::from_millis(delay_ms);
        println!(
            "⏰ Scheduled rule '{}' to execute in {}ms",
            rule_name, delay_ms
        );
        self.scheduled_tasks.push(ScheduledTask {
            rule_name,
            execute_at,
            workflow_id,
        });
    }

    /// Marks the workflow with id `workflow_name` as completed.
    ///
    /// Unknown ids are ignored.
    pub fn complete_workflow(&mut self, workflow_name: String) {
        if let Some(workflow) = self.workflows.get_mut(&workflow_name) {
            workflow.complete();
            println!("✅ Completed workflow: {}", workflow_name);
        }
    }

    /// Stores `value` under `key` in the data of workflow `workflow_id`.
    ///
    /// Unknown workflow ids are ignored.
    pub fn set_workflow_data(&mut self, workflow_id: &str, key: String, value: Value) {
        if let Some(workflow) = self.workflows.get_mut(workflow_id) {
            // Printed as `Some(..)` to match what a lookup of the freshly
            // stored value would show, without cloning the key.
            println!("💾 Set workflow data: {} = {:?}", key, Some(&value));
            workflow.set_data(key, value);
        }
    }

    /// Removes and returns the oldest queued agenda group, or `None` if the queue is empty.
    ///
    /// The queue is a `Vec`, so taking from the front shifts the remaining entries.
    pub fn get_next_agenda_group(&mut self) -> Option<String> {
        if self.agenda_activation_queue.is_empty() {
            None
        } else {
            Some(self.agenda_activation_queue.remove(0))
        }
    }

    /// Removes and returns every scheduled task whose `execute_at` has been reached.
    ///
    /// Ready tasks are returned in scheduling order; tasks that are not yet due
    /// stay queued, also in their original order.
    pub fn get_ready_tasks(&mut self) -> Vec<ScheduledTask> {
        let now = Instant::now();
        // Move the tasks out instead of cloning the ready ones.
        let (ready_tasks, pending): (Vec<ScheduledTask>, Vec<ScheduledTask>) =
            std::mem::take(&mut self.scheduled_tasks)
                .into_iter()
                .partition(|task| task.execute_at <= now);
        self.scheduled_tasks = pending;

        if !ready_tasks.is_empty() {
            println!(
                "⚡ {} scheduled tasks are ready for execution",
                ready_tasks.len()
            );
        }
        ready_tasks
    }

    /// Alias of [`Self::get_next_agenda_group`]: pops the oldest queued agenda group.
    pub fn get_next_pending_agenda_activation(&mut self) -> Option<String> {
        self.get_next_agenda_group()
    }

    /// Returns the state of workflow `workflow_id`, if it is known.
    pub fn get_workflow(&self, workflow_id: &str) -> Option<&WorkflowState> {
        self.workflows.get(workflow_id)
    }

    /// Returns all workflows whose status is `Running` or `Waiting`, in no particular order.
    pub fn get_active_workflows(&self) -> Vec<&WorkflowState> {
        self.workflows
            .values()
            .filter(|w| matches!(w.status, WorkflowStatus::Running | WorkflowStatus::Waiting))
            .collect()
    }

    /// Summarises the engine: workflow counts by status plus queue lengths.
    ///
    /// `Paused` and `Waiting` workflows count towards the total only.
    pub fn get_workflow_stats(&self) -> WorkflowStats {
        let mut running = 0usize;
        let mut completed = 0usize;
        let mut failed = 0usize;
        // Single pass; the match is exhaustive so a new status variant forces
        // a decision about how it should be counted.
        for workflow in self.workflows.values() {
            match workflow.status {
                WorkflowStatus::Running => running += 1,
                WorkflowStatus::Completed => completed += 1,
                WorkflowStatus::Failed => failed += 1,
                WorkflowStatus::Paused | WorkflowStatus::Waiting => {}
            }
        }

        WorkflowStats {
            total_workflows: self.workflows.len(),
            running_workflows: running,
            completed_workflows: completed,
            failed_workflows: failed,
            pending_scheduled_tasks: self.scheduled_tasks.len(),
            pending_agenda_activations: self.agenda_activation_queue.len(),
        }
    }

    /// Drops completed or failed workflows that finished at least `older_than` ago.
    ///
    /// Workflows in any other status, and finished workflows without a
    /// completion time, are always kept.
    pub fn cleanup_completed_workflows(&mut self, older_than: Duration) {
        // Compare elapsed time against `older_than` rather than computing
        // `now - older_than`: subtracting a large duration from an `Instant`
        // panics when the result is not representable.
        let now = Instant::now();
        let initial_count = self.workflows.len();

        self.workflows.retain(|_, workflow| {
            let finished = matches!(
                workflow.status,
                WorkflowStatus::Completed | WorkflowStatus::Failed
            );
            match workflow.completed_at {
                Some(completed_at) if finished => {
                    now.saturating_duration_since(completed_at) < older_than
                }
                _ => true,
            }
        });

        let cleaned = initial_count - self.workflows.len();
        if cleaned > 0 {
            println!("🧹 Cleaned up {} completed workflows", cleaned);
        }
    }
}
impl Default for WorkflowEngine {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time summary produced by `WorkflowEngine::get_workflow_stats`.
#[derive(Debug, Clone)]
pub struct WorkflowStats {
/// Number of workflows known to the engine, regardless of status.
pub total_workflows: usize,
/// Number of workflows whose status is `Running`.
pub running_workflows: usize,
/// Number of workflows whose status is `Completed`.
pub completed_workflows: usize,
/// Number of workflows whose status is `Failed`.
pub failed_workflows: usize,
/// Number of scheduled tasks still queued in the engine.
pub pending_scheduled_tasks: usize,
/// Number of agenda-group activations still queued.
pub pending_agenda_activations: usize,
}
/// Outcome of a workflow run; see `WorkflowResult::success` and
/// `WorkflowResult::failure` for the provided constructors.
#[derive(Debug, Clone)]
pub struct WorkflowResult {
/// Whether the run succeeded.
pub success: bool,
/// Number of steps that were executed.
pub steps_executed: usize,
/// Time the run took.
pub execution_time: Duration,
/// Status the workflow ended in.
pub final_status: WorkflowStatus,
/// Description of the failure; `None` for results built by `success`.
pub error_message: Option<String>,
}
impl WorkflowResult {
    /// Builds the result of a run that finished successfully.
    ///
    /// The result is flagged as successful, ends in `Completed` and carries no
    /// error message.
    pub fn success(steps_executed: usize, execution_time: Duration) -> Self {
        let final_status = WorkflowStatus::Completed;
        Self {
            final_status,
            success: true,
            error_message: None,
            steps_executed,
            execution_time,
        }
    }

    /// Builds the result of a run that failed with `error_message`.
    ///
    /// The result reports zero executed steps and a zero execution time, and
    /// ends in `Failed`.
    pub fn failure(error_message: String) -> Self {
        let no_time = Duration::default();
        Self {
            final_status: WorkflowStatus::Failed,
            success: false,
            error_message: Some(error_message),
            steps_executed: 0,
            execution_time: no_time,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created state is running and has no step history.
    #[test]
    fn test_workflow_state_creation() {
        let state = WorkflowState::new(String::from("test_workflow"));

        assert_eq!(state.workflow_id, "test_workflow");
        assert_eq!(state.status, WorkflowStatus::Running);
        assert_eq!(state.current_step, None);
        assert_eq!(state.completed_steps.len(), 0);
    }

    /// A new engine starts with no workflows and no scheduled tasks.
    #[test]
    fn test_workflow_engine_creation() {
        let fresh = WorkflowEngine::new();

        assert!(fresh.workflows.is_empty());
        assert!(fresh.scheduled_tasks.is_empty());
    }

    /// Starting a named workflow returns that name and registers the workflow.
    #[test]
    fn test_start_workflow() {
        let mut engine = WorkflowEngine::new();

        let id = engine.start_workflow(Some(String::from("test")));
        assert_eq!(id, "test");

        let registered = engine.get_workflow("test");
        assert!(registered.is_some());
    }

    /// Scheduling a rule queues exactly one task.
    #[test]
    fn test_schedule_rule() {
        let mut engine = WorkflowEngine::new();

        engine.schedule_rule(String::from("test_rule"), 1000, None);

        assert_eq!(engine.scheduled_tasks.len(), 1);
    }

    /// Stats count every started workflow as both known and running.
    #[test]
    fn test_workflow_stats() {
        let mut engine = WorkflowEngine::new();
        for name in ["test1", "test2"].iter() {
            engine.start_workflow(Some((*name).to_string()));
        }

        let WorkflowStats {
            total_workflows,
            running_workflows,
            ..
        } = engine.get_workflow_stats();
        assert_eq!(total_workflows, 2);
        assert_eq!(running_workflows, 2);
    }
}