use crate::core::{TaskState, WorkflowResult, WorkflowState};
use sqlx::{MySql, Pool, Postgres, Sqlite};
/// Storage interface for workflow and task state.
///
/// Every method is async (via `async_trait`) and returns a `WorkflowResult`.
/// Implementors must be `Send + Sync`. The exact persistence semantics
/// (duplicate ids, missing rows, and so on) are defined by each implementor and
/// are not visible from this trait.
#[async_trait::async_trait]
pub trait WorkflowDatabase: Send + Sync {
/// Creates a record for `workflow`; yields `()` on success.
async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()>;
/// Looks up the workflow identified by `workflow_id` and returns its state.
async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState>;
/// Updates the stored record for `workflow`; yields `()` on success.
// NOTE(review): presumably the row is matched by the workflow's id — confirm in implementors.
async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()>;
/// Creates a record for `task`; yields `()` on success.
async fn create_task(&self, task: &TaskState) -> WorkflowResult<()>;
/// Looks up the task identified by `task_id` and returns its state.
async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState>;
/// Updates the stored record for `task`; yields `()` on success.
async fn update_task(&self, task: &TaskState) -> WorkflowResult<()>;
/// Returns the tasks associated with `workflow_id` as a `Vec`.
// NOTE(review): result ordering is not specified by this signature — confirm in implementors.
async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>>;
}
/// Thin wrapper that owns a `sqlx` connection pool for PostgreSQL.
pub struct PostgresDatabase {
    /// Pool supplied by the caller at construction time.
    pool: Pool<Postgres>,
}

impl PostgresDatabase {
    /// Builds a wrapper around an already-constructed pool.
    ///
    /// The pool is stored as-is; nothing is executed against it here.
    pub fn new(pool: Pool<Postgres>) -> Self {
        PostgresDatabase { pool }
    }

    /// Returns a shared reference to the wrapped pool.
    pub fn pool(&self) -> &Pool<Postgres> {
        &self.pool
    }
}
/// Thin wrapper that owns a `sqlx` connection pool for MySQL.
pub struct MySqlDatabase {
    /// Pool supplied by the caller at construction time.
    pool: Pool<MySql>,
}

impl MySqlDatabase {
    /// Builds a wrapper around an already-constructed pool.
    ///
    /// The pool is stored as-is; nothing is executed against it here.
    pub fn new(pool: Pool<MySql>) -> Self {
        MySqlDatabase { pool }
    }

    /// Returns a shared reference to the wrapped pool.
    pub fn pool(&self) -> &Pool<MySql> {
        &self.pool
    }
}
/// Thin wrapper that owns a `sqlx` connection pool for SQLite.
pub struct SqliteDatabase {
    /// Pool supplied by the caller at construction time.
    pool: Pool<Sqlite>,
}

impl SqliteDatabase {
    /// Builds a wrapper around an already-constructed pool.
    ///
    /// The pool is stored as-is; nothing is executed against it here.
    pub fn new(pool: Pool<Sqlite>) -> Self {
        SqliteDatabase { pool }
    }

    /// Returns a shared reference to the wrapped pool.
    pub fn pool(&self) -> &Pool<Sqlite> {
        &self.pool
    }
}
pub mod memory;
pub mod mysql;
pub mod postgres;
pub mod sqlite;
pub use memory::MemoryDatabase;
/// Ensures the `workflows` and `tasks` tables exist in a PostgreSQL database.
///
/// Both statements use `CREATE TABLE IF NOT EXISTS`, so running this against a
/// database that already has the tables leaves them untouched.
///
/// # Errors
///
/// Returns the first error produced while executing either statement; the
/// second statement is not attempted if the first fails.
pub async fn migrate_postgres(pool: &Pool<Postgres>) -> WorkflowResult<()> {
    // The SQL text is kept flush-left so the literal stays byte-for-byte stable.
    const CREATE_WORKFLOWS: &str = r#"
CREATE TABLE IF NOT EXISTS workflows (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
status TEXT NOT NULL,
data JSONB NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
)
"#;
    const CREATE_TASKS: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
name TEXT NOT NULL,
status TEXT NOT NULL,
assignee TEXT,
data JSONB NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP
)
"#;

    // `workflows` goes first: `tasks.workflow_id` references `workflows(id)`.
    for ddl in [CREATE_WORKFLOWS, CREATE_TASKS] {
        sqlx::query(ddl).execute(pool).await?;
    }

    Ok(())
}
/// Ensures the `workflows` and `tasks` tables exist in a MySQL database.
///
/// Both statements use `CREATE TABLE IF NOT EXISTS`, so running this against a
/// database that already has the tables leaves them untouched.
///
/// # Errors
///
/// Returns the first error produced while executing either statement; the
/// second statement is not attempted if the first fails.
pub async fn migrate_mysql(pool: &Pool<MySql>) -> WorkflowResult<()> {
    // The SQL text is kept flush-left so the literal stays byte-for-byte stable.
    const CREATE_WORKFLOWS: &str = r#"
CREATE TABLE IF NOT EXISTS workflows (
id VARCHAR(255) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
data JSON NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
)
"#;
    const CREATE_TASKS: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
id VARCHAR(255) PRIMARY KEY,
workflow_id VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
assignee VARCHAR(255),
data JSON NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
completed_at DATETIME,
FOREIGN KEY (workflow_id) REFERENCES workflows(id)
)
"#;

    // `workflows` goes first: the `tasks` foreign key references `workflows(id)`.
    for ddl in [CREATE_WORKFLOWS, CREATE_TASKS] {
        sqlx::query(ddl).execute(pool).await?;
    }

    Ok(())
}
/// Ensures the `workflows` and `tasks` tables exist in a SQLite database.
///
/// Both statements use `CREATE TABLE IF NOT EXISTS`, so running this against a
/// database that already has the tables leaves them untouched. Every column,
/// including the JSON payload and the timestamps, is declared as `TEXT`.
///
/// # Errors
///
/// Returns the first error produced while executing either statement; the
/// second statement is not attempted if the first fails.
pub async fn migrate_sqlite(pool: &Pool<Sqlite>) -> WorkflowResult<()> {
    // The SQL text is kept flush-left so the literal stays byte-for-byte stable.
    const CREATE_WORKFLOWS: &str = r#"
CREATE TABLE IF NOT EXISTS workflows (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
status TEXT NOT NULL,
data TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"#;
    const CREATE_TASKS: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
name TEXT NOT NULL,
status TEXT NOT NULL,
assignee TEXT,
data TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
completed_at TEXT,
FOREIGN KEY (workflow_id) REFERENCES workflows(id)
)
"#;

    // `workflows` goes first: the `tasks` foreign key references `workflows(id)`.
    for ddl in [CREATE_WORKFLOWS, CREATE_TASKS] {
        sqlx::query(ddl).execute(pool).await?;
    }

    Ok(())
}