// rust-viewflow 0.1.0
//
// Rust workflow library inspired by Viewflow, compatible with Axum and Actix-web.
// Documentation
use crate::core::{TaskState, WorkflowResult, WorkflowState};
use crate::db::WorkflowDatabase;
use chrono;
use serde_json;
use sqlx::Row;

#[async_trait::async_trait]
impl WorkflowDatabase for super::SqliteDatabase {
    async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
        sqlx::query(
            r#"
            INSERT INTO workflows (id, name, status, data, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
        "#,
        )
        .bind(&workflow.id)
        .bind(&workflow.name)
        .bind(serde_json::to_string(&workflow.status)?)
        .bind(serde_json::to_string(&workflow.data)?)
        .bind(workflow.created_at.to_rfc3339())
        .bind(workflow.updated_at.to_rfc3339())
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
        let row = sqlx::query(
            r#"
            SELECT id, name, status, data, created_at, updated_at
            FROM workflows
            WHERE id = ?
        "#,
        )
        .bind(workflow_id)
        .fetch_one(&self.pool)
        .await?;

        Ok(WorkflowState {
            id: row.get(0),
            name: row.get(1),
            status: serde_json::from_str(&row.get::<String, _>(2))?,
            data: serde_json::from_str(&row.get::<String, _>(3))?,
            created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(4))?.into(),
            updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(5))?.into(),
        })
    }

    async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
        sqlx::query(
            r#"
            UPDATE workflows
            SET name = ?, status = ?, data = ?, updated_at = ?
            WHERE id = ?
        "#,
        )
        .bind(&workflow.name)
        .bind(serde_json::to_string(&workflow.status)?)
        .bind(serde_json::to_string(&workflow.data)?)
        .bind(workflow.updated_at.to_rfc3339())
        .bind(&workflow.id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn create_task(&self, task: &TaskState) -> WorkflowResult<()> {
        let completed_at_str = task.completed_at.map(|dt| dt.to_rfc3339());

        sqlx::query(r#"
            INSERT INTO tasks (id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        "#)
        .bind(&task.id)
        .bind(&task.workflow_id)
        .bind(&task.name)
        .bind(serde_json::to_string(&task.status)?)
        .bind(&task.assignee)
        .bind(serde_json::to_string(&task.data)?)
        .bind(task.created_at.to_rfc3339())
        .bind(task.updated_at.to_rfc3339())
        .bind(completed_at_str)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState> {
        let row = sqlx::query(r#"
            SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
            FROM tasks
            WHERE id = ?
        "#)
        .bind(task_id)
        .fetch_one(&self.pool)
        .await?;

        let completed_at = row
            .get::<Option<String>, _>(8)
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
            .map(|dt| dt.into());

        Ok(TaskState {
            id: row.get(0),
            workflow_id: row.get(1),
            name: row.get(2),
            status: serde_json::from_str(&row.get::<String, _>(3))?,
            assignee: row.get(4),
            data: serde_json::from_str(&row.get::<String, _>(5))?,
            created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(6))?.into(),
            updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(7))?.into(),
            completed_at,
        })
    }

    async fn update_task(&self, task: &TaskState) -> WorkflowResult<()> {
        let completed_at_str = task.completed_at.map(|dt| dt.to_rfc3339());

        sqlx::query(
            r#"
            UPDATE tasks
            SET name = ?, status = ?, assignee = ?, data = ?, updated_at = ?, completed_at = ?
            WHERE id = ? AND workflow_id = ?
        "#,
        )
        .bind(&task.name)
        .bind(serde_json::to_string(&task.status)?)
        .bind(&task.assignee)
        .bind(serde_json::to_string(&task.data)?)
        .bind(task.updated_at.to_rfc3339())
        .bind(completed_at_str)
        .bind(&task.id)
        .bind(&task.workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
        let rows = sqlx::query(r#"
            SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
            FROM tasks
            WHERE workflow_id = ?
        "#)
        .bind(workflow_id)
        .fetch_all(&self.pool)
        .await?;

        let mut tasks = Vec::with_capacity(rows.len());
        for row in rows {
            let completed_at = row
                .get::<Option<String>, _>(8)
                .map(|s| chrono::DateTime::parse_from_rfc3339(&s))
                .transpose()?
                .map(Into::into);

            tasks.push(TaskState {
                id: row.get(0),
                workflow_id: row.get(1),
                name: row.get(2),
                status: serde_json::from_str(&row.get::<String, _>(3))?,
                assignee: row.get(4),
                data: serde_json::from_str(&row.get::<String, _>(5))?,
                created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(6))?.into(),
                updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(7))?.into(),
                completed_at,
            });
        }

        Ok(tasks)
    }
}