1use crate::core::{TaskState, WorkflowResult, WorkflowState};
2use crate::db::WorkflowDatabase;
3use serde_json;
4use sqlx::Row;
5
6#[async_trait::async_trait]
7impl WorkflowDatabase for super::MySqlDatabase {
8 async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
9 sqlx::query(
10 r#"
11 INSERT INTO workflows (id, name, status, data, created_at, updated_at)
12 VALUES (?, ?, ?, ?, ?, ?)
13 "#,
14 )
15 .bind(&workflow.id)
16 .bind(&workflow.name)
17 .bind(serde_json::to_string(&workflow.status)?)
18 .bind(&workflow.data)
19 .bind(&workflow.created_at)
20 .bind(&workflow.updated_at)
21 .execute(&self.pool)
22 .await?;
23 Ok(())
24 }
25
26 async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
27 let row = sqlx::query(
28 r#"
29 SELECT id, name, status, data, created_at, updated_at
30 FROM workflows
31 WHERE id = ?
32 "#,
33 )
34 .bind(workflow_id)
35 .fetch_one(&self.pool)
36 .await?;
37
38 Ok(WorkflowState {
39 id: row.get(0),
40 name: row.get(1),
41 status: serde_json::from_str(&row.get::<String, _>(2))?,
42 data: row.get(3),
43 created_at: row.get(4),
44 updated_at: row.get(5),
45 })
46 }
47
48 async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
49 sqlx::query(
50 r#"
51 UPDATE workflows
52 SET name = ?, status = ?, data = ?, updated_at = ?
53 WHERE id = ?
54 "#,
55 )
56 .bind(&workflow.name)
57 .bind(serde_json::to_string(&workflow.status)?)
58 .bind(&workflow.data)
59 .bind(&workflow.updated_at)
60 .bind(&workflow.id)
61 .execute(&self.pool)
62 .await?;
63 Ok(())
64 }
65
66 async fn create_task(&self, task: &TaskState) -> WorkflowResult<()> {
67 sqlx::query(r#"
68 INSERT INTO tasks (id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at)
69 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
70 "#)
71 .bind(&task.id)
72 .bind(&task.workflow_id)
73 .bind(&task.name)
74 .bind(serde_json::to_string(&task.status)?)
75 .bind(&task.assignee)
76 .bind(&task.data)
77 .bind(&task.created_at)
78 .bind(&task.updated_at)
79 .bind(&task.completed_at)
80 .execute(&self.pool)
81 .await?;
82 Ok(())
83 }
84
85 async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState> {
86 let row = sqlx::query(r#"
87 SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
88 FROM tasks
89 WHERE id = ?
90 "#)
91 .bind(task_id)
92 .fetch_one(&self.pool)
93 .await?;
94
95 Ok(TaskState {
96 id: row.get(0),
97 workflow_id: row.get(1),
98 name: row.get(2),
99 status: serde_json::from_str(&row.get::<String, _>(3))?,
100 assignee: row.get(4),
101 data: row.get(5),
102 created_at: row.get(6),
103 updated_at: row.get(7),
104 completed_at: row.get(8),
105 })
106 }
107
108 async fn update_task(&self, task: &TaskState) -> WorkflowResult<()> {
109 sqlx::query(
110 r#"
111 UPDATE tasks
112 SET name = ?, status = ?, assignee = ?, data = ?, updated_at = ?, completed_at = ?
113 WHERE id = ? AND workflow_id = ?
114 "#,
115 )
116 .bind(&task.name)
117 .bind(serde_json::to_string(&task.status)?)
118 .bind(&task.assignee)
119 .bind(&task.data)
120 .bind(&task.updated_at)
121 .bind(&task.completed_at)
122 .bind(&task.id)
123 .bind(&task.workflow_id)
124 .execute(&self.pool)
125 .await?;
126 Ok(())
127 }
128
129 async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
130 let rows = sqlx::query(r#"
131 SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
132 FROM tasks
133 WHERE workflow_id = ?
134 "#)
135 .bind(workflow_id)
136 .fetch_all(&self.pool)
137 .await?;
138
139 let mut tasks = Vec::with_capacity(rows.len());
140 for row in rows {
141 tasks.push(TaskState {
142 id: row.get(0),
143 workflow_id: row.get(1),
144 name: row.get(2),
145 status: serde_json::from_str(&row.get::<String, _>(3))?,
146 assignee: row.get(4),
147 data: row.get(5),
148 created_at: row.get(6),
149 updated_at: row.get(7),
150 completed_at: row.get(8),
151 });
152 }
153
154 Ok(tasks)
155 }
156}