// rust_viewflow/db/sqlite.rs

use crate::core::{TaskState, WorkflowResult, WorkflowState};
use crate::db::WorkflowDatabase;
use chrono;
use serde_json;
use sqlx::sqlite::SqliteRow;
use sqlx::Row;
6
7#[async_trait::async_trait]
8impl WorkflowDatabase for super::SqliteDatabase {
9    async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
10        sqlx::query(
11            r#"
12            INSERT INTO workflows (id, name, status, data, created_at, updated_at)
13            VALUES (?, ?, ?, ?, ?, ?)
14        "#,
15        )
16        .bind(&workflow.id)
17        .bind(&workflow.name)
18        .bind(serde_json::to_string(&workflow.status)?)
19        .bind(serde_json::to_string(&workflow.data)?)
20        .bind(workflow.created_at.to_rfc3339())
21        .bind(workflow.updated_at.to_rfc3339())
22        .execute(&self.pool)
23        .await?;
24        Ok(())
25    }
26
27    async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
28        let row = sqlx::query(
29            r#"
30            SELECT id, name, status, data, created_at, updated_at
31            FROM workflows
32            WHERE id = ?
33        "#,
34        )
35        .bind(workflow_id)
36        .fetch_one(&self.pool)
37        .await?;
38
39        Ok(WorkflowState {
40            id: row.get(0),
41            name: row.get(1),
42            status: serde_json::from_str(&row.get::<String, _>(2))?,
43            data: serde_json::from_str(&row.get::<String, _>(3))?,
44            created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(4))?.into(),
45            updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(5))?.into(),
46        })
47    }
48
49    async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
50        sqlx::query(
51            r#"
52            UPDATE workflows
53            SET name = ?, status = ?, data = ?, updated_at = ?
54            WHERE id = ?
55        "#,
56        )
57        .bind(&workflow.name)
58        .bind(serde_json::to_string(&workflow.status)?)
59        .bind(serde_json::to_string(&workflow.data)?)
60        .bind(workflow.updated_at.to_rfc3339())
61        .bind(&workflow.id)
62        .execute(&self.pool)
63        .await?;
64        Ok(())
65    }
66
67    async fn create_task(&self, task: &TaskState) -> WorkflowResult<()> {
68        let completed_at_str = task.completed_at.map(|dt| dt.to_rfc3339());
69
70        sqlx::query(r#"
71            INSERT INTO tasks (id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at)
72            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
73        "#)
74        .bind(&task.id)
75        .bind(&task.workflow_id)
76        .bind(&task.name)
77        .bind(serde_json::to_string(&task.status)?)
78        .bind(&task.assignee)
79        .bind(serde_json::to_string(&task.data)?)
80        .bind(task.created_at.to_rfc3339())
81        .bind(task.updated_at.to_rfc3339())
82        .bind(completed_at_str)
83        .execute(&self.pool)
84        .await?;
85        Ok(())
86    }
87
88    async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState> {
89        let row = sqlx::query(r#"
90            SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
91            FROM tasks
92            WHERE id = ?
93        "#)
94        .bind(task_id)
95        .fetch_one(&self.pool)
96        .await?;
97
98        let completed_at = row
99            .get::<Option<String>, _>(8)
100            .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
101            .map(|dt| dt.into());
102
103        Ok(TaskState {
104            id: row.get(0),
105            workflow_id: row.get(1),
106            name: row.get(2),
107            status: serde_json::from_str(&row.get::<String, _>(3))?,
108            assignee: row.get(4),
109            data: serde_json::from_str(&row.get::<String, _>(5))?,
110            created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(6))?.into(),
111            updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(7))?.into(),
112            completed_at,
113        })
114    }
115
116    async fn update_task(&self, task: &TaskState) -> WorkflowResult<()> {
117        let completed_at_str = task.completed_at.map(|dt| dt.to_rfc3339());
118
119        sqlx::query(
120            r#"
121            UPDATE tasks
122            SET name = ?, status = ?, assignee = ?, data = ?, updated_at = ?, completed_at = ?
123            WHERE id = ? AND workflow_id = ?
124        "#,
125        )
126        .bind(&task.name)
127        .bind(serde_json::to_string(&task.status)?)
128        .bind(&task.assignee)
129        .bind(serde_json::to_string(&task.data)?)
130        .bind(task.updated_at.to_rfc3339())
131        .bind(completed_at_str)
132        .bind(&task.id)
133        .bind(&task.workflow_id)
134        .execute(&self.pool)
135        .await?;
136        Ok(())
137    }
138
139    async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
140        let rows = sqlx::query(r#"
141            SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
142            FROM tasks
143            WHERE workflow_id = ?
144        "#)
145        .bind(workflow_id)
146        .fetch_all(&self.pool)
147        .await?;
148
149        let mut tasks = Vec::with_capacity(rows.len());
150        for row in rows {
151            let completed_at = row
152                .get::<Option<String>, _>(8)
153                .map(|s| chrono::DateTime::parse_from_rfc3339(&s))
154                .transpose()?
155                .map(Into::into);
156
157            tasks.push(TaskState {
158                id: row.get(0),
159                workflow_id: row.get(1),
160                name: row.get(2),
161                status: serde_json::from_str(&row.get::<String, _>(3))?,
162                assignee: row.get(4),
163                data: serde_json::from_str(&row.get::<String, _>(5))?,
164                created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(6))?.into(),
165                updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(7))?.into(),
166                completed_at,
167            });
168        }
169
170        Ok(tasks)
171    }
172}