use serde_json::json;
use systemprompt_agent::models::a2a::TaskState;
use systemprompt_identifiers::{TaskId, UserId};
use systemprompt_models::ExecutionStep;
use systemprompt_runtime::AppContext;
use super::types::{AgUiWebhookData, WebhookRequest};
use super::validation::validate_json_serializable;
use systemprompt_agent::repository::content::ArtifactRepository;
use systemprompt_agent::repository::context::ContextRepository;
use systemprompt_agent::repository::execution::ExecutionStepRepository;
use systemprompt_agent::repository::task::TaskRepository;
pub async fn load_event_data(
app_context: &AppContext,
request: &WebhookRequest,
) -> Result<AgUiWebhookData, anyhow::Error> {
let db = app_context.db_pool();
match request.event_type.as_str() {
"task_completed" => load_task_completed(db, request).await,
"artifact_created" => load_artifact_created(db, request).await,
"message_received" => load_message_received(db, request).await,
"context_updated" => load_context_updated(db, request).await,
"execution_step" => load_execution_step(request),
"task_created" => load_task_created(request),
_ => Err(anyhow::anyhow!(
"Unknown event type: {}",
request.event_type
)),
}
}
async fn load_task_completed(
db: &systemprompt_database::DbPool,
request: &WebhookRequest,
) -> Result<AgUiWebhookData, anyhow::Error> {
let task_repo = TaskRepository::new(db)?;
let artifact_repo = ArtifactRepository::new(db)?;
let step_repo = ExecutionStepRepository::new(db)?;
let task_id = TaskId::new(&request.entity_id);
let timestamp = chrono::Utc::now();
task_repo
.update_task_state(&task_id, TaskState::Completed, ×tamp)
.await
.map_err(|e| anyhow::anyhow!("Failed to complete task: {}", e))?;
let mut task = task_repo
.get_task(&task_id)
.await
.map_err(|e| anyhow::anyhow!("Failed to load task: {}", e))?
.ok_or_else(|| anyhow::anyhow!("Task not found: {}", request.entity_id))?;
let artifacts = artifact_repo
.get_artifacts_by_task(&task_id)
.await
.unwrap_or_else(|e| {
tracing::warn!(error = %e, task_id = %task_id, "Failed to load artifacts for webhook event");
vec![]
});
let messages = task_repo
.get_messages_by_task(&task_id)
.await
.unwrap_or_else(|e| {
tracing::warn!(error = %e, task_id = %task_id, "Failed to load messages for webhook event");
vec![]
});
let execution_steps = step_repo.list_by_task(&task_id).await.unwrap_or_else(|e| {
tracing::warn!(error = %e, task_id = %task_id, "Failed to load execution steps for webhook event");
vec![]
});
task.history = if messages.is_empty() {
None
} else {
Some(messages)
};
if !execution_steps.is_empty() {
if let Some(ref mut metadata) = task.metadata {
metadata.execution_steps = Some(execution_steps.clone());
}
}
let payload = json!({
"task": task,
"artifacts": if artifacts.is_empty() { None } else { Some(&artifacts) },
"executionSteps": if execution_steps.is_empty() { None } else { Some(&execution_steps) },
});
validate_json_serializable(&payload)
.map_err(|e| anyhow::anyhow!("JSON validation failed: {}", e))?;
Ok(AgUiWebhookData {
event_name: "task_completed".to_string(),
payload,
})
}
async fn load_artifact_created(
db: &systemprompt_database::DbPool,
request: &WebhookRequest,
) -> Result<AgUiWebhookData, anyhow::Error> {
let artifact_repo = ArtifactRepository::new(db)?;
let artifact_id = systemprompt_identifiers::ArtifactId::new(&request.entity_id);
let artifact = artifact_repo
.get_artifact_by_id(&artifact_id)
.await
.map_err(|e| anyhow::anyhow!("Failed to load artifact: {}", e))?
.ok_or_else(|| anyhow::anyhow!("Artifact not found: {}", request.entity_id))?;
Ok(AgUiWebhookData {
event_name: "artifact".to_string(),
payload: json!({
"artifact": artifact,
"taskId": artifact.metadata.task_id,
"contextId": request.context_id,
}),
})
}
async fn load_message_received(
db: &systemprompt_database::DbPool,
request: &WebhookRequest,
) -> Result<AgUiWebhookData, anyhow::Error> {
let pool = db
.pool_arc()
.map_err(|e| anyhow::anyhow!("Database error: {}", e))?;
let message = sqlx::query!(
r#"SELECT m.id, m.message_id, STRING_AGG(mp.id::text, ',') as part_ids
FROM task_messages m
LEFT JOIN message_parts mp ON m.message_id = mp.message_id
WHERE m.message_id = $1
GROUP BY m.id, m.message_id"#,
request.entity_id
)
.fetch_optional(pool.as_ref())
.await
.map_err(|e| anyhow::anyhow!("Failed to load message: {}", e))?;
if message.is_some() {
Ok(AgUiWebhookData {
event_name: "message_received".to_string(),
payload: json!({
"messageId": request.entity_id,
}),
})
} else {
Err(anyhow::anyhow!("Message not found: {}", request.entity_id))
}
}
async fn load_context_updated(
db: &systemprompt_database::DbPool,
request: &WebhookRequest,
) -> Result<AgUiWebhookData, anyhow::Error> {
let context_repo = ContextRepository::new(db)?;
let context_id = request.context_id.clone();
let user_id = UserId::new(request.user_id.clone());
let context = context_repo
.get_context(&context_id, &user_id)
.await
.map_err(|e| anyhow::anyhow!("Failed to load context: {}", e))?;
Ok(AgUiWebhookData {
event_name: "context_updated".to_string(),
payload: json!({
"contextId": request.context_id,
"context": context,
}),
})
}
fn load_execution_step(request: &WebhookRequest) -> Result<AgUiWebhookData, anyhow::Error> {
let step_data = request
.step_data
.as_ref()
.ok_or_else(|| anyhow::anyhow!("step_data required for execution_step events"))?;
let step: ExecutionStep = serde_json::from_value(step_data.clone())
.map_err(|e| anyhow::anyhow!("Invalid step_data format: {}", e))?;
let event_name = match step.status {
systemprompt_models::StepStatus::Completed => "step_finished",
_ => "step_started",
};
Ok(AgUiWebhookData {
event_name: event_name.to_string(),
payload: json!({
"stepName": step.step_type().to_string(),
"taskId": step.task_id,
"step": step,
}),
})
}
#[derive(serde::Deserialize)]
struct TaskCreatedData {
task: systemprompt_agent::models::a2a::Task,
}
fn load_task_created(request: &WebhookRequest) -> Result<AgUiWebhookData, anyhow::Error> {
let task_data = request.task_data.as_ref().ok_or_else(|| {
anyhow::anyhow!(
"task_created event MUST include task_data. Task ID: {}",
request.entity_id
)
})?;
let payload: TaskCreatedData = serde_json::from_value(task_data.clone()).map_err(|e| {
anyhow::anyhow!(
"Failed to deserialize task_created data for task {}: {}",
request.entity_id,
e
)
})?;
if payload.task.history.as_ref().is_none_or(Vec::is_empty) {
return Err(anyhow::anyhow!(
"task_created payload has empty history - user message is missing! Task ID: {}",
request.entity_id
));
}
Ok(AgUiWebhookData {
event_name: "run_started".to_string(),
payload: json!({
"task": payload.task,
"threadId": request.context_id,
"runId": request.entity_id,
}),
})
}