rust-viewflow 0.1.0

Rust workflow library inspired by Viewflow, compatible with Axum and Actix-web
Documentation
use async_trait::async_trait;
use chrono;
use serde_json;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use uuid::Uuid;

use crate::core::{
    TaskState, TaskStatus, WorkflowDefinition, WorkflowEngine, WorkflowResult, WorkflowState,
};
use crate::db::WorkflowDatabase;

/// Default workflow engine implementation
pub struct DefaultWorkflowEngine {
    database: Arc<dyn WorkflowDatabase>,
    workflows: RwLock<HashMap<String, Arc<dyn WorkflowDefinition>>>,
}

impl DefaultWorkflowEngine {
    pub fn new(database: Arc<dyn WorkflowDatabase>) -> Self {
        Self {
            database,
            workflows: RwLock::new(HashMap::new()),
        }
    }

    pub fn register_workflow(&self, workflow: Arc<dyn WorkflowDefinition>) {
        self.workflows
            .write()
            .expect("workflows registry lock poisoned")
            .insert(workflow.name().to_string(), workflow);
    }
}

#[async_trait]
impl WorkflowEngine for DefaultWorkflowEngine {
    async fn create_workflow(
        &self,
        definition_name: &str,
        data: serde_json::Value,
    ) -> WorkflowResult<WorkflowState> {
        let workflow_def = self
            .workflows
            .read()
            .expect("workflows registry lock poisoned")
            .get(definition_name)
            .cloned()
            .ok_or_else(|| format!("Workflow definition '{}' not found", definition_name))?;

        let workflow = workflow_def.start(data).await?;

        Ok(workflow)
    }

    async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
        self.database.get_workflow(workflow_id).await
    }

    async fn update_workflow(
        &self,
        workflow_id: &str,
        data: serde_json::Value,
    ) -> WorkflowResult<WorkflowState> {
        let mut workflow = self.database.get_workflow(workflow_id).await?;
        workflow.data = data;
        workflow.updated_at = chrono::Utc::now();

        self.database.update_workflow(&workflow).await?;

        Ok(workflow)
    }

    async fn complete_task(
        &self,
        task_id: &str,
        data: serde_json::Value,
    ) -> WorkflowResult<TaskState> {
        let task = self.database.get_task(task_id).await?;
        let workflow = self.database.get_workflow(&task.workflow_id).await?;

        let workflow_def = self
            .workflows
            .read()
            .expect("workflows registry lock poisoned")
            .get(&workflow.name)
            .cloned();

        if let Some(definition) = workflow_def {
            return definition.execute_task(task_id, data).await;
        }

        let mut fallback_task = task;
        fallback_task.data = data;
        fallback_task.status = TaskStatus::Completed;
        fallback_task.completed_at = Some(chrono::Utc::now());
        fallback_task.updated_at = chrono::Utc::now();

        self.database.update_task(&fallback_task).await?;

        Ok(fallback_task)
    }

    async fn get_tasks(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
        self.database.get_tasks_by_workflow(workflow_id).await
    }
}

/// Helper function to generate unique IDs
pub fn generate_id() -> String {
    Uuid::new_v4().to_string()
}