systemprompt_agent/services/a2a_server/streaming/
webhook_client.rs1use reqwest::Client;
2use serde::Serialize;
3use systemprompt_models::{A2AEvent, AgUiEvent, Config};
4
5#[derive(Debug, thiserror::Error)]
6pub enum WebhookError {
7 #[error("HTTP request failed: {0}")]
8 Request(#[from] reqwest::Error),
9 #[error("Webhook returned error status {status}: {message}")]
10 StatusError { status: u16, message: String },
11}
12
13fn get_api_url() -> String {
14 Config::get()
15 .map(|c| c.api_internal_url.clone())
16 .unwrap_or_else(|_| "http://localhost:3000".to_string())
17}
18
19pub async fn broadcast_agui_event(
20 user_id: &str,
21 event: AgUiEvent,
22 auth_token: &str,
23) -> Result<usize, WebhookError> {
24 let url = format!("{}/api/v1/webhook/agui", get_api_url());
25 let event_type = event.event_type();
26
27 if auth_token.is_empty() {
28 tracing::warn!(
29 event_type = ?event_type,
30 user_id = %user_id,
31 "Attempting to broadcast AGUI event with empty auth_token - webhook will fail"
32 );
33 }
34
35 tracing::debug!(event_type = ?event_type, url = %url, has_token = !auth_token.is_empty(), "Sending AGUI event");
36
37 #[derive(Serialize)]
38 struct AgUiWebhookPayload {
39 #[serde(flatten)]
40 event: AgUiEvent,
41 user_id: String,
42 }
43
44 let payload = AgUiWebhookPayload {
45 event,
46 user_id: user_id.to_string(),
47 };
48
49 let client = Client::new();
50 let response = client
51 .post(&url)
52 .header("Authorization", format!("Bearer {}", auth_token))
53 .header("Content-Type", "application/json")
54 .json(&payload)
55 .send()
56 .await;
57
58 match response {
59 Ok(resp) => {
60 if resp.status().is_success() {
61 #[derive(serde::Deserialize)]
62 struct WebhookResponse {
63 connection_count: usize,
64 }
65
66 match resp.json::<WebhookResponse>().await {
67 Ok(result) => {
68 tracing::debug!(
69 event_type = ?event_type,
70 connection_count = result.connection_count,
71 "AGUI event broadcasted"
72 );
73 Ok(result.connection_count)
74 },
75 Err(e) => {
76 tracing::error!(
77 event_type = ?event_type,
78 error = %e,
79 "AGUI response parse error"
80 );
81 Err(WebhookError::Request(e))
82 },
83 }
84 } else {
85 let status = resp.status().as_u16();
86 let message = resp
87 .text()
88 .await
89 .unwrap_or_else(|e| format!("<error reading response: {}>", e));
90 tracing::error!(
91 event_type = ?event_type,
92 status = status,
93 message = %message,
94 "AGUI event failed"
95 );
96 Err(WebhookError::StatusError { status, message })
97 }
98 },
99 Err(e) => {
100 tracing::error!(event_type = ?event_type, error = %e, "AGUI request error");
101 Err(WebhookError::Request(e))
102 },
103 }
104}
105
106pub async fn broadcast_a2a_event(
107 user_id: &str,
108 event: A2AEvent,
109 auth_token: &str,
110) -> Result<usize, WebhookError> {
111 let url = format!("{}/api/v1/webhook/a2a", get_api_url());
112 let event_type = event.event_type();
113
114 tracing::debug!(event_type = ?event_type, url = %url, "Sending A2A event");
115
116 #[derive(Serialize)]
117 struct A2AWebhookPayload {
118 #[serde(flatten)]
119 event: A2AEvent,
120 user_id: String,
121 }
122
123 let payload = A2AWebhookPayload {
124 event,
125 user_id: user_id.to_string(),
126 };
127
128 let client = Client::new();
129 let response = client
130 .post(&url)
131 .header("Authorization", format!("Bearer {}", auth_token))
132 .header("Content-Type", "application/json")
133 .json(&payload)
134 .send()
135 .await;
136
137 match response {
138 Ok(resp) => {
139 if resp.status().is_success() {
140 #[derive(serde::Deserialize)]
141 struct WebhookResponse {
142 connection_count: usize,
143 }
144
145 match resp.json::<WebhookResponse>().await {
146 Ok(result) => {
147 tracing::debug!(
148 event_type = ?event_type,
149 connection_count = result.connection_count,
150 "A2A event broadcasted"
151 );
152 Ok(result.connection_count)
153 },
154 Err(e) => {
155 tracing::error!(
156 event_type = ?event_type,
157 error = %e,
158 "A2A response parse error"
159 );
160 Err(WebhookError::Request(e))
161 },
162 }
163 } else {
164 let status = resp.status().as_u16();
165 let message = resp
166 .text()
167 .await
168 .unwrap_or_else(|e| format!("<error reading response: {}>", e));
169 tracing::error!(
170 event_type = ?event_type,
171 status = status,
172 message = %message,
173 "A2A event failed"
174 );
175 Err(WebhookError::StatusError { status, message })
176 }
177 },
178 Err(e) => {
179 tracing::error!(event_type = ?event_type, error = %e, "A2A request error");
180 Err(WebhookError::Request(e))
181 },
182 }
183}
184
/// User identity plus the bearer token used when broadcasting webhook events.
#[derive(Clone)]
pub struct WebhookContext {
    /// Identifier of the user the events are broadcast for.
    user_id: String,
    /// Bearer token sent with every webhook request; never printed by `Debug`.
    auth_token: String,
}

// Implemented by hand instead of derived so that formatting a context with
// `{:?}` (for example inside a log line) cannot leak the bearer token.
impl std::fmt::Debug for WebhookContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Still distinguish a missing token, since that is a useful diagnostic.
        let token_state = if self.auth_token.is_empty() {
            "<empty>"
        } else {
            "<redacted>"
        };
        f.debug_struct("WebhookContext")
            .field("user_id", &self.user_id)
            .field("auth_token", &token_state)
            .finish()
    }
}
190
191impl WebhookContext {
192 pub fn new(user_id: impl Into<String>, auth_token: impl Into<String>) -> Self {
193 Self {
194 user_id: user_id.into(),
195 auth_token: auth_token.into(),
196 }
197 }
198
199 pub fn user_id(&self) -> &str {
200 &self.user_id
201 }
202
203 pub async fn broadcast_agui(&self, event: AgUiEvent) -> Result<usize, WebhookError> {
204 broadcast_agui_event(&self.user_id, event, &self.auth_token).await
205 }
206
207 pub async fn broadcast_a2a(&self, event: A2AEvent) -> Result<usize, WebhookError> {
208 broadcast_a2a_event(&self.user_id, event, &self.auth_token).await
209 }
210}