Skip to main content

rust_viewflow/db/
mysql.rs

1use crate::core::{TaskState, WorkflowResult, WorkflowState};
2use crate::db::WorkflowDatabase;
3use serde_json;
4use sqlx::Row;
5
6#[async_trait::async_trait]
7impl WorkflowDatabase for super::MySqlDatabase {
8    async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
9        sqlx::query(
10            r#"
11            INSERT INTO workflows (id, name, status, data, created_at, updated_at)
12            VALUES (?, ?, ?, ?, ?, ?)
13        "#,
14        )
15        .bind(&workflow.id)
16        .bind(&workflow.name)
17        .bind(serde_json::to_string(&workflow.status)?)
18        .bind(&workflow.data)
19        .bind(&workflow.created_at)
20        .bind(&workflow.updated_at)
21        .execute(&self.pool)
22        .await?;
23        Ok(())
24    }
25
26    async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
27        let row = sqlx::query(
28            r#"
29            SELECT id, name, status, data, created_at, updated_at
30            FROM workflows
31            WHERE id = ?
32        "#,
33        )
34        .bind(workflow_id)
35        .fetch_one(&self.pool)
36        .await?;
37
38        Ok(WorkflowState {
39            id: row.get(0),
40            name: row.get(1),
41            status: serde_json::from_str(&row.get::<String, _>(2))?,
42            data: row.get(3),
43            created_at: row.get(4),
44            updated_at: row.get(5),
45        })
46    }
47
48    async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
49        sqlx::query(
50            r#"
51            UPDATE workflows
52            SET name = ?, status = ?, data = ?, updated_at = ?
53            WHERE id = ?
54        "#,
55        )
56        .bind(&workflow.name)
57        .bind(serde_json::to_string(&workflow.status)?)
58        .bind(&workflow.data)
59        .bind(&workflow.updated_at)
60        .bind(&workflow.id)
61        .execute(&self.pool)
62        .await?;
63        Ok(())
64    }
65
66    async fn create_task(&self, task: &TaskState) -> WorkflowResult<()> {
67        sqlx::query(r#"
68            INSERT INTO tasks (id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at)
69            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
70        "#)
71        .bind(&task.id)
72        .bind(&task.workflow_id)
73        .bind(&task.name)
74        .bind(serde_json::to_string(&task.status)?)
75        .bind(&task.assignee)
76        .bind(&task.data)
77        .bind(&task.created_at)
78        .bind(&task.updated_at)
79        .bind(&task.completed_at)
80        .execute(&self.pool)
81        .await?;
82        Ok(())
83    }
84
85    async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState> {
86        let row = sqlx::query(r#"
87            SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
88            FROM tasks
89            WHERE id = ?
90        "#)
91        .bind(task_id)
92        .fetch_one(&self.pool)
93        .await?;
94
95        Ok(TaskState {
96            id: row.get(0),
97            workflow_id: row.get(1),
98            name: row.get(2),
99            status: serde_json::from_str(&row.get::<String, _>(3))?,
100            assignee: row.get(4),
101            data: row.get(5),
102            created_at: row.get(6),
103            updated_at: row.get(7),
104            completed_at: row.get(8),
105        })
106    }
107
108    async fn update_task(&self, task: &TaskState) -> WorkflowResult<()> {
109        sqlx::query(
110            r#"
111            UPDATE tasks
112            SET name = ?, status = ?, assignee = ?, data = ?, updated_at = ?, completed_at = ?
113            WHERE id = ? AND workflow_id = ?
114        "#,
115        )
116        .bind(&task.name)
117        .bind(serde_json::to_string(&task.status)?)
118        .bind(&task.assignee)
119        .bind(&task.data)
120        .bind(&task.updated_at)
121        .bind(&task.completed_at)
122        .bind(&task.id)
123        .bind(&task.workflow_id)
124        .execute(&self.pool)
125        .await?;
126        Ok(())
127    }
128
129    async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
130        let rows = sqlx::query(r#"
131            SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
132            FROM tasks
133            WHERE workflow_id = ?
134        "#)
135        .bind(workflow_id)
136        .fetch_all(&self.pool)
137        .await?;
138
139        let mut tasks = Vec::with_capacity(rows.len());
140        for row in rows {
141            tasks.push(TaskState {
142                id: row.get(0),
143                workflow_id: row.get(1),
144                name: row.get(2),
145                status: serde_json::from_str(&row.get::<String, _>(3))?,
146                assignee: row.get(4),
147                data: row.get(5),
148                created_at: row.get(6),
149                updated_at: row.get(7),
150                completed_at: row.get(8),
151            });
152        }
153
154        Ok(tasks)
155    }
156}