Skip to main content

rust_viewflow/db/
mod.rs

1use crate::core::{TaskState, WorkflowResult, WorkflowState};
2use sqlx::{MySql, Pool, Postgres, Sqlite};
3
4/// Database backend trait
5#[async_trait::async_trait]
6pub trait WorkflowDatabase: Send + Sync {
7    async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()>;
8    async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState>;
9    async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()>;
10    async fn create_task(&self, task: &TaskState) -> WorkflowResult<()>;
11    async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState>;
12    async fn update_task(&self, task: &TaskState) -> WorkflowResult<()>;
13    async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>>;
14}
15
16/// PostgreSQL implementation
17pub struct PostgresDatabase {
18    pool: Pool<Postgres>,
19}
20
21impl PostgresDatabase {
22    pub fn new(pool: Pool<Postgres>) -> Self {
23        Self { pool }
24    }
25
26    pub fn pool(&self) -> &Pool<Postgres> {
27        &self.pool
28    }
29}
30
31/// MySQL implementation
32pub struct MySqlDatabase {
33    pool: Pool<MySql>,
34}
35
36impl MySqlDatabase {
37    pub fn new(pool: Pool<MySql>) -> Self {
38        Self { pool }
39    }
40
41    pub fn pool(&self) -> &Pool<MySql> {
42        &self.pool
43    }
44}
45
46/// SQLite implementation
47pub struct SqliteDatabase {
48    pool: Pool<Sqlite>,
49}
50
51impl SqliteDatabase {
52    pub fn new(pool: Pool<Sqlite>) -> Self {
53        Self { pool }
54    }
55
56    pub fn pool(&self) -> &Pool<Sqlite> {
57        &self.pool
58    }
59}
60
61pub mod memory;
62pub mod mysql;
63pub mod postgres;
64pub mod sqlite;
65
66pub use memory::MemoryDatabase;
67
68/// Database migration functions
69pub async fn migrate_postgres(pool: &Pool<Postgres>) -> WorkflowResult<()> {
70    sqlx::query(
71        r#"
72        CREATE TABLE IF NOT EXISTS workflows (
73            id TEXT PRIMARY KEY,
74            name TEXT NOT NULL,
75            status TEXT NOT NULL,
76            data JSONB NOT NULL,
77            created_at TIMESTAMP NOT NULL,
78            updated_at TIMESTAMP NOT NULL
79        )
80    "#,
81    )
82    .execute(pool)
83    .await?;
84
85    sqlx::query(
86        r#"
87        CREATE TABLE IF NOT EXISTS tasks (
88            id TEXT PRIMARY KEY,
89            workflow_id TEXT NOT NULL REFERENCES workflows(id),
90            name TEXT NOT NULL,
91            status TEXT NOT NULL,
92            assignee TEXT,
93            data JSONB NOT NULL,
94            created_at TIMESTAMP NOT NULL,
95            updated_at TIMESTAMP NOT NULL,
96            completed_at TIMESTAMP
97        )
98    "#,
99    )
100    .execute(pool)
101    .await?;
102
103    Ok(())
104}
105
106pub async fn migrate_mysql(pool: &Pool<MySql>) -> WorkflowResult<()> {
107    sqlx::query(
108        r#"
109        CREATE TABLE IF NOT EXISTS workflows (
110            id VARCHAR(255) PRIMARY KEY,
111            name VARCHAR(255) NOT NULL,
112            status VARCHAR(50) NOT NULL,
113            data JSON NOT NULL,
114            created_at DATETIME NOT NULL,
115            updated_at DATETIME NOT NULL
116        )
117    "#,
118    )
119    .execute(pool)
120    .await?;
121
122    sqlx::query(
123        r#"
124        CREATE TABLE IF NOT EXISTS tasks (
125            id VARCHAR(255) PRIMARY KEY,
126            workflow_id VARCHAR(255) NOT NULL,
127            name VARCHAR(255) NOT NULL,
128            status VARCHAR(50) NOT NULL,
129            assignee VARCHAR(255),
130            data JSON NOT NULL,
131            created_at DATETIME NOT NULL,
132            updated_at DATETIME NOT NULL,
133            completed_at DATETIME,
134            FOREIGN KEY (workflow_id) REFERENCES workflows(id)
135        )
136    "#,
137    )
138    .execute(pool)
139    .await?;
140
141    Ok(())
142}
143
144pub async fn migrate_sqlite(pool: &Pool<Sqlite>) -> WorkflowResult<()> {
145    sqlx::query(
146        r#"
147        CREATE TABLE IF NOT EXISTS workflows (
148            id TEXT PRIMARY KEY,
149            name TEXT NOT NULL,
150            status TEXT NOT NULL,
151            data TEXT NOT NULL,
152            created_at TEXT NOT NULL,
153            updated_at TEXT NOT NULL
154        )
155    "#,
156    )
157    .execute(pool)
158    .await?;
159
160    sqlx::query(
161        r#"
162        CREATE TABLE IF NOT EXISTS tasks (
163            id TEXT PRIMARY KEY,
164            workflow_id TEXT NOT NULL,
165            name TEXT NOT NULL,
166            status TEXT NOT NULL,
167            assignee TEXT,
168            data TEXT NOT NULL,
169            created_at TEXT NOT NULL,
170            updated_at TEXT NOT NULL,
171            completed_at TEXT,
172            FOREIGN KEY (workflow_id) REFERENCES workflows(id)
173        )
174    "#,
175    )
176    .execute(pool)
177    .await?;
178
179    Ok(())
180}