systemprompt_agent/services/a2a_server/streaming/
webhook_client.rs1use reqwest::Client;
2use serde::Serialize;
3use systemprompt_models::{A2AEvent, AgUiEvent, Config};
4
5#[derive(Debug, thiserror::Error)]
6pub enum WebhookError {
7 #[error("HTTP request failed: {0}")]
8 Request(#[from] reqwest::Error),
9 #[error("Webhook returned error status {status}: {message}")]
10 StatusError { status: u16, message: String },
11}
12
13#[derive(Serialize)]
14struct AgUiWebhookPayload {
15 #[serde(flatten)]
16 event: AgUiEvent,
17 user_id: String,
18}
19
20#[derive(Serialize)]
21struct A2AWebhookPayload {
22 #[serde(flatten)]
23 event: A2AEvent,
24 user_id: String,
25}
26
27fn get_api_url() -> String {
28 Config::get().map_or_else(
29 |_| "http://localhost:3000".to_string(),
30 |c| c.api_internal_url.clone(),
31 )
32}
33
34pub async fn broadcast_agui_event(
35 user_id: &str,
36 event: AgUiEvent,
37 auth_token: &str,
38) -> Result<usize, WebhookError> {
39 let url = format!("{}/api/v1/webhook/agui", get_api_url());
40 let event_type = event.event_type();
41
42 if auth_token.is_empty() {
43 tracing::warn!(
44 event_type = ?event_type,
45 user_id = %user_id,
46 "Attempting to broadcast AGUI event with empty auth_token - webhook will fail"
47 );
48 }
49
50 tracing::debug!(event_type = ?event_type, url = %url, has_token = !auth_token.is_empty(), "Sending AGUI event");
51
52 let payload = AgUiWebhookPayload {
53 event,
54 user_id: user_id.to_string(),
55 };
56
57 let client = Client::new();
58 let response = client
59 .post(&url)
60 .header("Authorization", format!("Bearer {}", auth_token))
61 .header("Content-Type", "application/json")
62 .json(&payload)
63 .send()
64 .await;
65
66 match response {
67 Ok(resp) => {
68 if resp.status().is_success() {
69 #[derive(serde::Deserialize)]
70 struct WebhookResponse {
71 connection_count: usize,
72 }
73
74 match resp.json::<WebhookResponse>().await {
75 Ok(result) => {
76 tracing::debug!(
77 event_type = ?event_type,
78 connection_count = result.connection_count,
79 "AGUI event broadcasted"
80 );
81 Ok(result.connection_count)
82 },
83 Err(e) => {
84 tracing::error!(
85 event_type = ?event_type,
86 error = %e,
87 "AGUI response parse error"
88 );
89 Err(WebhookError::Request(e))
90 },
91 }
92 } else {
93 let status = resp.status().as_u16();
94 let message = resp
95 .text()
96 .await
97 .unwrap_or_else(|e| format!("<error reading response: {}>", e));
98 tracing::error!(
99 event_type = ?event_type,
100 status = status,
101 message = %message,
102 "AGUI event failed"
103 );
104 Err(WebhookError::StatusError { status, message })
105 }
106 },
107 Err(e) => {
108 tracing::error!(event_type = ?event_type, error = %e, "AGUI request error");
109 Err(WebhookError::Request(e))
110 },
111 }
112}
113
114pub async fn broadcast_a2a_event(
115 user_id: &str,
116 event: A2AEvent,
117 auth_token: &str,
118) -> Result<usize, WebhookError> {
119 let url = format!("{}/api/v1/webhook/a2a", get_api_url());
120 let event_type = event.event_type();
121
122 tracing::debug!(event_type = ?event_type, url = %url, "Sending A2A event");
123
124 let payload = A2AWebhookPayload {
125 event,
126 user_id: user_id.to_string(),
127 };
128
129 let client = Client::new();
130 let response = client
131 .post(&url)
132 .header("Authorization", format!("Bearer {}", auth_token))
133 .header("Content-Type", "application/json")
134 .json(&payload)
135 .send()
136 .await;
137
138 match response {
139 Ok(resp) => {
140 if resp.status().is_success() {
141 #[derive(serde::Deserialize)]
142 struct WebhookResponse {
143 connection_count: usize,
144 }
145
146 match resp.json::<WebhookResponse>().await {
147 Ok(result) => {
148 tracing::debug!(
149 event_type = ?event_type,
150 connection_count = result.connection_count,
151 "A2A event broadcasted"
152 );
153 Ok(result.connection_count)
154 },
155 Err(e) => {
156 tracing::error!(
157 event_type = ?event_type,
158 error = %e,
159 "A2A response parse error"
160 );
161 Err(WebhookError::Request(e))
162 },
163 }
164 } else {
165 let status = resp.status().as_u16();
166 let message = resp
167 .text()
168 .await
169 .unwrap_or_else(|e| format!("<error reading response: {}>", e));
170 tracing::error!(
171 event_type = ?event_type,
172 status = status,
173 message = %message,
174 "A2A event failed"
175 );
176 Err(WebhookError::StatusError { status, message })
177 }
178 },
179 Err(e) => {
180 tracing::error!(event_type = ?event_type, error = %e, "A2A request error");
181 Err(WebhookError::Request(e))
182 },
183 }
184}
185
186#[derive(Clone, Debug)]
187pub struct WebhookContext {
188 user_id: String,
189 auth_token: String,
190}
191
192impl WebhookContext {
193 pub fn new(user_id: impl Into<String>, auth_token: impl Into<String>) -> Self {
194 Self {
195 user_id: user_id.into(),
196 auth_token: auth_token.into(),
197 }
198 }
199
200 pub fn user_id(&self) -> &str {
201 &self.user_id
202 }
203
204 pub async fn broadcast_agui(&self, event: AgUiEvent) -> Result<usize, WebhookError> {
205 broadcast_agui_event(&self.user_id, event, &self.auth_token).await
206 }
207
208 pub async fn broadcast_a2a(&self, event: A2AEvent) -> Result<usize, WebhookError> {
209 broadcast_a2a_event(&self.user_id, event, &self.auth_token).await
210 }
211}