1use crate::core::{TaskState, WorkflowResult, WorkflowState};
2use crate::db::WorkflowDatabase;
3use chrono;
4use serde_json;
5use sqlx::Row;
6
7#[async_trait::async_trait]
8impl WorkflowDatabase for super::SqliteDatabase {
9 async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
10 sqlx::query(
11 r#"
12 INSERT INTO workflows (id, name, status, data, created_at, updated_at)
13 VALUES (?, ?, ?, ?, ?, ?)
14 "#,
15 )
16 .bind(&workflow.id)
17 .bind(&workflow.name)
18 .bind(serde_json::to_string(&workflow.status)?)
19 .bind(serde_json::to_string(&workflow.data)?)
20 .bind(workflow.created_at.to_rfc3339())
21 .bind(workflow.updated_at.to_rfc3339())
22 .execute(&self.pool)
23 .await?;
24 Ok(())
25 }
26
27 async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
28 let row = sqlx::query(
29 r#"
30 SELECT id, name, status, data, created_at, updated_at
31 FROM workflows
32 WHERE id = ?
33 "#,
34 )
35 .bind(workflow_id)
36 .fetch_one(&self.pool)
37 .await?;
38
39 Ok(WorkflowState {
40 id: row.get(0),
41 name: row.get(1),
42 status: serde_json::from_str(&row.get::<String, _>(2))?,
43 data: serde_json::from_str(&row.get::<String, _>(3))?,
44 created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(4))?.into(),
45 updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(5))?.into(),
46 })
47 }
48
49 async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
50 sqlx::query(
51 r#"
52 UPDATE workflows
53 SET name = ?, status = ?, data = ?, updated_at = ?
54 WHERE id = ?
55 "#,
56 )
57 .bind(&workflow.name)
58 .bind(serde_json::to_string(&workflow.status)?)
59 .bind(serde_json::to_string(&workflow.data)?)
60 .bind(workflow.updated_at.to_rfc3339())
61 .bind(&workflow.id)
62 .execute(&self.pool)
63 .await?;
64 Ok(())
65 }
66
67 async fn create_task(&self, task: &TaskState) -> WorkflowResult<()> {
68 let completed_at_str = task.completed_at.map(|dt| dt.to_rfc3339());
69
70 sqlx::query(r#"
71 INSERT INTO tasks (id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at)
72 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
73 "#)
74 .bind(&task.id)
75 .bind(&task.workflow_id)
76 .bind(&task.name)
77 .bind(serde_json::to_string(&task.status)?)
78 .bind(&task.assignee)
79 .bind(serde_json::to_string(&task.data)?)
80 .bind(task.created_at.to_rfc3339())
81 .bind(task.updated_at.to_rfc3339())
82 .bind(completed_at_str)
83 .execute(&self.pool)
84 .await?;
85 Ok(())
86 }
87
88 async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState> {
89 let row = sqlx::query(r#"
90 SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
91 FROM tasks
92 WHERE id = ?
93 "#)
94 .bind(task_id)
95 .fetch_one(&self.pool)
96 .await?;
97
98 let completed_at = row
99 .get::<Option<String>, _>(8)
100 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
101 .map(|dt| dt.into());
102
103 Ok(TaskState {
104 id: row.get(0),
105 workflow_id: row.get(1),
106 name: row.get(2),
107 status: serde_json::from_str(&row.get::<String, _>(3))?,
108 assignee: row.get(4),
109 data: serde_json::from_str(&row.get::<String, _>(5))?,
110 created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(6))?.into(),
111 updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(7))?.into(),
112 completed_at,
113 })
114 }
115
116 async fn update_task(&self, task: &TaskState) -> WorkflowResult<()> {
117 let completed_at_str = task.completed_at.map(|dt| dt.to_rfc3339());
118
119 sqlx::query(
120 r#"
121 UPDATE tasks
122 SET name = ?, status = ?, assignee = ?, data = ?, updated_at = ?, completed_at = ?
123 WHERE id = ? AND workflow_id = ?
124 "#,
125 )
126 .bind(&task.name)
127 .bind(serde_json::to_string(&task.status)?)
128 .bind(&task.assignee)
129 .bind(serde_json::to_string(&task.data)?)
130 .bind(task.updated_at.to_rfc3339())
131 .bind(completed_at_str)
132 .bind(&task.id)
133 .bind(&task.workflow_id)
134 .execute(&self.pool)
135 .await?;
136 Ok(())
137 }
138
139 async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
140 let rows = sqlx::query(r#"
141 SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
142 FROM tasks
143 WHERE workflow_id = ?
144 "#)
145 .bind(workflow_id)
146 .fetch_all(&self.pool)
147 .await?;
148
149 let mut tasks = Vec::with_capacity(rows.len());
150 for row in rows {
151 let completed_at = row
152 .get::<Option<String>, _>(8)
153 .map(|s| chrono::DateTime::parse_from_rfc3339(&s))
154 .transpose()?
155 .map(Into::into);
156
157 tasks.push(TaskState {
158 id: row.get(0),
159 workflow_id: row.get(1),
160 name: row.get(2),
161 status: serde_json::from_str(&row.get::<String, _>(3))?,
162 assignee: row.get(4),
163 data: serde_json::from_str(&row.get::<String, _>(5))?,
164 created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(6))?.into(),
165 updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(7))?.into(),
166 completed_at,
167 });
168 }
169
170 Ok(tasks)
171 }
172}