1use crate::core::{TaskState, WorkflowResult, WorkflowState};
2use sqlx::{MySql, Pool, Postgres, Sqlite};
3
4#[async_trait::async_trait]
6pub trait WorkflowDatabase: Send + Sync {
7 async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()>;
8 async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState>;
9 async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()>;
10 async fn create_task(&self, task: &TaskState) -> WorkflowResult<()>;
11 async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState>;
12 async fn update_task(&self, task: &TaskState) -> WorkflowResult<()>;
13 async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>>;
14}
15
16pub struct PostgresDatabase {
18 pool: Pool<Postgres>,
19}
20
21impl PostgresDatabase {
22 pub fn new(pool: Pool<Postgres>) -> Self {
23 Self { pool }
24 }
25
26 pub fn pool(&self) -> &Pool<Postgres> {
27 &self.pool
28 }
29}
30
31pub struct MySqlDatabase {
33 pool: Pool<MySql>,
34}
35
36impl MySqlDatabase {
37 pub fn new(pool: Pool<MySql>) -> Self {
38 Self { pool }
39 }
40
41 pub fn pool(&self) -> &Pool<MySql> {
42 &self.pool
43 }
44}
45
46pub struct SqliteDatabase {
48 pool: Pool<Sqlite>,
49}
50
51impl SqliteDatabase {
52 pub fn new(pool: Pool<Sqlite>) -> Self {
53 Self { pool }
54 }
55
56 pub fn pool(&self) -> &Pool<Sqlite> {
57 &self.pool
58 }
59}
60
61pub mod memory;
62pub mod mysql;
63pub mod postgres;
64pub mod sqlite;
65
66pub use memory::MemoryDatabase;
67
68pub async fn migrate_postgres(pool: &Pool<Postgres>) -> WorkflowResult<()> {
70 sqlx::query(
71 r#"
72 CREATE TABLE IF NOT EXISTS workflows (
73 id TEXT PRIMARY KEY,
74 name TEXT NOT NULL,
75 status TEXT NOT NULL,
76 data JSONB NOT NULL,
77 created_at TIMESTAMP NOT NULL,
78 updated_at TIMESTAMP NOT NULL
79 )
80 "#,
81 )
82 .execute(pool)
83 .await?;
84
85 sqlx::query(
86 r#"
87 CREATE TABLE IF NOT EXISTS tasks (
88 id TEXT PRIMARY KEY,
89 workflow_id TEXT NOT NULL REFERENCES workflows(id),
90 name TEXT NOT NULL,
91 status TEXT NOT NULL,
92 assignee TEXT,
93 data JSONB NOT NULL,
94 created_at TIMESTAMP NOT NULL,
95 updated_at TIMESTAMP NOT NULL,
96 completed_at TIMESTAMP
97 )
98 "#,
99 )
100 .execute(pool)
101 .await?;
102
103 Ok(())
104}
105
106pub async fn migrate_mysql(pool: &Pool<MySql>) -> WorkflowResult<()> {
107 sqlx::query(
108 r#"
109 CREATE TABLE IF NOT EXISTS workflows (
110 id VARCHAR(255) PRIMARY KEY,
111 name VARCHAR(255) NOT NULL,
112 status VARCHAR(50) NOT NULL,
113 data JSON NOT NULL,
114 created_at DATETIME NOT NULL,
115 updated_at DATETIME NOT NULL
116 )
117 "#,
118 )
119 .execute(pool)
120 .await?;
121
122 sqlx::query(
123 r#"
124 CREATE TABLE IF NOT EXISTS tasks (
125 id VARCHAR(255) PRIMARY KEY,
126 workflow_id VARCHAR(255) NOT NULL,
127 name VARCHAR(255) NOT NULL,
128 status VARCHAR(50) NOT NULL,
129 assignee VARCHAR(255),
130 data JSON NOT NULL,
131 created_at DATETIME NOT NULL,
132 updated_at DATETIME NOT NULL,
133 completed_at DATETIME,
134 FOREIGN KEY (workflow_id) REFERENCES workflows(id)
135 )
136 "#,
137 )
138 .execute(pool)
139 .await?;
140
141 Ok(())
142}
143
144pub async fn migrate_sqlite(pool: &Pool<Sqlite>) -> WorkflowResult<()> {
145 sqlx::query(
146 r#"
147 CREATE TABLE IF NOT EXISTS workflows (
148 id TEXT PRIMARY KEY,
149 name TEXT NOT NULL,
150 status TEXT NOT NULL,
151 data TEXT NOT NULL,
152 created_at TEXT NOT NULL,
153 updated_at TEXT NOT NULL
154 )
155 "#,
156 )
157 .execute(pool)
158 .await?;
159
160 sqlx::query(
161 r#"
162 CREATE TABLE IF NOT EXISTS tasks (
163 id TEXT PRIMARY KEY,
164 workflow_id TEXT NOT NULL,
165 name TEXT NOT NULL,
166 status TEXT NOT NULL,
167 assignee TEXT,
168 data TEXT NOT NULL,
169 created_at TEXT NOT NULL,
170 updated_at TEXT NOT NULL,
171 completed_at TEXT,
172 FOREIGN KEY (workflow_id) REFERENCES workflows(id)
173 )
174 "#,
175 )
176 .execute(pool)
177 .await?;
178
179 Ok(())
180}