use crate::core::{TaskState, WorkflowResult, WorkflowState};
use crate::db::WorkflowDatabase;
use chrono;
use serde_json;
use sqlx::Row;
#[async_trait::async_trait]
impl WorkflowDatabase for super::SqliteDatabase {
async fn create_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
sqlx::query(
r#"
INSERT INTO workflows (id, name, status, data, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
"#,
)
.bind(&workflow.id)
.bind(&workflow.name)
.bind(serde_json::to_string(&workflow.status)?)
.bind(serde_json::to_string(&workflow.data)?)
.bind(workflow.created_at.to_rfc3339())
.bind(workflow.updated_at.to_rfc3339())
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
let row = sqlx::query(
r#"
SELECT id, name, status, data, created_at, updated_at
FROM workflows
WHERE id = ?
"#,
)
.bind(workflow_id)
.fetch_one(&self.pool)
.await?;
Ok(WorkflowState {
id: row.get(0),
name: row.get(1),
status: serde_json::from_str(&row.get::<String, _>(2))?,
data: serde_json::from_str(&row.get::<String, _>(3))?,
created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(4))?.into(),
updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(5))?.into(),
})
}
async fn update_workflow(&self, workflow: &WorkflowState) -> WorkflowResult<()> {
sqlx::query(
r#"
UPDATE workflows
SET name = ?, status = ?, data = ?, updated_at = ?
WHERE id = ?
"#,
)
.bind(&workflow.name)
.bind(serde_json::to_string(&workflow.status)?)
.bind(serde_json::to_string(&workflow.data)?)
.bind(workflow.updated_at.to_rfc3339())
.bind(&workflow.id)
.execute(&self.pool)
.await?;
Ok(())
}
async fn create_task(&self, task: &TaskState) -> WorkflowResult<()> {
let completed_at_str = task.completed_at.map(|dt| dt.to_rfc3339());
sqlx::query(r#"
INSERT INTO tasks (id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"#)
.bind(&task.id)
.bind(&task.workflow_id)
.bind(&task.name)
.bind(serde_json::to_string(&task.status)?)
.bind(&task.assignee)
.bind(serde_json::to_string(&task.data)?)
.bind(task.created_at.to_rfc3339())
.bind(task.updated_at.to_rfc3339())
.bind(completed_at_str)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_task(&self, task_id: &str) -> WorkflowResult<TaskState> {
let row = sqlx::query(r#"
SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
FROM tasks
WHERE id = ?
"#)
.bind(task_id)
.fetch_one(&self.pool)
.await?;
let completed_at = row
.get::<Option<String>, _>(8)
.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
.map(|dt| dt.into());
Ok(TaskState {
id: row.get(0),
workflow_id: row.get(1),
name: row.get(2),
status: serde_json::from_str(&row.get::<String, _>(3))?,
assignee: row.get(4),
data: serde_json::from_str(&row.get::<String, _>(5))?,
created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(6))?.into(),
updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(7))?.into(),
completed_at,
})
}
async fn update_task(&self, task: &TaskState) -> WorkflowResult<()> {
let completed_at_str = task.completed_at.map(|dt| dt.to_rfc3339());
sqlx::query(
r#"
UPDATE tasks
SET name = ?, status = ?, assignee = ?, data = ?, updated_at = ?, completed_at = ?
WHERE id = ? AND workflow_id = ?
"#,
)
.bind(&task.name)
.bind(serde_json::to_string(&task.status)?)
.bind(&task.assignee)
.bind(serde_json::to_string(&task.data)?)
.bind(task.updated_at.to_rfc3339())
.bind(completed_at_str)
.bind(&task.id)
.bind(&task.workflow_id)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_tasks_by_workflow(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
let rows = sqlx::query(r#"
SELECT id, workflow_id, name, status, assignee, data, created_at, updated_at, completed_at
FROM tasks
WHERE workflow_id = ?
"#)
.bind(workflow_id)
.fetch_all(&self.pool)
.await?;
let mut tasks = Vec::with_capacity(rows.len());
for row in rows {
let completed_at = row
.get::<Option<String>, _>(8)
.map(|s| chrono::DateTime::parse_from_rfc3339(&s))
.transpose()?
.map(Into::into);
tasks.push(TaskState {
id: row.get(0),
workflow_id: row.get(1),
name: row.get(2),
status: serde_json::from_str(&row.get::<String, _>(3))?,
assignee: row.get(4),
data: serde_json::from_str(&row.get::<String, _>(5))?,
created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(6))?.into(),
updated_at: chrono::DateTime::parse_from_rfc3339(&row.get::<String, _>(7))?.into(),
completed_at,
});
}
Ok(tasks)
}
}