use serde_json::json;
use systemprompt_identifiers::{ContextId, TaskId, UserId};
use systemprompt_models::{Config, TaskMetadata};
use crate::models::a2a::{Message, Task, TaskState, TaskStatus};
#[derive(Debug)]
pub struct BroadcastTaskCreatedParams<'a> {
pub task_id: &'a TaskId,
pub context_id: &'a ContextId,
pub user_id: &'a str,
pub user_message: &'a Message,
pub agent_name: &'a str,
pub token: &'a str,
}
pub async fn broadcast_task_created(params: BroadcastTaskCreatedParams<'_>) {
let BroadcastTaskCreatedParams {
task_id,
context_id,
user_id,
user_message,
agent_name,
token,
} = params;
let event_task = build_event_task(task_id, context_id, user_message, agent_name);
let api_url = match Config::get() {
Ok(c) => c.api_internal_url.clone(),
Err(e) => {
tracing::warn!(error = %e, "Cannot broadcast task_created: config unavailable");
return;
},
};
let webhook_url = format!("{}/api/v1/webhook/broadcast", api_url);
let payload = json!({
"event_type": "task_created",
"entity_id": task_id.as_str(),
"context_id": context_id.as_str(),
"user_id": user_id,
"task_data": json!({ "task": event_task })
});
let client = reqwest::Client::new();
match client
.post(&webhook_url)
.header("Authorization", format!("Bearer {token}"))
.header("Content-Type", "application/json")
.json(&payload)
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
tracing::info!(task_id = %task_id, "Broadcast task_created via webhook");
} else {
tracing::warn!(
task_id = %task_id,
status = %response.status(),
"Webhook broadcast failed"
);
}
},
Err(e) => {
tracing::warn!(task_id = %task_id, error = %e, "Webhook broadcast error");
},
}
}
pub async fn broadcast_task_completed(task: &Task, user_id: &UserId, token: &str) {
let api_url = match Config::get() {
Ok(c) => c.api_internal_url.clone(),
Err(e) => {
tracing::warn!(error = %e, "Cannot broadcast task_completed: config unavailable");
return;
},
};
let webhook_url = format!("{}/api/v1/webhook/broadcast", api_url);
let task_data = match serde_json::to_value(task) {
Ok(v) => v,
Err(e) => {
tracing::warn!(error = %e, task_id = %task.id, "Failed to serialize task for broadcast");
serde_json::json!(null)
},
};
let payload = json!({
"event_type": "task_completed",
"entity_id": task.id.as_str(),
"context_id": task.context_id.as_str(),
"user_id": user_id.as_str(),
"task_data": task_data
});
let client = reqwest::Client::new();
match client
.post(&webhook_url)
.header("Authorization", format!("Bearer {token}"))
.header("Content-Type", "application/json")
.json(&payload)
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
tracing::info!(task_id = %task.id, "Broadcast task_completed");
} else {
tracing::warn!(
task_id = %task.id,
status = %response.status(),
"Webhook failed"
);
}
},
Err(e) => {
tracing::warn!(task_id = %task.id, error = %e, "Webhook error");
},
}
}
fn build_event_task(
task_id: &TaskId,
context_id: &ContextId,
user_message: &Message,
agent_name: &str,
) -> Task {
Task {
id: task_id.clone(),
context_id: context_id.clone(),
status: TaskStatus {
state: TaskState::Submitted,
message: None,
timestamp: Some(chrono::Utc::now()),
},
history: Some(vec![user_message.clone()]),
artifacts: None,
metadata: Some(TaskMetadata::new_agent_message(agent_name.to_string())),
created_at: Some(chrono::Utc::now()),
last_modified: Some(chrono::Utc::now()),
}
}
pub async fn broadcast_artifact_created(
artifact: &crate::models::a2a::Artifact,
task_id: &TaskId,
context_id: &ContextId,
user_id: &UserId,
token: &str,
) -> Result<(), anyhow::Error> {
let api_url = Config::get()
.map_err(|e| anyhow::anyhow!("Config unavailable for artifact broadcast: {}", e))?
.api_internal_url
.clone();
let webhook_url = format!("{}/api/v1/webhook/broadcast", api_url);
let payload = json!({
"event_type": "artifact_created",
"entity_id": artifact.id.clone(),
"context_id": context_id.as_str(),
"user_id": user_id.as_str(),
});
let client = reqwest::Client::new();
let response = client
.post(&webhook_url)
.header("Authorization", format!("Bearer {token}"))
.header("Content-Type", "application/json")
.json(&payload)
.send()
.await
.map_err(|e| anyhow::anyhow!("Webhook request failed: {}", e))?;
if response.status().is_success() {
tracing::info!(
artifact_id = %artifact.id,
task_id = %task_id,
"Broadcast artifact_created via webhook"
);
Ok(())
} else {
Err(anyhow::anyhow!(
"Webhook broadcast failed: status={}, artifact_id={}",
response.status(),
artifact.id
))
}
}