1use crate::config::WebhookConfig;
14use anyhow::Result;
15use chrono::Utc;
16use serde::{Deserialize, Serialize};
17use std::time::Duration;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub enum WebhookEvent {
25 UploadFailed,
27 AuthFailure,
29 CorruptionDetected,
31 CircuitBreakerOpen,
33}
34
35impl WebhookEvent {
36 pub fn as_str(&self) -> &'static str {
37 match self {
38 WebhookEvent::UploadFailed => "upload_failed",
39 WebhookEvent::AuthFailure => "auth_failure",
40 WebhookEvent::CorruptionDetected => "corruption_detected",
41 WebhookEvent::CircuitBreakerOpen => "circuit_breaker_open",
42 }
43 }
44
45 pub fn parse(s: &str) -> Option<Self> {
46 match s {
47 "upload_failed" => Some(WebhookEvent::UploadFailed),
48 "auth_failure" => Some(WebhookEvent::AuthFailure),
49 "corruption_detected" => Some(WebhookEvent::CorruptionDetected),
50 "circuit_breaker_open" => Some(WebhookEvent::CircuitBreakerOpen),
51 _ => None,
52 }
53 }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct WebhookPayload {
59 pub event: String,
61 pub database: String,
63 pub error: String,
65 pub attempts: u32,
67 pub timestamp: String,
69 pub version: String,
71 #[serde(skip_serializing_if = "Option::is_none")]
73 pub context: Option<serde_json::Value>,
74}
75
76impl WebhookPayload {
77 pub fn new(event: WebhookEvent, database: &str, error: &str, attempts: u32) -> Self {
79 Self {
80 event: event.as_str().to_string(),
81 database: database.to_string(),
82 error: error.to_string(),
83 attempts,
84 timestamp: Utc::now().to_rfc3339(),
85 version: env!("CARGO_PKG_VERSION").to_string(),
86 context: None,
87 }
88 }
89
90 pub fn custom(event: &str, database: &str, error: &str, attempts: u32) -> Self {
92 Self {
93 event: event.to_string(),
94 database: database.to_string(),
95 error: error.to_string(),
96 attempts,
97 timestamp: Utc::now().to_rfc3339(),
98 version: env!("CARGO_PKG_VERSION").to_string(),
99 context: None,
100 }
101 }
102
103 pub fn with_context(mut self, context: serde_json::Value) -> Self {
104 self.context = Some(context);
105 self
106 }
107}
108
109pub struct WebhookSender {
111 configs: Vec<WebhookConfig>,
112 client: reqwest::Client,
113}
114
115impl WebhookSender {
116 pub fn new(configs: Vec<WebhookConfig>) -> Self {
118 let client = reqwest::Client::builder()
119 .timeout(Duration::from_secs(10))
120 .build()
121 .expect("failed to build webhook HTTP client");
122
123 Self { configs, client }
124 }
125
126 pub fn is_empty(&self) -> bool {
128 self.configs.is_empty()
129 }
130
131 pub async fn send(&self, payload: WebhookPayload) {
137 let event_name = &payload.event;
138
139 for config in &self.configs {
140 if !config.events.iter().any(|e| e == event_name) {
141 continue;
142 }
143
144 if let Err(e) = self.send_to_webhook(config, &payload).await {
145 tracing::error!(
146 "Failed to send webhook to {}: {}",
147 config.url,
148 e
149 );
150 }
151 }
152 }
153
154 async fn send_to_webhook(
156 &self,
157 config: &WebhookConfig,
158 payload: &WebhookPayload,
159 ) -> Result<()> {
160 let body = serde_json::to_string(payload)?;
161
162 let mut request = self
163 .client
164 .post(&config.url)
165 .header("Content-Type", "application/json")
166 .header("User-Agent", format!("hadb-io/{}", env!("CARGO_PKG_VERSION")));
167
168 if let Some(ref secret) = config.secret {
170 let signature = compute_hmac_signature(secret, &body);
171 request = request.header("X-Hadb-Signature", signature);
172 }
173
174 let response = request.body(body).send().await?;
175
176 if !response.status().is_success() {
177 tracing::error!(
178 "Webhook {} returned status {}: {}",
179 config.url,
180 response.status(),
181 response.text().await.unwrap_or_default()
182 );
183 } else {
184 tracing::debug!("Webhook sent successfully to {}", config.url);
185 }
186
187 Ok(())
188 }
189
190 pub async fn notify_upload_failed(&self, database: &str, error: &str, attempts: u32) {
192 let payload = WebhookPayload::new(WebhookEvent::UploadFailed, database, error, attempts);
193 self.send(payload).await;
194 }
195
196 pub async fn notify_auth_failure(&self, database: &str, error: &str) {
198 let payload = WebhookPayload::new(WebhookEvent::AuthFailure, database, error, 1);
199 self.send(payload).await;
200 }
201
202 pub async fn notify_corruption(&self, database: &str, error: &str) {
204 let payload =
205 WebhookPayload::new(WebhookEvent::CorruptionDetected, database, error, 0);
206 self.send(payload).await;
207 }
208
209 pub async fn notify_circuit_breaker_open(&self, database: &str, consecutive_failures: u32) {
211 let payload = WebhookPayload::new(
212 WebhookEvent::CircuitBreakerOpen,
213 database,
214 &format!(
215 "Circuit breaker opened after {} consecutive failures",
216 consecutive_failures
217 ),
218 consecutive_failures,
219 );
220 self.send(payload).await;
221 }
222}
223
224pub fn compute_hmac_signature(secret: &str, body: &str) -> String {
226 use hmac::{Hmac, Mac};
227 use sha2::Sha256;
228
229 type HmacSha256 = Hmac<Sha256>;
230
231 let mut mac =
232 HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
233 mac.update(body.as_bytes());
234 let result = mac.finalize();
235
236 format!("sha256={}", hex::encode(result.into_bytes()))
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the payload most tests start from.
    fn upload_failed_payload(error: &str, attempts: u32) -> WebhookPayload {
        WebhookPayload::new(WebhookEvent::UploadFailed, "mydb", error, attempts)
    }

    #[test]
    fn test_webhook_event_serialization() {
        let expected = [
            (WebhookEvent::UploadFailed, "upload_failed"),
            (WebhookEvent::AuthFailure, "auth_failure"),
            (WebhookEvent::CorruptionDetected, "corruption_detected"),
            (WebhookEvent::CircuitBreakerOpen, "circuit_breaker_open"),
        ];

        for &(event, name) in &expected {
            assert_eq!(event.as_str(), name);
        }
    }

    #[test]
    fn test_webhook_event_from_str() {
        let cases = [
            ("upload_failed", Some(WebhookEvent::UploadFailed)),
            ("auth_failure", Some(WebhookEvent::AuthFailure)),
            ("invalid", None),
        ];

        for &(input, expected) in &cases {
            assert_eq!(WebhookEvent::parse(input), expected);
        }
    }

    #[test]
    fn test_webhook_payload_creation() {
        let WebhookPayload {
            event,
            database,
            error,
            attempts,
            timestamp,
            ..
        } = upload_failed_payload("Connection timeout", 5);

        assert_eq!(event, "upload_failed");
        assert_eq!(database, "mydb");
        assert_eq!(error, "Connection timeout");
        assert_eq!(attempts, 5);
        assert!(!timestamp.is_empty());
    }

    #[test]
    fn test_webhook_payload_custom_event() {
        let payload = WebhookPayload::custom("sync_failed", "mydb", "Error", 3);
        assert_eq!(payload.event, "sync_failed");
    }

    #[test]
    fn test_webhook_payload_json() {
        let json = serde_json::to_string(&upload_failed_payload("Error", 3)).unwrap();

        for fragment in &["\"event\":\"upload_failed\"", "\"database\":\"mydb\""] {
            assert!(json.contains(fragment));
        }
        // An unset context must not appear in the serialized form at all.
        assert!(!json.contains("context"));
    }

    #[test]
    fn test_webhook_payload_with_context() {
        let context = serde_json::json!({"key": "s3://bucket/path"});
        let payload = upload_failed_payload("Error", 3).with_context(context);

        let json = serde_json::to_string(&payload).unwrap();
        for fragment in &["\"context\"", "s3://bucket/path"] {
            assert!(json.contains(fragment));
        }
    }

    #[test]
    fn test_hmac_signature() {
        let prefix = "sha256=";
        let signature = compute_hmac_signature("secret", "test body");

        assert!(signature.starts_with(prefix));
        // 7-byte prefix + 64 hex characters for a 32-byte SHA-256 digest = 71.
        assert_eq!(signature.len(), prefix.len() + 64);
    }

    #[test]
    fn test_hmac_signature_deterministic() {
        let first = compute_hmac_signature("secret", "test body");
        let second = compute_hmac_signature("secret", "test body");
        assert_eq!(first, second);
    }

    #[test]
    fn test_hmac_signature_different_inputs() {
        assert_ne!(
            compute_hmac_signature("secret", "body1"),
            compute_hmac_signature("secret", "body2")
        );
    }

    #[test]
    fn test_webhook_sender_empty() {
        assert!(WebhookSender::new(Vec::new()).is_empty());
    }

    #[test]
    fn test_webhook_config_event_filtering() {
        let config = WebhookConfig {
            url: "https://example.com/webhook".to_string(),
            events: vec!["upload_failed".to_string(), "auth_failure".to_string()],
            secret: None,
        };
        let is_subscribed = |name: &str| config.events.iter().any(|e| e == name);

        assert!(is_subscribed("upload_failed"));
        assert!(is_subscribed("auth_failure"));
        assert!(!is_subscribed("corruption_detected"));
    }
}