use chrono::{DateTime, Utc};
use systemprompt_identifiers::TaskId;
use systemprompt_models::{ExecutionStep, PlannedTool, StepContent, StepId, StepStatus};
use systemprompt_traits::RepositoryError;
use super::ExecutionStepRepository;
use super::parse::{ParseStepParams, parse_step};
impl ExecutionStepRepository {
pub async fn create(&self, step: &ExecutionStep) -> Result<(), RepositoryError> {
let step_id_str = step.step_id.as_str();
let task_id = &step.task_id;
let status_str = step.status.to_string();
let step_type_str = step.content.step_type().to_string();
let title = step.content.title();
let content_json = serde_json::to_value(&step.content).map_err(|e| {
RepositoryError::Internal(format!("Failed to serialize step content: {e}"))
})?;
sqlx::query!(
r#"INSERT INTO task_execution_steps (
step_id, task_id, step_type, title, status, content, started_at, completed_at, duration_ms, error_message
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"#,
step_id_str,
task_id.as_str(),
step_type_str,
title,
status_str,
content_json,
step.started_at,
step.completed_at,
step.duration_ms,
step.error_message
)
.execute(&*self.write_pool)
.await
.map_err(|e| RepositoryError::Internal(format!("Failed to create execution step: {e}")))?;
Ok(())
}
pub async fn complete_step(
&self,
step_id: &StepId,
started_at: DateTime<Utc>,
tool_result: Option<serde_json::Value>,
) -> Result<(), RepositoryError> {
let completed_at = Utc::now();
let duration_ms = (completed_at - started_at).num_milliseconds() as i32;
let step_id_str = step_id.as_str();
let status_str = StepStatus::Completed.to_string();
if let Some(result) = tool_result {
sqlx::query!(
r#"UPDATE task_execution_steps SET
status = $2,
completed_at = $3,
duration_ms = $4,
content = jsonb_set(content, '{tool_result}', $5)
WHERE step_id = $1"#,
step_id_str,
status_str,
completed_at,
duration_ms,
result
)
.execute(&*self.write_pool)
.await
.map_err(|e| {
RepositoryError::Internal(format!(
"Failed to complete execution step: {step_id}: {e}"
))
})?;
} else {
sqlx::query!(
r#"UPDATE task_execution_steps SET
status = $2,
completed_at = $3,
duration_ms = $4
WHERE step_id = $1"#,
step_id_str,
status_str,
completed_at,
duration_ms
)
.execute(&*self.write_pool)
.await
.map_err(|e| {
RepositoryError::Internal(format!(
"Failed to complete execution step: {step_id}: {e}"
))
})?;
}
Ok(())
}
pub async fn fail_step(
&self,
step_id: &StepId,
started_at: DateTime<Utc>,
error_message: &str,
) -> Result<(), RepositoryError> {
let completed_at = Utc::now();
let duration_ms = (completed_at - started_at).num_milliseconds() as i32;
let step_id_str = step_id.as_str();
let status_str = StepStatus::Failed.to_string();
sqlx::query!(
r#"UPDATE task_execution_steps SET
status = $2,
completed_at = $3,
duration_ms = $4,
error_message = $5
WHERE step_id = $1"#,
step_id_str,
status_str,
completed_at,
duration_ms,
error_message
)
.execute(&*self.write_pool)
.await
.map_err(|e| {
RepositoryError::Internal(format!("Failed to fail execution step: {step_id}: {e}"))
})?;
Ok(())
}
pub async fn fail_in_progress_steps_for_task(
&self,
task_id: &TaskId,
error_message: &str,
) -> Result<u64, RepositoryError> {
let completed_at = Utc::now();
let in_progress_str = StepStatus::InProgress.to_string();
let failed_str = StepStatus::Failed.to_string();
let task_id_str = task_id.as_str();
let result = sqlx::query!(
r#"UPDATE task_execution_steps SET
status = $3,
completed_at = $4,
error_message = $5
WHERE task_id = $1 AND status = $2"#,
task_id_str,
in_progress_str,
failed_str,
completed_at,
error_message
)
.execute(&*self.write_pool)
.await
.map_err(|e| {
RepositoryError::Internal(format!(
"Failed to fail in-progress steps for task: {task_id}: {e}"
))
})?;
Ok(result.rows_affected())
}
pub async fn complete_planning_step(
&self,
step_id: &StepId,
started_at: DateTime<Utc>,
reasoning: Option<String>,
planned_tools: Option<Vec<PlannedTool>>,
) -> Result<ExecutionStep, RepositoryError> {
let completed_at = Utc::now();
let duration_ms = (completed_at - started_at).num_milliseconds() as i32;
let step_id_str = step_id.as_str();
let status_str = StepStatus::Completed.to_string();
let content = StepContent::planning(reasoning, planned_tools);
let content_json = serde_json::to_value(&content).map_err(|e| {
RepositoryError::Internal(format!("Failed to serialize planning content: {e}"))
})?;
let row = sqlx::query!(
r#"UPDATE task_execution_steps SET
status = $2,
completed_at = $3,
duration_ms = $4,
content = $5
WHERE step_id = $1
RETURNING step_id, task_id as "task_id!: TaskId", status, content,
started_at as "started_at!", completed_at, duration_ms, error_message"#,
step_id_str,
status_str,
completed_at,
duration_ms,
content_json
)
.fetch_one(&*self.write_pool)
.await
.map_err(|e| {
RepositoryError::Internal(format!("Failed to complete planning step: {step_id}: {e}"))
})?;
parse_step(ParseStepParams {
step_id: row.step_id,
task_id: row.task_id,
status: row.status,
content: row.content,
started_at: row.started_at,
completed_at: row.completed_at,
duration_ms: row.duration_ms,
error_message: row.error_message,
})
}
}