Skip to main content

systemprompt_agent/services/a2a_server/streaming/
webhook_client.rs

1use reqwest::Client;
2use serde::Serialize;
3use systemprompt_identifiers::UserId;
4use systemprompt_models::{A2AEvent, AgUiEvent, Config};
5
6#[derive(Debug, thiserror::Error)]
7pub enum WebhookError {
8    #[error("HTTP request failed: {0}")]
9    Request(#[from] reqwest::Error),
10    #[error("Webhook returned error status {status}: {message}")]
11    StatusError { status: u16, message: String },
12}
13
14#[derive(Serialize)]
15struct AgUiWebhookPayload {
16    #[serde(flatten)]
17    event: AgUiEvent,
18    user_id: UserId,
19}
20
21#[derive(Serialize)]
22struct A2AWebhookPayload {
23    #[serde(flatten)]
24    event: A2AEvent,
25    user_id: UserId,
26}
27
28fn get_api_url() -> String {
29    Config::get().map_or_else(
30        |_| "http://localhost:3000".to_string(),
31        |c| c.api_internal_url.clone(),
32    )
33}
34
35pub async fn broadcast_agui_event(
36    user_id: &UserId,
37    event: AgUiEvent,
38    auth_token: &str,
39) -> Result<usize, WebhookError> {
40    let url = format!("{}/api/v1/webhook/agui", get_api_url());
41    let event_type = event.event_type();
42
43    if auth_token.is_empty() {
44        tracing::warn!(
45            event_type = ?event_type,
46            user_id = %user_id,
47            "Attempting to broadcast AGUI event with empty auth_token - webhook will fail"
48        );
49    }
50
51    tracing::debug!(event_type = ?event_type, url = %url, has_token = !auth_token.is_empty(), "Sending AGUI event");
52
53    let payload = AgUiWebhookPayload {
54        event,
55        user_id: user_id.clone(),
56    };
57
58    let client = Client::new();
59    let response = client
60        .post(&url)
61        .header("Authorization", format!("Bearer {}", auth_token))
62        .header("Content-Type", "application/json")
63        .json(&payload)
64        .send()
65        .await;
66
67    match response {
68        Ok(resp) => {
69            if resp.status().is_success() {
70                #[derive(serde::Deserialize)]
71                struct WebhookResponse {
72                    connection_count: usize,
73                }
74
75                match resp.json::<WebhookResponse>().await {
76                    Ok(result) => {
77                        tracing::debug!(
78                            event_type = ?event_type,
79                            connection_count = result.connection_count,
80                            "AGUI event broadcasted"
81                        );
82                        Ok(result.connection_count)
83                    },
84                    Err(e) => {
85                        tracing::error!(
86                            event_type = ?event_type,
87                            error = %e,
88                            "AGUI response parse error"
89                        );
90                        Err(WebhookError::Request(e))
91                    },
92                }
93            } else {
94                let status = resp.status().as_u16();
95                let message = resp
96                    .text()
97                    .await
98                    .unwrap_or_else(|e| format!("<error reading response: {}>", e));
99                tracing::error!(
100                    event_type = ?event_type,
101                    status = status,
102                    message = %message,
103                    "AGUI event failed"
104                );
105                Err(WebhookError::StatusError { status, message })
106            }
107        },
108        Err(e) => {
109            tracing::error!(event_type = ?event_type, error = %e, "AGUI request error");
110            Err(WebhookError::Request(e))
111        },
112    }
113}
114
115pub async fn broadcast_a2a_event(
116    user_id: &UserId,
117    event: A2AEvent,
118    auth_token: &str,
119) -> Result<usize, WebhookError> {
120    let url = format!("{}/api/v1/webhook/a2a", get_api_url());
121    let event_type = event.event_type();
122
123    tracing::debug!(event_type = ?event_type, url = %url, "Sending A2A event");
124
125    let payload = A2AWebhookPayload {
126        event,
127        user_id: user_id.clone(),
128    };
129
130    let client = Client::new();
131    let response = client
132        .post(&url)
133        .header("Authorization", format!("Bearer {}", auth_token))
134        .header("Content-Type", "application/json")
135        .json(&payload)
136        .send()
137        .await;
138
139    match response {
140        Ok(resp) => {
141            if resp.status().is_success() {
142                #[derive(serde::Deserialize)]
143                struct WebhookResponse {
144                    connection_count: usize,
145                }
146
147                match resp.json::<WebhookResponse>().await {
148                    Ok(result) => {
149                        tracing::debug!(
150                            event_type = ?event_type,
151                            connection_count = result.connection_count,
152                            "A2A event broadcasted"
153                        );
154                        Ok(result.connection_count)
155                    },
156                    Err(e) => {
157                        tracing::error!(
158                            event_type = ?event_type,
159                            error = %e,
160                            "A2A response parse error"
161                        );
162                        Err(WebhookError::Request(e))
163                    },
164                }
165            } else {
166                let status = resp.status().as_u16();
167                let message = resp
168                    .text()
169                    .await
170                    .unwrap_or_else(|e| format!("<error reading response: {}>", e));
171                tracing::error!(
172                    event_type = ?event_type,
173                    status = status,
174                    message = %message,
175                    "A2A event failed"
176                );
177                Err(WebhookError::StatusError { status, message })
178            }
179        },
180        Err(e) => {
181            tracing::error!(event_type = ?event_type, error = %e, "A2A request error");
182            Err(WebhookError::Request(e))
183        },
184    }
185}
186
187#[derive(Clone, Debug)]
188pub struct WebhookContext {
189    user_id: UserId,
190    auth_token: String,
191}
192
193impl WebhookContext {
194    pub fn new(user_id: UserId, auth_token: impl Into<String>) -> Self {
195        Self {
196            user_id,
197            auth_token: auth_token.into(),
198        }
199    }
200
201    pub const fn user_id(&self) -> &UserId {
202        &self.user_id
203    }
204
205    pub async fn broadcast_agui(&self, event: AgUiEvent) -> Result<usize, WebhookError> {
206        broadcast_agui_event(&self.user_id, event, &self.auth_token).await
207    }
208
209    pub async fn broadcast_a2a(&self, event: A2AEvent) -> Result<usize, WebhookError> {
210        broadcast_a2a_event(&self.user_id, event, &self.auth_token).await
211    }
212}