Skip to main content

// systemprompt_agent/services/a2a_server/streaming/webhook_client.rs

1use reqwest::Client;
2use serde::Serialize;
3use systemprompt_models::{A2AEvent, AgUiEvent, Config};
4
5#[derive(Debug, thiserror::Error)]
6pub enum WebhookError {
7    #[error("HTTP request failed: {0}")]
8    Request(#[from] reqwest::Error),
9    #[error("Webhook returned error status {status}: {message}")]
10    StatusError { status: u16, message: String },
11}
12
13fn get_api_url() -> String {
14    Config::get()
15        .map(|c| c.api_internal_url.clone())
16        .unwrap_or_else(|_| "http://localhost:3000".to_string())
17}
18
19pub async fn broadcast_agui_event(
20    user_id: &str,
21    event: AgUiEvent,
22    auth_token: &str,
23) -> Result<usize, WebhookError> {
24    let url = format!("{}/api/v1/webhook/agui", get_api_url());
25    let event_type = event.event_type();
26
27    if auth_token.is_empty() {
28        tracing::warn!(
29            event_type = ?event_type,
30            user_id = %user_id,
31            "Attempting to broadcast AGUI event with empty auth_token - webhook will fail"
32        );
33    }
34
35    tracing::debug!(event_type = ?event_type, url = %url, has_token = !auth_token.is_empty(), "Sending AGUI event");
36
37    #[derive(Serialize)]
38    struct AgUiWebhookPayload {
39        #[serde(flatten)]
40        event: AgUiEvent,
41        user_id: String,
42    }
43
44    let payload = AgUiWebhookPayload {
45        event,
46        user_id: user_id.to_string(),
47    };
48
49    let client = Client::new();
50    let response = client
51        .post(&url)
52        .header("Authorization", format!("Bearer {}", auth_token))
53        .header("Content-Type", "application/json")
54        .json(&payload)
55        .send()
56        .await;
57
58    match response {
59        Ok(resp) => {
60            if resp.status().is_success() {
61                #[derive(serde::Deserialize)]
62                struct WebhookResponse {
63                    connection_count: usize,
64                }
65
66                match resp.json::<WebhookResponse>().await {
67                    Ok(result) => {
68                        tracing::debug!(
69                            event_type = ?event_type,
70                            connection_count = result.connection_count,
71                            "AGUI event broadcasted"
72                        );
73                        Ok(result.connection_count)
74                    },
75                    Err(e) => {
76                        tracing::error!(
77                            event_type = ?event_type,
78                            error = %e,
79                            "AGUI response parse error"
80                        );
81                        Err(WebhookError::Request(e))
82                    },
83                }
84            } else {
85                let status = resp.status().as_u16();
86                let message = resp
87                    .text()
88                    .await
89                    .unwrap_or_else(|e| format!("<error reading response: {}>", e));
90                tracing::error!(
91                    event_type = ?event_type,
92                    status = status,
93                    message = %message,
94                    "AGUI event failed"
95                );
96                Err(WebhookError::StatusError { status, message })
97            }
98        },
99        Err(e) => {
100            tracing::error!(event_type = ?event_type, error = %e, "AGUI request error");
101            Err(WebhookError::Request(e))
102        },
103    }
104}
105
106pub async fn broadcast_a2a_event(
107    user_id: &str,
108    event: A2AEvent,
109    auth_token: &str,
110) -> Result<usize, WebhookError> {
111    let url = format!("{}/api/v1/webhook/a2a", get_api_url());
112    let event_type = event.event_type();
113
114    tracing::debug!(event_type = ?event_type, url = %url, "Sending A2A event");
115
116    #[derive(Serialize)]
117    struct A2AWebhookPayload {
118        #[serde(flatten)]
119        event: A2AEvent,
120        user_id: String,
121    }
122
123    let payload = A2AWebhookPayload {
124        event,
125        user_id: user_id.to_string(),
126    };
127
128    let client = Client::new();
129    let response = client
130        .post(&url)
131        .header("Authorization", format!("Bearer {}", auth_token))
132        .header("Content-Type", "application/json")
133        .json(&payload)
134        .send()
135        .await;
136
137    match response {
138        Ok(resp) => {
139            if resp.status().is_success() {
140                #[derive(serde::Deserialize)]
141                struct WebhookResponse {
142                    connection_count: usize,
143                }
144
145                match resp.json::<WebhookResponse>().await {
146                    Ok(result) => {
147                        tracing::debug!(
148                            event_type = ?event_type,
149                            connection_count = result.connection_count,
150                            "A2A event broadcasted"
151                        );
152                        Ok(result.connection_count)
153                    },
154                    Err(e) => {
155                        tracing::error!(
156                            event_type = ?event_type,
157                            error = %e,
158                            "A2A response parse error"
159                        );
160                        Err(WebhookError::Request(e))
161                    },
162                }
163            } else {
164                let status = resp.status().as_u16();
165                let message = resp
166                    .text()
167                    .await
168                    .unwrap_or_else(|e| format!("<error reading response: {}>", e));
169                tracing::error!(
170                    event_type = ?event_type,
171                    status = status,
172                    message = %message,
173                    "A2A event failed"
174                );
175                Err(WebhookError::StatusError { status, message })
176            }
177        },
178        Err(e) => {
179            tracing::error!(event_type = ?event_type, error = %e, "A2A request error");
180            Err(WebhookError::Request(e))
181        },
182    }
183}
184
/// User identity plus the bearer token used when broadcasting events for that user.
///
/// `Debug` is implemented by hand rather than derived so that formatting a
/// context with `{:?}` (for example in a log line) never prints the token.
#[derive(Clone)]
pub struct WebhookContext {
    /// Identifier of the user the events are broadcast for.
    user_id: String,
    /// Bearer token sent with each webhook request; treated as a secret.
    auth_token: String,
}

impl std::fmt::Debug for WebhookContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only reveal whether a token is present, never its value.
        let token_state = if self.auth_token.is_empty() {
            "<empty>"
        } else {
            "<redacted>"
        };
        f.debug_struct("WebhookContext")
            .field("user_id", &self.user_id)
            .field("auth_token", &token_state)
            .finish()
    }
}
190
191impl WebhookContext {
192    pub fn new(user_id: impl Into<String>, auth_token: impl Into<String>) -> Self {
193        Self {
194            user_id: user_id.into(),
195            auth_token: auth_token.into(),
196        }
197    }
198
199    pub fn user_id(&self) -> &str {
200        &self.user_id
201    }
202
203    pub async fn broadcast_agui(&self, event: AgUiEvent) -> Result<usize, WebhookError> {
204        broadcast_agui_event(&self.user_id, event, &self.auth_token).await
205    }
206
207    pub async fn broadcast_a2a(&self, event: A2AEvent) -> Result<usize, WebhookError> {
208        broadcast_a2a_event(&self.user_id, event, &self.auth_token).await
209    }
210}