Skip to main content

systemprompt_agent/services/a2a_server/streaming/
webhook_client.rs

1use reqwest::Client;
2use serde::Serialize;
3use systemprompt_models::{A2AEvent, AgUiEvent, Config};
4
/// Errors returned when delivering an event to a webhook endpoint.
#[derive(Debug, thiserror::Error)]
pub enum WebhookError {
    /// A `reqwest` failure. In this file it is produced both when sending the
    /// request fails and when a successful response body cannot be decoded.
    #[error("HTTP request failed: {0}")]
    Request(#[from] reqwest::Error),
    /// The endpoint answered with a non-success HTTP status. `message` carries
    /// the response body text (or a placeholder if the body could not be read).
    #[error("Webhook returned error status {status}: {message}")]
    StatusError { status: u16, message: String },
}
12
/// JSON body posted to the AG-UI webhook endpoint.
#[derive(Serialize)]
struct AgUiWebhookPayload {
    // `flatten` inlines the event's own fields into this object, so they are
    // serialized as siblings of `user_id` rather than under an `event` key.
    #[serde(flatten)]
    event: AgUiEvent,
    // User the event is sent on behalf of.
    user_id: String,
}
19
/// JSON body posted to the A2A webhook endpoint.
#[derive(Serialize)]
struct A2AWebhookPayload {
    // `flatten` inlines the event's own fields into this object, so they are
    // serialized as siblings of `user_id` rather than under an `event` key.
    #[serde(flatten)]
    event: A2AEvent,
    // User the event is sent on behalf of.
    user_id: String,
}
26
27fn get_api_url() -> String {
28    Config::get().map_or_else(
29        |_| "http://localhost:3000".to_string(),
30        |c| c.api_internal_url.clone(),
31    )
32}
33
34pub async fn broadcast_agui_event(
35    user_id: &str,
36    event: AgUiEvent,
37    auth_token: &str,
38) -> Result<usize, WebhookError> {
39    let url = format!("{}/api/v1/webhook/agui", get_api_url());
40    let event_type = event.event_type();
41
42    if auth_token.is_empty() {
43        tracing::warn!(
44            event_type = ?event_type,
45            user_id = %user_id,
46            "Attempting to broadcast AGUI event with empty auth_token - webhook will fail"
47        );
48    }
49
50    tracing::debug!(event_type = ?event_type, url = %url, has_token = !auth_token.is_empty(), "Sending AGUI event");
51
52    let payload = AgUiWebhookPayload {
53        event,
54        user_id: user_id.to_string(),
55    };
56
57    let client = Client::new();
58    let response = client
59        .post(&url)
60        .header("Authorization", format!("Bearer {}", auth_token))
61        .header("Content-Type", "application/json")
62        .json(&payload)
63        .send()
64        .await;
65
66    match response {
67        Ok(resp) => {
68            if resp.status().is_success() {
69                #[derive(serde::Deserialize)]
70                struct WebhookResponse {
71                    connection_count: usize,
72                }
73
74                match resp.json::<WebhookResponse>().await {
75                    Ok(result) => {
76                        tracing::debug!(
77                            event_type = ?event_type,
78                            connection_count = result.connection_count,
79                            "AGUI event broadcasted"
80                        );
81                        Ok(result.connection_count)
82                    },
83                    Err(e) => {
84                        tracing::error!(
85                            event_type = ?event_type,
86                            error = %e,
87                            "AGUI response parse error"
88                        );
89                        Err(WebhookError::Request(e))
90                    },
91                }
92            } else {
93                let status = resp.status().as_u16();
94                let message = resp
95                    .text()
96                    .await
97                    .unwrap_or_else(|e| format!("<error reading response: {}>", e));
98                tracing::error!(
99                    event_type = ?event_type,
100                    status = status,
101                    message = %message,
102                    "AGUI event failed"
103                );
104                Err(WebhookError::StatusError { status, message })
105            }
106        },
107        Err(e) => {
108            tracing::error!(event_type = ?event_type, error = %e, "AGUI request error");
109            Err(WebhookError::Request(e))
110        },
111    }
112}
113
114pub async fn broadcast_a2a_event(
115    user_id: &str,
116    event: A2AEvent,
117    auth_token: &str,
118) -> Result<usize, WebhookError> {
119    let url = format!("{}/api/v1/webhook/a2a", get_api_url());
120    let event_type = event.event_type();
121
122    tracing::debug!(event_type = ?event_type, url = %url, "Sending A2A event");
123
124    let payload = A2AWebhookPayload {
125        event,
126        user_id: user_id.to_string(),
127    };
128
129    let client = Client::new();
130    let response = client
131        .post(&url)
132        .header("Authorization", format!("Bearer {}", auth_token))
133        .header("Content-Type", "application/json")
134        .json(&payload)
135        .send()
136        .await;
137
138    match response {
139        Ok(resp) => {
140            if resp.status().is_success() {
141                #[derive(serde::Deserialize)]
142                struct WebhookResponse {
143                    connection_count: usize,
144                }
145
146                match resp.json::<WebhookResponse>().await {
147                    Ok(result) => {
148                        tracing::debug!(
149                            event_type = ?event_type,
150                            connection_count = result.connection_count,
151                            "A2A event broadcasted"
152                        );
153                        Ok(result.connection_count)
154                    },
155                    Err(e) => {
156                        tracing::error!(
157                            event_type = ?event_type,
158                            error = %e,
159                            "A2A response parse error"
160                        );
161                        Err(WebhookError::Request(e))
162                    },
163                }
164            } else {
165                let status = resp.status().as_u16();
166                let message = resp
167                    .text()
168                    .await
169                    .unwrap_or_else(|e| format!("<error reading response: {}>", e));
170                tracing::error!(
171                    event_type = ?event_type,
172                    status = status,
173                    message = %message,
174                    "A2A event failed"
175                );
176                Err(WebhookError::StatusError { status, message })
177            }
178        },
179        Err(e) => {
180            tracing::error!(event_type = ?event_type, error = %e, "A2A request error");
181            Err(WebhookError::Request(e))
182        },
183    }
184}
185
/// Pairs a user id with the bearer token used when broadcasting webhook
/// events for that user.
#[derive(Clone)]
pub struct WebhookContext {
    user_id: String,
    auth_token: String,
}

// `Debug` is written by hand so the bearer token never ends up in logs or
// panic messages; a derived impl would print it verbatim.
impl std::fmt::Debug for WebhookContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("WebhookContext")
            .field("user_id", &self.user_id)
            .field("auth_token", &format_args!("<redacted>"))
            .finish()
    }
}
191
192impl WebhookContext {
193    pub fn new(user_id: impl Into<String>, auth_token: impl Into<String>) -> Self {
194        Self {
195            user_id: user_id.into(),
196            auth_token: auth_token.into(),
197        }
198    }
199
200    pub fn user_id(&self) -> &str {
201        &self.user_id
202    }
203
204    pub async fn broadcast_agui(&self, event: AgUiEvent) -> Result<usize, WebhookError> {
205        broadcast_agui_event(&self.user_id, event, &self.auth_token).await
206    }
207
208    pub async fn broadcast_a2a(&self, event: A2AEvent) -> Result<usize, WebhookError> {
209        broadcast_a2a_event(&self.user_id, event, &self.auth_token).await
210    }
211}