use crate::config::sinks::SinkType;
use crate::events::sinks::Sink;
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
#[async_trait]
pub trait HttpSender: Send + Sync + std::fmt::Debug {
async fn send(
&self,
method: &str,
url: &str,
headers: &HashMap<String, String>,
body: Value,
) -> Result<u16>;
}
#[derive(Debug, Clone)]
pub struct WebhookConfig {
pub url: String,
pub method: String,
pub headers: HashMap<String, String>,
pub max_retries: u32,
pub backoff: Vec<Duration>,
pub timeout: Duration,
}
impl Default for WebhookConfig {
fn default() -> Self {
Self {
url: String::new(),
method: "POST".to_string(),
headers: HashMap::new(),
max_retries: 3,
backoff: vec![
Duration::from_millis(100),
Duration::from_millis(500),
Duration::from_secs(2),
],
timeout: Duration::from_secs(10),
}
}
}
#[derive(Debug)]
pub struct WebhookSink {
config: WebhookConfig,
sender: Arc<dyn HttpSender>,
}
impl WebhookSink {
pub fn new(sender: Arc<dyn HttpSender>, config: WebhookConfig) -> Self {
Self { config, sender }
}
async fn send_with_retry(&self, payload: Value) -> Result<()> {
let mut last_error = String::new();
for attempt in 0..=self.config.max_retries {
if attempt > 0 {
let backoff_idx = (attempt as usize - 1).min(self.config.backoff.len() - 1);
let delay = self.config.backoff[backoff_idx];
tracing::debug!(
attempt = attempt,
delay_ms = delay.as_millis(),
"webhook: retrying after backoff"
);
tokio::time::sleep(delay).await;
}
match self
.sender
.send(
&self.config.method,
&self.config.url,
&self.config.headers,
payload.clone(),
)
.await
{
Ok(status) if (200..300).contains(&status) => {
tracing::debug!(
url = %self.config.url,
status = status,
"webhook: delivered successfully"
);
return Ok(());
}
Ok(status) if (400..500).contains(&status) => {
return Err(anyhow!(
"webhook: client error {} from {}",
status,
self.config.url
));
}
Ok(status) => {
last_error = format!("server error {} from {}", status, self.config.url);
tracing::warn!(
url = %self.config.url,
status = status,
attempt = attempt + 1,
"webhook: server error, will retry"
);
}
Err(e) => {
last_error = format!("network error: {}", e);
tracing::warn!(
url = %self.config.url,
error = %e,
attempt = attempt + 1,
"webhook: network error, will retry"
);
}
}
}
Err(anyhow!(
"webhook: failed after {} retries: {}",
self.config.max_retries,
last_error
))
}
}
#[async_trait]
impl Sink for WebhookSink {
async fn deliver(
&self,
payload: Value,
_recipient_id: Option<&str>,
_context_vars: &HashMap<String, Value>,
) -> Result<()> {
if self.config.url.is_empty() {
return Err(anyhow!("webhook: URL not configured"));
}
self.send_with_retry(payload).await
}
fn name(&self) -> &str {
"webhook"
}
fn sink_type(&self) -> SinkType {
SinkType::Webhook
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Mutex;
#[derive(Debug, Clone)]
struct RecordedRequest {
method: String,
url: String,
headers: HashMap<String, String>,
body: Value,
}
#[derive(Debug)]
struct MockHttpSender {
responses: Mutex<Vec<Result<u16>>>,
requests: Mutex<Vec<RecordedRequest>>,
call_count: AtomicUsize,
}
impl MockHttpSender {
fn with_responses(responses: Vec<Result<u16>>) -> Self {
Self {
responses: Mutex::new(responses),
requests: Mutex::new(Vec::new()),
call_count: AtomicUsize::new(0),
}
}
fn always_ok() -> Self {
Self::with_responses(vec![])
}
}
#[async_trait]
impl HttpSender for MockHttpSender {
async fn send(
&self,
method: &str,
url: &str,
headers: &HashMap<String, String>,
body: Value,
) -> Result<u16> {
let idx = self.call_count.fetch_add(1, Ordering::SeqCst);
self.requests.lock().await.push(RecordedRequest {
method: method.to_string(),
url: url.to_string(),
headers: headers.clone(),
body,
});
let mut responses = self.responses.lock().await;
if idx < responses.len() {
std::mem::replace(&mut responses[idx], Ok(0))
} else {
Ok(200) }
}
}
fn fast_config(url: &str) -> WebhookConfig {
WebhookConfig {
url: url.to_string(),
method: "POST".to_string(),
headers: HashMap::new(),
max_retries: 3,
backoff: vec![
Duration::from_millis(1),
Duration::from_millis(1),
Duration::from_millis(1),
],
timeout: Duration::from_secs(5),
}
}
#[tokio::test]
async fn test_webhook_success() {
let sender = Arc::new(MockHttpSender::always_ok());
let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com/hook"));
let payload = json!({"event": "user.created", "user_id": "123"});
sink.deliver(payload.clone(), None, &HashMap::new())
.await
.unwrap();
let requests = sender.requests.lock().await;
assert_eq!(requests.len(), 1);
assert_eq!(requests[0].method, "POST");
assert_eq!(requests[0].url, "https://example.com/hook");
assert_eq!(requests[0].body, payload);
}
#[tokio::test]
async fn test_webhook_custom_headers() {
let sender = Arc::new(MockHttpSender::always_ok());
let mut config = fast_config("https://example.com/hook");
config
.headers
.insert("Authorization".to_string(), "Bearer token123".to_string());
config.method = "PUT".to_string();
let sink = WebhookSink::new(sender.clone(), config);
sink.deliver(json!({}), None, &HashMap::new())
.await
.unwrap();
let requests = sender.requests.lock().await;
assert_eq!(requests[0].method, "PUT");
assert_eq!(
requests[0].headers.get("Authorization").unwrap(),
"Bearer token123"
);
}
#[tokio::test]
async fn test_webhook_retry_on_server_error() {
let sender = Arc::new(MockHttpSender::with_responses(vec![
Ok(500), Ok(200), ]));
let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
sink.deliver(json!({}), None, &HashMap::new())
.await
.unwrap();
assert_eq!(sender.call_count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_webhook_no_retry_on_client_error() {
let sender = Arc::new(MockHttpSender::with_responses(vec![
Ok(400), ]));
let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
let result = sink.deliver(json!({}), None, &HashMap::new()).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("client error 400"));
assert_eq!(sender.call_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_webhook_retry_on_network_error() {
let sender = Arc::new(MockHttpSender::with_responses(vec![
Err(anyhow!("connection refused")),
Ok(200),
]));
let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
sink.deliver(json!({}), None, &HashMap::new())
.await
.unwrap();
assert_eq!(sender.call_count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_webhook_max_retries_exceeded() {
let sender = Arc::new(MockHttpSender::with_responses(vec![
Ok(503),
Ok(503),
Ok(503),
Ok(503),
]));
let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
let result = sink.deliver(json!({}), None, &HashMap::new()).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("after 3 retries"));
assert_eq!(sender.call_count.load(Ordering::SeqCst), 4); }
#[tokio::test]
async fn test_webhook_empty_url_error() {
let sender = Arc::new(MockHttpSender::always_ok());
let sink = WebhookSink::new(sender, fast_config(""));
let result = sink.deliver(json!({}), None, &HashMap::new()).await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("URL not configured")
);
}
#[test]
fn test_webhook_sink_name_and_type() {
let sender = Arc::new(MockHttpSender::always_ok());
let sink = WebhookSink::new(sender, fast_config("https://example.com"));
assert_eq!(sink.name(), "webhook");
assert_eq!(sink.sink_type(), SinkType::Webhook);
}
}