use crate::core::{TaskState, WorkflowResult, WorkflowState};
use crate::db::WorkflowDatabase;
use serde_json;
use sqlx::Row;
#[async_trait::async_trait]
impl WorkflowDatabase for super::PostgresDatabase {
async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
sqlx::query(
r#"
INSERT INTO workflows (id, name, status, data, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6)
"#,
)
.bind(&workflow.id)
.bind(&workflow.name)
.bind(serde_json::to_string(&workflow.status)?)
.bind(&workflow.data)
.bind(&workflow.created_at)
.bind(&workflow.updated_at)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
let row = sqlx::query(
r#"
SELECT id, name, status, data, created_at, updated_at
FROM workflows
WHERE id = $1
"#,
)
.bind(workflow_id)
.fetch_one(&self.pool)
.await?;
Ok(WorkflowState {
id: row.get(0),
name: row.get(1),
status: serde_json::from_str(&row.get::<String, _>(2))?,
data: row.get(3),
created_at: row.get(4),
updated_at: row.get(5),
})
}
async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
sqlx::query(
r#"
UPDATE workflows
SET name = $2, status = $3, data = $4, updated_at = $5
WHERE id = $1
"#,
)
.bind(&workflow.id)
.bind(&workflow.name)
.bind(serde_json::to_string(&workflow.status)?)
.bind(&workflow.data)
.bind(&workflow.updated_at)
.execute(&self.pool)
.await?;
Ok(())
}
async fn create_task(&self, task: &TaskState) -> WorkflowResult<()> {
sqlx::query(r#"
INSERT INTO tasks (id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"#)
.bind(&task.id)
.bind(&task.workflow_id)
.bind(&task.name)
.bind(serde_json::to_string(&task.status)?)
.bind(&task.assignee)
.bind(&task.data)
.bind(&task.created_at)
.bind(&task.updated_at)
.bind(&task.completed_at)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState> {
let row = sqlx::query(r#"
SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
FROM tasks
WHERE id = $1
"#)
.bind(task_id)
.fetch_one(&self.pool)
.await?;
Ok(TaskState {
id: row.get(0),
workflow_id: row.get(1),
name: row.get(2),
status: serde_json::from_str(&row.get::<String, _>(3))?,
assignee: row.get(4),
data: row.get(5),
created_at: row.get(6),
updated_at: row.get(7),
completed_at: row.get(8),
})
}
async fn update_task(&self, task: &TaskState) -> WorkflowResult<()> {
sqlx::query(
r#"
UPDATE tasks
SET name = $3, status = $4, assignee = $5, data = $6, updated_at = $7, completed_at = $8
WHERE id = $1 AND workflow_id = $2
"#,
)
.bind(&task.id)
.bind(&task.workflow_id)
.bind(&task.name)
.bind(serde_json::to_string(&task.status)?)
.bind(&task.assignee)
.bind(&task.data)
.bind(&task.updated_at)
.bind(&task.completed_at)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
let rows = sqlx::query(r#"
SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
FROM tasks
WHERE workflow_id = $1
"#)
.bind(workflow_id)
.fetch_all(&self.pool)
.await?;
let mut tasks = Vec::with_capacity(rows.len());
for row in rows {
tasks.push(TaskState {
id: row.get(0),
workflow_id: row.get(1),
name: row.get(2),
status: serde_json::from_str(&row.get::<String, _>(3))?,
assignee: row.get(4),
data: row.get(5),
created_at: row.get(6),
updated_at: row.get(7),
completed_at: row.get(8),
});
}
Ok(tasks)
}
}