mod execution;
mod graph;
mod node;
pub use execution::{ExecutionContext, ExecutionResult, WorkflowExecutor};
pub use graph::{Workflow, WorkflowBuilder, WorkflowError};
pub use node::{NodeId, NodeStatus, NodeType, WorkflowNode};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct WorkflowRegistry {
workflows: Arc<RwLock<HashMap<String, Workflow>>>,
executions: Arc<RwLock<HashMap<String, WorkflowExecution>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowExecution {
pub id: String,
pub workflow_id: String,
pub status: WorkflowExecutionStatus,
pub started_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub node_states: HashMap<String, NodeExecutionState>,
pub variables: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WorkflowExecutionStatus {
Pending,
Running,
Paused,
Completed,
Failed,
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeExecutionState {
pub node_id: String,
pub status: NodeStatus,
pub started_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
pub output: Option<serde_json::Value>,
pub error: Option<String>,
pub retry_count: u32,
}
impl WorkflowRegistry {
pub fn new() -> Self {
Self {
workflows: Arc::new(RwLock::new(HashMap::new())),
executions: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register(&self, workflow: Workflow) -> Result<(), WorkflowError> {
let mut workflows = self.workflows.write().await;
if workflows.contains_key(&workflow.id) {
return Err(WorkflowError::DuplicateId(workflow.id));
}
workflows.insert(workflow.id.clone(), workflow);
Ok(())
}
pub async fn get(&self, id: &str) -> Option<Workflow> {
let workflows = self.workflows.read().await;
workflows.get(id).cloned()
}
pub async fn remove(&self, id: &str) -> Option<Workflow> {
let mut workflows = self.workflows.write().await;
workflows.remove(id)
}
pub async fn list(&self) -> Vec<String> {
let workflows = self.workflows.read().await;
workflows.keys().cloned().collect()
}
pub async fn execute(&self, workflow_id: &str) -> Result<String, WorkflowError> {
let workflow = self
.get(workflow_id)
.await
.ok_or_else(|| WorkflowError::NotFound(workflow_id.to_string()))?;
let execution_id = uuid::Uuid::new_v4().to_string();
let now = Utc::now();
let mut node_states = HashMap::new();
for node in &workflow.nodes {
node_states.insert(
node.id.0.clone(),
NodeExecutionState {
node_id: node.id.0.clone(),
status: NodeStatus::Pending,
started_at: None,
completed_at: None,
output: None,
error: None,
retry_count: 0,
},
);
}
let execution = WorkflowExecution {
id: execution_id.clone(),
workflow_id: workflow_id.to_string(),
status: WorkflowExecutionStatus::Pending,
started_at: now,
completed_at: None,
node_states,
variables: HashMap::new(),
};
let mut executions = self.executions.write().await;
executions.insert(execution_id.clone(), execution);
Ok(execution_id)
}
pub async fn get_execution(&self, execution_id: &str) -> Option<WorkflowExecution> {
let executions = self.executions.read().await;
executions.get(execution_id).cloned()
}
pub async fn update_execution(
&self,
execution: WorkflowExecution,
) -> Result<(), WorkflowError> {
let mut executions = self.executions.write().await;
if !executions.contains_key(&execution.id) {
return Err(WorkflowError::NotFound(execution.id));
}
executions.insert(execution.id.clone(), execution);
Ok(())
}
pub async fn cancel_execution(&self, execution_id: &str) -> Result<(), WorkflowError> {
let mut executions = self.executions.write().await;
if let Some(execution) = executions.get_mut(execution_id) {
execution.status = WorkflowExecutionStatus::Cancelled;
execution.completed_at = Some(Utc::now());
Ok(())
} else {
Err(WorkflowError::NotFound(execution_id.to_string()))
}
}
pub async fn workflow_count(&self) -> usize {
let workflows = self.workflows.read().await;
workflows.len()
}
pub async fn active_execution_count(&self) -> usize {
let executions = self.executions.read().await;
executions
.values()
.filter(|e| e.status == WorkflowExecutionStatus::Running)
.count()
}
}
impl Default for WorkflowRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_registry_creation() {
let registry = WorkflowRegistry::new();
assert_eq!(registry.workflow_count().await, 0);
}
#[tokio::test]
async fn test_register_workflow() {
let registry = WorkflowRegistry::new();
let workflow = WorkflowBuilder::new("test-workflow")
.name("Test Workflow")
.build()
.unwrap();
registry.register(workflow).await.unwrap();
assert_eq!(registry.workflow_count().await, 1);
}
#[tokio::test]
async fn test_duplicate_registration() {
let registry = WorkflowRegistry::new();
let workflow1 = WorkflowBuilder::new("test-workflow")
.name("Test 1")
.build()
.unwrap();
let workflow2 = WorkflowBuilder::new("test-workflow")
.name("Test 2")
.build()
.unwrap();
registry.register(workflow1).await.unwrap();
let result = registry.register(workflow2).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_get_workflow() {
let registry = WorkflowRegistry::new();
let workflow = WorkflowBuilder::new("my-workflow")
.name("My Workflow")
.build()
.unwrap();
registry.register(workflow).await.unwrap();
let retrieved = registry.get("my-workflow").await;
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().name, "My Workflow");
let missing = registry.get("nonexistent").await;
assert!(missing.is_none());
}
#[tokio::test]
async fn test_remove_workflow() {
let registry = WorkflowRegistry::new();
let workflow = WorkflowBuilder::new("to-remove")
.name("To Remove")
.build()
.unwrap();
registry.register(workflow).await.unwrap();
assert_eq!(registry.workflow_count().await, 1);
let removed = registry.remove("to-remove").await;
assert!(removed.is_some());
assert_eq!(registry.workflow_count().await, 0);
}
#[tokio::test]
async fn test_list_workflows() {
let registry = WorkflowRegistry::new();
for i in 0..3 {
let workflow = WorkflowBuilder::new(&format!("workflow-{}", i))
.name(&format!("Workflow {}", i))
.build()
.unwrap();
registry.register(workflow).await.unwrap();
}
let list = registry.list().await;
assert_eq!(list.len(), 3);
}
#[tokio::test]
async fn test_execute_workflow() {
let registry = WorkflowRegistry::new();
let workflow = WorkflowBuilder::new("exec-test")
.name("Execution Test")
.build()
.unwrap();
registry.register(workflow).await.unwrap();
let exec_id = registry.execute("exec-test").await.unwrap();
assert!(!exec_id.is_empty());
let execution = registry.get_execution(&exec_id).await;
assert!(execution.is_some());
assert_eq!(execution.unwrap().status, WorkflowExecutionStatus::Pending);
}
#[tokio::test]
async fn test_execute_nonexistent() {
let registry = WorkflowRegistry::new();
let result = registry.execute("nonexistent").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_cancel_execution() {
let registry = WorkflowRegistry::new();
let workflow = WorkflowBuilder::new("cancel-test")
.name("Cancel Test")
.build()
.unwrap();
registry.register(workflow).await.unwrap();
let exec_id = registry.execute("cancel-test").await.unwrap();
registry.cancel_execution(&exec_id).await.unwrap();
let execution = registry.get_execution(&exec_id).await.unwrap();
assert_eq!(execution.status, WorkflowExecutionStatus::Cancelled);
assert!(execution.completed_at.is_some());
}
}