Skip to main content

hadb_io/
webhook.rs

1//! Webhook notifications for failure events.
2//!
3//! Generic webhook infrastructure for the hadb ecosystem. Sends HTTP POST
4//! notifications when failures occur:
5//! - upload_failed: Persistent upload failure after retries exhausted
6//! - auth_failure: Authentication/authorization error
7//! - corruption_detected: Data corruption detected via checksum
8//! - circuit_breaker_open: Circuit breaker tripped due to cascading failures
9//!
10//! Engine-specific events (e.g., sync_failed, segment_corrupt) are defined
11//! in each product crate and sent as string event names.
12
13use crate::config::WebhookConfig;
14use anyhow::Result;
15use chrono::Utc;
16use serde::{Deserialize, Serialize};
17use std::time::Duration;
18
/// Common webhook event types shared across the ecosystem.
///
/// Engines can also send custom event names as strings via `WebhookSender::send()`.
///
/// Because of `rename_all = "snake_case"`, serde represents each variant by its
/// snake_case name (for example `UploadFailed` as `"upload_failed"`); these are
/// the same strings handled by `WebhookEvent::as_str` and `WebhookEvent::parse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WebhookEvent {
    /// Upload failed after all retries exhausted.
    UploadFailed,
    /// Authentication or authorization error.
    AuthFailure,
    /// Data corruption detected.
    CorruptionDetected,
    /// Circuit breaker opened due to cascading failures.
    CircuitBreakerOpen,
}
34
35impl WebhookEvent {
36    pub fn as_str(&self) -> &'static str {
37        match self {
38            WebhookEvent::UploadFailed => "upload_failed",
39            WebhookEvent::AuthFailure => "auth_failure",
40            WebhookEvent::CorruptionDetected => "corruption_detected",
41            WebhookEvent::CircuitBreakerOpen => "circuit_breaker_open",
42        }
43    }
44
45    pub fn parse(s: &str) -> Option<Self> {
46        match s {
47            "upload_failed" => Some(WebhookEvent::UploadFailed),
48            "auth_failure" => Some(WebhookEvent::AuthFailure),
49            "corruption_detected" => Some(WebhookEvent::CorruptionDetected),
50            "circuit_breaker_open" => Some(WebhookEvent::CircuitBreakerOpen),
51            _ => None,
52        }
53    }
54}
55
/// Webhook payload sent to configured URLs.
///
/// Serialized with serde; `WebhookSender` turns it into the JSON body of the
/// HTTP POST. Use `WebhookPayload::new` for standard events and
/// `WebhookPayload::custom` for engine-specific event names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookPayload {
    /// Event type (can be a standard WebhookEvent or a custom engine-specific string).
    pub event: String,
    /// Database or resource name.
    pub database: String,
    /// Error message.
    pub error: String,
    /// Number of retry attempts made.
    pub attempts: u32,
    /// ISO 8601 timestamp (the constructors in this file fill it with
    /// `Utc::now().to_rfc3339()`).
    pub timestamp: String,
    /// Sender version (crate using hadb-io).
    ///
    /// NOTE(review): the constructors in this file populate this from
    /// `env!("CARGO_PKG_VERSION")`, which is resolved when this crate is
    /// compiled — confirm that this is the intended "sender" version.
    pub version: String,
    /// Optional additional context.
    ///
    /// Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context: Option<serde_json::Value>,
}
75
76impl WebhookPayload {
77    /// Create a new payload from a standard event.
78    pub fn new(event: WebhookEvent, database: &str, error: &str, attempts: u32) -> Self {
79        Self {
80            event: event.as_str().to_string(),
81            database: database.to_string(),
82            error: error.to_string(),
83            attempts,
84            timestamp: Utc::now().to_rfc3339(),
85            version: env!("CARGO_PKG_VERSION").to_string(),
86            context: None,
87        }
88    }
89
90    /// Create a payload with a custom event name (for engine-specific events).
91    pub fn custom(event: &str, database: &str, error: &str, attempts: u32) -> Self {
92        Self {
93            event: event.to_string(),
94            database: database.to_string(),
95            error: error.to_string(),
96            attempts,
97            timestamp: Utc::now().to_rfc3339(),
98            version: env!("CARGO_PKG_VERSION").to_string(),
99            context: None,
100        }
101    }
102
103    pub fn with_context(mut self, context: serde_json::Value) -> Self {
104        self.context = Some(context);
105        self
106    }
107}
108
/// Webhook sender for dispatching failure notifications.
///
/// Holds the configured webhook endpoints together with one HTTP client that
/// is reused for every delivery.
pub struct WebhookSender {
    /// Configured webhooks; `send` only delivers to those whose `events` list
    /// contains the payload's event name.
    configs: Vec<WebhookConfig>,
    /// HTTP client shared by all requests (built in `new` with a 10-second timeout).
    client: reqwest::Client,
}
114
115impl WebhookSender {
116    /// Create a new webhook sender.
117    pub fn new(configs: Vec<WebhookConfig>) -> Self {
118        let client = reqwest::Client::builder()
119            .timeout(Duration::from_secs(10))
120            .build()
121            .expect("failed to build webhook HTTP client");
122
123        Self { configs, client }
124    }
125
126    /// Check if any webhooks are configured.
127    pub fn is_empty(&self) -> bool {
128        self.configs.is_empty()
129    }
130
131    /// Send webhook notification for an event.
132    ///
133    /// Sends to all configured webhooks that subscribe to this event type.
134    /// Failures are logged at error level but don't propagate — webhooks are best-effort
135    /// but must not be silently swallowed.
136    pub async fn send(&self, payload: WebhookPayload) {
137        let event_name = &payload.event;
138
139        for config in &self.configs {
140            if !config.events.iter().any(|e| e == event_name) {
141                continue;
142            }
143
144            if let Err(e) = self.send_to_webhook(config, &payload).await {
145                tracing::error!(
146                    "Failed to send webhook to {}: {}",
147                    config.url,
148                    e
149                );
150            }
151        }
152    }
153
154    /// Send payload to a single webhook.
155    async fn send_to_webhook(
156        &self,
157        config: &WebhookConfig,
158        payload: &WebhookPayload,
159    ) -> Result<()> {
160        let body = serde_json::to_string(payload)?;
161
162        let mut request = self
163            .client
164            .post(&config.url)
165            .header("Content-Type", "application/json")
166            .header("User-Agent", format!("hadb-io/{}", env!("CARGO_PKG_VERSION")));
167
168        // Add HMAC signature if secret is configured
169        if let Some(ref secret) = config.secret {
170            let signature = compute_hmac_signature(secret, &body);
171            request = request.header("X-Hadb-Signature", signature);
172        }
173
174        let response = request.body(body).send().await?;
175
176        if !response.status().is_success() {
177            tracing::error!(
178                "Webhook {} returned status {}: {}",
179                config.url,
180                response.status(),
181                response.text().await.unwrap_or_default()
182            );
183        } else {
184            tracing::debug!("Webhook sent successfully to {}", config.url);
185        }
186
187        Ok(())
188    }
189
190    /// Send upload_failed notification.
191    pub async fn notify_upload_failed(&self, database: &str, error: &str, attempts: u32) {
192        let payload = WebhookPayload::new(WebhookEvent::UploadFailed, database, error, attempts);
193        self.send(payload).await;
194    }
195
196    /// Send auth_failure notification.
197    pub async fn notify_auth_failure(&self, database: &str, error: &str) {
198        let payload = WebhookPayload::new(WebhookEvent::AuthFailure, database, error, 1);
199        self.send(payload).await;
200    }
201
202    /// Send corruption_detected notification.
203    pub async fn notify_corruption(&self, database: &str, error: &str) {
204        let payload =
205            WebhookPayload::new(WebhookEvent::CorruptionDetected, database, error, 0);
206        self.send(payload).await;
207    }
208
209    /// Send circuit_breaker_open notification.
210    pub async fn notify_circuit_breaker_open(&self, database: &str, consecutive_failures: u32) {
211        let payload = WebhookPayload::new(
212            WebhookEvent::CircuitBreakerOpen,
213            database,
214            &format!(
215                "Circuit breaker opened after {} consecutive failures",
216                consecutive_failures
217            ),
218            consecutive_failures,
219        );
220        self.send(payload).await;
221    }
222}
223
224/// Compute HMAC-SHA256 signature for webhook payload.
225pub fn compute_hmac_signature(secret: &str, body: &str) -> String {
226    use hmac::{Hmac, Mac};
227    use sha2::Sha256;
228
229    type HmacSha256 = Hmac<Sha256>;
230
231    let mut mac =
232        HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
233    mac.update(body.as_bytes());
234    let result = mac.finalize();
235
236    format!("sha256={}", hex::encode(result.into_bytes()))
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Each standard event reports its snake_case name.
    #[test]
    fn test_webhook_event_serialization() {
        let expected_names = [
            (WebhookEvent::UploadFailed, "upload_failed"),
            (WebhookEvent::AuthFailure, "auth_failure"),
            (WebhookEvent::CorruptionDetected, "corruption_detected"),
            (WebhookEvent::CircuitBreakerOpen, "circuit_breaker_open"),
        ];

        for (event, name) in expected_names.iter() {
            assert_eq!(event.as_str(), *name);
        }
    }

    /// Known names parse to their variant; unknown names give `None`.
    #[test]
    fn test_webhook_event_from_str() {
        let known = [
            ("upload_failed", WebhookEvent::UploadFailed),
            ("auth_failure", WebhookEvent::AuthFailure),
        ];

        for (name, event) in known.iter() {
            assert_eq!(WebhookEvent::parse(name), Some(*event));
        }
        assert_eq!(WebhookEvent::parse("invalid"), None);
    }

    /// `new` copies its arguments into the payload and stamps a timestamp.
    #[test]
    fn test_webhook_payload_creation() {
        let WebhookPayload {
            event,
            database,
            error,
            attempts,
            timestamp,
            ..
        } = WebhookPayload::new(WebhookEvent::UploadFailed, "mydb", "Connection timeout", 5);

        assert_eq!(event, "upload_failed");
        assert_eq!(database, "mydb");
        assert_eq!(error, "Connection timeout");
        assert_eq!(attempts, 5);
        assert!(!timestamp.is_empty());
    }

    /// `custom` keeps the caller-supplied event name verbatim.
    #[test]
    fn test_webhook_payload_custom_event() {
        let custom = WebhookPayload::custom("sync_failed", "mydb", "Error", 3);
        assert_eq!(custom.event, "sync_failed");
    }

    /// Serialized JSON carries the event and database, and omits an unset context.
    #[test]
    fn test_webhook_payload_json() {
        let serialized = serde_json::to_string(&WebhookPayload::new(
            WebhookEvent::UploadFailed,
            "mydb",
            "Error",
            3,
        ))
        .unwrap();

        for fragment in ["\"event\":\"upload_failed\"", "\"database\":\"mydb\""].iter() {
            assert!(serialized.contains(fragment));
        }
        // context should be absent when None
        assert!(!serialized.contains("context"));
    }

    /// A context attached via `with_context` shows up in the serialized JSON.
    #[test]
    fn test_webhook_payload_with_context() {
        let context = serde_json::json!({"key": "s3://bucket/path"});
        let with_ctx =
            WebhookPayload::new(WebhookEvent::UploadFailed, "mydb", "Error", 3).with_context(context);

        let serialized = serde_json::to_string(&with_ctx).unwrap();
        assert!(serialized.contains("\"context\""));
        assert!(serialized.contains("s3://bucket/path"));
    }

    /// The signature is the `sha256=` prefix followed by 64 hex characters.
    #[test]
    fn test_hmac_signature() {
        let signature = compute_hmac_signature("secret", "test body");
        let prefix = "sha256=";

        assert!(signature.starts_with(prefix));
        assert_eq!(signature.len(), prefix.len() + 64); // 7 + 64 = 71
    }

    /// Signing the same input twice yields the same signature.
    #[test]
    fn test_hmac_signature_deterministic() {
        let sign = || compute_hmac_signature("secret", "test body");
        assert_eq!(sign(), sign());
    }

    /// Different bodies under the same secret yield different signatures.
    #[test]
    fn test_hmac_signature_different_inputs() {
        let sign = |body: &str| compute_hmac_signature("secret", body);
        assert_ne!(sign("body1"), sign("body2"));
    }

    /// A sender built from no configs reports itself as empty.
    #[test]
    fn test_webhook_sender_empty() {
        assert!(WebhookSender::new(Vec::new()).is_empty());
    }

    /// The subscription check used by `send` matches only the listed event names.
    #[test]
    fn test_webhook_config_event_filtering() {
        let config = WebhookConfig {
            url: "https://example.com/webhook".to_string(),
            events: vec!["upload_failed".to_string(), "auth_failure".to_string()],
            secret: None,
        };
        let subscribed = |event: &str| config.events.iter().any(|e| e == event);

        assert!(subscribed("upload_failed"));
        assert!(subscribed("auth_failure"));
        assert!(!subscribed("corruption_detected"));
    }
}
348}