Skip to main content

vtcode_core/a2a/
webhook.rs

1//! Webhook delivery for A2A push notifications
2//!
3//! Handles HTTP POST delivery of streaming events to configured webhook URLs
4//! with retry logic, authentication, and error handling.
5
6use super::rpc::{SendStreamingMessageResponse, StreamingEvent, TaskPushNotificationConfig};
7use reqwest::Client;
8use std::time::Duration;
9use tracing::{debug, warn};
10
11/// Webhook notifier for delivering A2A events
12#[derive(Debug, Clone)]
13pub struct WebhookNotifier {
14    client: Client,
15    max_retries: u32,
16    retry_delay_ms: u64,
17}
18
19impl Default for WebhookNotifier {
20    fn default() -> Self {
21        Self::new()
22    }
23}
24
25impl WebhookNotifier {
26    fn build_http_client() -> Client {
27        match Client::builder().timeout(Duration::from_secs(10)).build() {
28            Ok(client) => client,
29            Err(error) => {
30                warn!(error = %error, "Failed to configure webhook HTTP client; using default client");
31                Client::new()
32            }
33        }
34    }
35
36    /// Create a new webhook notifier with default settings
37    pub fn new() -> Self {
38        Self {
39            client: Self::build_http_client(),
40            max_retries: 3,
41            retry_delay_ms: 1000,
42        }
43    }
44
45    /// Create a webhook notifier with custom settings
46    pub fn with_settings(max_retries: u32, retry_delay_ms: u64) -> Self {
47        Self {
48            client: Self::build_http_client(),
49            max_retries,
50            retry_delay_ms,
51        }
52    }
53
54    /// Deliver a streaming event to a webhook URL
55    pub async fn send_event(
56        &self,
57        config: &TaskPushNotificationConfig,
58        event: StreamingEvent,
59    ) -> Result<(), WebhookError> {
60        let response = SendStreamingMessageResponse { event };
61        let json = serde_json::to_string(&response)
62            .map_err(|e| WebhookError::Serialization(e.to_string()))?;
63
64        self.send_with_retry(&config.url, &json, config.authentication.as_deref())
65            .await
66    }
67
68    /// Send webhook with retry logic
69    async fn send_with_retry(
70        &self,
71        url: &str,
72        json: &str,
73        auth: Option<&str>,
74    ) -> Result<(), WebhookError> {
75        let mut last_error = None;
76
77        for attempt in 0..=self.max_retries {
78            if attempt > 0 {
79                let delay = self.retry_delay_ms * 2u64.pow(attempt - 1); // Exponential backoff
80                debug!(
81                    "Retrying webhook delivery after {}ms (attempt {})",
82                    delay, attempt
83                );
84                tokio::time::sleep(Duration::from_millis(delay)).await;
85            }
86
87            match self.send_request(url, json, auth).await {
88                Ok(()) => {
89                    debug!("Webhook delivered successfully to {}", url);
90                    return Ok(());
91                }
92                Err(e) => {
93                    warn!("Webhook delivery attempt {} failed: {}", attempt + 1, e);
94                    last_error = Some(e);
95                }
96            }
97        }
98
99        Err(last_error.unwrap_or(WebhookError::Unknown))
100    }
101
102    /// Send a single HTTP request
103    async fn send_request(
104        &self,
105        url: &str,
106        json: &str,
107        auth: Option<&str>,
108    ) -> Result<(), WebhookError> {
109        let mut request = self
110            .client
111            .post(url)
112            .header("Content-Type", "application/json")
113            .header("User-Agent", "VT Code-A2A/1.0");
114
115        if let Some(auth_header) = auth {
116            request = request.header("Authorization", auth_header);
117        }
118
119        let response = request
120            .body(json.to_string())
121            .send()
122            .await
123            .map_err(|e| WebhookError::Network(e.to_string()))?;
124
125        if response.status().is_success() {
126            Ok(())
127        } else {
128            Err(WebhookError::HttpError(response.status().as_u16()))
129        }
130    }
131}
132
133/// Webhook delivery errors
134#[derive(Debug, Clone, thiserror::Error)]
135pub enum WebhookError {
136    /// Network error
137    #[error("Network error: {0}")]
138    Network(String),
139    /// HTTP error status code
140    #[error("HTTP error: {0}")]
141    HttpError(u16),
142    /// JSON serialization error
143    #[error("Serialization error: {0}")]
144    Serialization(String),
145    /// Unknown error
146    #[error("Unknown error")]
147    Unknown,
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::a2a::types::{TaskState, TaskStatus};

    /// `new()` applies the documented default retry policy.
    #[test]
    fn test_webhook_notifier_creation() {
        let notifier = WebhookNotifier::new();
        assert_eq!(notifier.max_retries, 3);
        assert_eq!(notifier.retry_delay_ms, 1000);
    }

    /// `with_settings()` stores the caller-provided retry policy unchanged.
    #[test]
    fn test_webhook_notifier_with_settings() {
        let notifier = WebhookNotifier::with_settings(5, 2000);
        assert_eq!(notifier.max_retries, 5);
        assert_eq!(notifier.retry_delay_ms, 2000);
    }

    /// Error messages include the variant label and payload.
    // Nothing here awaits, so a plain synchronous test is sufficient.
    #[test]
    fn test_webhook_error_display() {
        let err = WebhookError::Network("Connection refused".to_string());
        assert!(err.to_string().contains("Network error"));

        let err = WebhookError::HttpError(404);
        assert!(err.to_string().contains("404"));
    }

    /// Serialization of an event succeeds; only the delivery itself fails.
    #[tokio::test]
    async fn test_send_event_serialization() {
        // No retries and no backoff: a single attempt keeps the test fast
        // instead of sleeping through the default exponential backoff.
        let notifier = WebhookNotifier::with_settings(0, 0);
        let config = TaskPushNotificationConfig {
            task_id: "task-1".to_string(),
            // Loopback address so the test never touches an external host.
            // Assumes nothing is listening on port 1 of the test machine.
            url: "http://127.0.0.1:1/webhook".to_string(),
            authentication: None,
        };

        let event = StreamingEvent::TaskStatus {
            task_id: "task-1".to_string(),
            context_id: None,
            status: TaskStatus::new(TaskState::Completed),
            kind: "status-update".to_string(),
            r#final: true,
        };

        // Delivery is expected to fail, but it must fail at the delivery
        // stage (transport or HTTP status), which proves that serialization
        // of the event succeeded.
        let result = notifier.send_event(&config, event).await;
        assert!(
            matches!(
                result,
                Err(WebhookError::Network(_) | WebhookError::HttpError(_))
            ),
            "expected a delivery error, got {:?}",
            result
        );
    }
}