systemprompt_agent/services/a2a_server/streaming/
webhook_client.rs1use reqwest::Client;
2use serde::Serialize;
3use systemprompt_identifiers::UserId;
4use systemprompt_models::{A2AEvent, AgUiEvent, Config};
5
6#[derive(Debug, thiserror::Error)]
7pub enum WebhookError {
8 #[error("HTTP request failed: {0}")]
9 Request(#[from] reqwest::Error),
10 #[error("Webhook returned error status {status}: {message}")]
11 StatusError { status: u16, message: String },
12}
13
14#[derive(Serialize)]
15struct AgUiWebhookPayload {
16 #[serde(flatten)]
17 event: AgUiEvent,
18 user_id: UserId,
19}
20
21#[derive(Serialize)]
22struct A2AWebhookPayload {
23 #[serde(flatten)]
24 event: A2AEvent,
25 user_id: UserId,
26}
27
28fn get_api_url() -> String {
29 Config::get().map_or_else(
30 |_| "http://localhost:3000".to_string(),
31 |c| c.api_internal_url.clone(),
32 )
33}
34
35pub async fn broadcast_agui_event(
36 user_id: &UserId,
37 event: AgUiEvent,
38 auth_token: &str,
39) -> Result<usize, WebhookError> {
40 let url = format!("{}/api/v1/webhook/agui", get_api_url());
41 let event_type = event.event_type();
42
43 if auth_token.is_empty() {
44 tracing::warn!(
45 event_type = ?event_type,
46 user_id = %user_id,
47 "Attempting to broadcast AGUI event with empty auth_token - webhook will fail"
48 );
49 }
50
51 tracing::debug!(event_type = ?event_type, url = %url, has_token = !auth_token.is_empty(), "Sending AGUI event");
52
53 let payload = AgUiWebhookPayload {
54 event,
55 user_id: user_id.clone(),
56 };
57
58 let client = Client::new();
59 let response = client
60 .post(&url)
61 .header("Authorization", format!("Bearer {}", auth_token))
62 .header("Content-Type", "application/json")
63 .json(&payload)
64 .send()
65 .await;
66
67 match response {
68 Ok(resp) => {
69 if resp.status().is_success() {
70 #[derive(serde::Deserialize)]
71 struct WebhookResponse {
72 connection_count: usize,
73 }
74
75 match resp.json::<WebhookResponse>().await {
76 Ok(result) => {
77 tracing::debug!(
78 event_type = ?event_type,
79 connection_count = result.connection_count,
80 "AGUI event broadcasted"
81 );
82 Ok(result.connection_count)
83 },
84 Err(e) => {
85 tracing::error!(
86 event_type = ?event_type,
87 error = %e,
88 "AGUI response parse error"
89 );
90 Err(WebhookError::Request(e))
91 },
92 }
93 } else {
94 let status = resp.status().as_u16();
95 let message = resp
96 .text()
97 .await
98 .unwrap_or_else(|e| format!("<error reading response: {}>", e));
99 tracing::error!(
100 event_type = ?event_type,
101 status = status,
102 message = %message,
103 "AGUI event failed"
104 );
105 Err(WebhookError::StatusError { status, message })
106 }
107 },
108 Err(e) => {
109 tracing::error!(event_type = ?event_type, error = %e, "AGUI request error");
110 Err(WebhookError::Request(e))
111 },
112 }
113}
114
115pub async fn broadcast_a2a_event(
116 user_id: &UserId,
117 event: A2AEvent,
118 auth_token: &str,
119) -> Result<usize, WebhookError> {
120 let url = format!("{}/api/v1/webhook/a2a", get_api_url());
121 let event_type = event.event_type();
122
123 tracing::debug!(event_type = ?event_type, url = %url, "Sending A2A event");
124
125 let payload = A2AWebhookPayload {
126 event,
127 user_id: user_id.clone(),
128 };
129
130 let client = Client::new();
131 let response = client
132 .post(&url)
133 .header("Authorization", format!("Bearer {}", auth_token))
134 .header("Content-Type", "application/json")
135 .json(&payload)
136 .send()
137 .await;
138
139 match response {
140 Ok(resp) => {
141 if resp.status().is_success() {
142 #[derive(serde::Deserialize)]
143 struct WebhookResponse {
144 connection_count: usize,
145 }
146
147 match resp.json::<WebhookResponse>().await {
148 Ok(result) => {
149 tracing::debug!(
150 event_type = ?event_type,
151 connection_count = result.connection_count,
152 "A2A event broadcasted"
153 );
154 Ok(result.connection_count)
155 },
156 Err(e) => {
157 tracing::error!(
158 event_type = ?event_type,
159 error = %e,
160 "A2A response parse error"
161 );
162 Err(WebhookError::Request(e))
163 },
164 }
165 } else {
166 let status = resp.status().as_u16();
167 let message = resp
168 .text()
169 .await
170 .unwrap_or_else(|e| format!("<error reading response: {}>", e));
171 tracing::error!(
172 event_type = ?event_type,
173 status = status,
174 message = %message,
175 "A2A event failed"
176 );
177 Err(WebhookError::StatusError { status, message })
178 }
179 },
180 Err(e) => {
181 tracing::error!(event_type = ?event_type, error = %e, "A2A request error");
182 Err(WebhookError::Request(e))
183 },
184 }
185}
186
187#[derive(Clone, Debug)]
188pub struct WebhookContext {
189 user_id: UserId,
190 auth_token: String,
191}
192
193impl WebhookContext {
194 pub fn new(user_id: UserId, auth_token: impl Into<String>) -> Self {
195 Self {
196 user_id,
197 auth_token: auth_token.into(),
198 }
199 }
200
201 pub const fn user_id(&self) -> &UserId {
202 &self.user_id
203 }
204
205 pub async fn broadcast_agui(&self, event: AgUiEvent) -> Result<usize, WebhookError> {
206 broadcast_agui_event(&self.user_id, event, &self.auth_token).await
207 }
208
209 pub async fn broadcast_a2a(&self, event: A2AEvent) -> Result<usize, WebhookError> {
210 broadcast_a2a_event(&self.user_id, event, &self.auth_token).await
211 }
212}