use anyhow::{Context, Result};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum WebhookEvent {
MessageReceived,
MessageSent,
SessionCreated,
SessionEnded,
QueryExecuted,
ErrorOccurred,
Custom,
}
impl WebhookEvent {
pub fn as_str(&self) -> &str {
match self {
WebhookEvent::MessageReceived => "message.received",
WebhookEvent::MessageSent => "message.sent",
WebhookEvent::SessionCreated => "session.created",
WebhookEvent::SessionEnded => "session.ended",
WebhookEvent::QueryExecuted => "query.executed",
WebhookEvent::ErrorOccurred => "error.occurred",
WebhookEvent::Custom => "custom",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookPayload {
pub event: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub session_id: String,
pub data: serde_json::Value,
pub webhook_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
pub id: String,
pub url: String,
pub events: Vec<WebhookEvent>,
pub headers: HashMap<String, String>,
pub secret: Option<String>,
pub timeout: Duration,
pub retry_policy: RetryPolicy,
pub enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
pub max_retries: usize,
pub initial_backoff: Duration,
pub max_backoff: Duration,
pub backoff_multiplier: f64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_retries: 3,
initial_backoff: Duration::from_secs(1),
max_backoff: Duration::from_secs(60),
backoff_multiplier: 2.0,
}
}
}
impl Default for WebhookConfig {
fn default() -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
url: String::new(),
events: vec![],
headers: HashMap::new(),
secret: None,
timeout: Duration::from_secs(30),
retry_policy: RetryPolicy::default(),
enabled: true,
}
}
}
#[derive(Debug, Clone)]
pub struct WebhookDeliveryResult {
pub webhook_id: String,
pub success: bool,
pub status_code: Option<u16>,
pub response_body: Option<String>,
pub error: Option<String>,
pub attempts: usize,
pub duration: Duration,
}
pub struct WebhookManager {
webhooks: Arc<RwLock<HashMap<String, WebhookConfig>>>,
client: Client,
delivery_history: Arc<RwLock<Vec<WebhookDeliveryResult>>>,
}
impl WebhookManager {
pub fn new() -> Result<Self> {
let client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.context("Failed to create HTTP client")?;
info!("Initialized webhook manager");
Ok(Self {
webhooks: Arc::new(RwLock::new(HashMap::new())),
client,
delivery_history: Arc::new(RwLock::new(Vec::new())),
})
}
pub async fn register(&self, config: WebhookConfig) -> Result<()> {
info!("Registering webhook: {} for URL: {}", config.id, config.url);
let mut webhooks = self.webhooks.write().await;
webhooks.insert(config.id.clone(), config);
Ok(())
}
pub async fn unregister(&self, webhook_id: &str) -> Result<()> {
info!("Unregistering webhook: {}", webhook_id);
let mut webhooks = self.webhooks.write().await;
webhooks.remove(webhook_id);
Ok(())
}
pub async fn trigger(
&self,
event: WebhookEvent,
session_id: String,
data: serde_json::Value,
) -> Result<()> {
debug!(
"Triggering webhook event: {:?} for session: {}",
event, session_id
);
let webhooks = self.webhooks.read().await;
for webhook in webhooks.values() {
if !webhook.enabled {
continue;
}
if !webhook.events.contains(&event) {
continue;
}
let payload = WebhookPayload {
event: event.as_str().to_string(),
timestamp: chrono::Utc::now(),
session_id: session_id.clone(),
data: data.clone(),
webhook_id: webhook.id.clone(),
};
let webhook_clone = webhook.clone();
let client = self.client.clone();
let delivery_history = self.delivery_history.clone();
tokio::spawn(async move {
let result = Self::deliver_webhook(&client, &webhook_clone, &payload).await;
let mut history = delivery_history.write().await;
history.push(result);
if history.len() > 1000 {
history.remove(0);
}
});
}
Ok(())
}
async fn deliver_webhook(
client: &Client,
webhook: &WebhookConfig,
payload: &WebhookPayload,
) -> WebhookDeliveryResult {
let start = std::time::Instant::now();
let mut attempts = 0;
let mut backoff = webhook.retry_policy.initial_backoff;
loop {
attempts += 1;
debug!(
"Delivering webhook to {} (attempt {})",
webhook.url, attempts
);
let mut request = client
.post(&webhook.url)
.timeout(webhook.timeout)
.json(&payload);
for (key, value) in &webhook.headers {
request = request.header(key, value);
}
if let Some(ref secret) = webhook.secret {
let signature = Self::generate_signature(payload, secret);
request = request.header("X-Webhook-Signature", signature);
}
match request.send().await {
Ok(response) => {
let status = response.status();
let body = response.text().await.ok();
if status.is_success() {
info!("Webhook delivered successfully to {}", webhook.url);
return WebhookDeliveryResult {
webhook_id: webhook.id.clone(),
success: true,
status_code: Some(status.as_u16()),
response_body: body,
error: None,
attempts,
duration: start.elapsed(),
};
} else {
warn!(
"Webhook delivery failed with status {}: {}",
status, webhook.url
);
if attempts >= webhook.retry_policy.max_retries {
return WebhookDeliveryResult {
webhook_id: webhook.id.clone(),
success: false,
status_code: Some(status.as_u16()),
response_body: body,
error: Some(format!("Failed with status {}", status)),
attempts,
duration: start.elapsed(),
};
}
}
}
Err(e) => {
error!("Webhook delivery error to {}: {}", webhook.url, e);
if attempts >= webhook.retry_policy.max_retries {
return WebhookDeliveryResult {
webhook_id: webhook.id.clone(),
success: false,
status_code: None,
response_body: None,
error: Some(e.to_string()),
attempts,
duration: start.elapsed(),
};
}
}
}
tokio::time::sleep(backoff).await;
backoff = Duration::from_secs_f64(
(backoff.as_secs_f64() * webhook.retry_policy.backoff_multiplier)
.min(webhook.retry_policy.max_backoff.as_secs_f64()),
);
}
}
fn generate_signature(payload: &WebhookPayload, secret: &str) -> String {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let payload_json = serde_json::to_string(payload).unwrap_or_default();
let mut mac =
HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
mac.update(payload_json.as_bytes());
let result = mac.finalize();
let hex_string = result
.into_bytes()
.iter()
.map(|b| format!("{:02x}", b))
.collect::<String>();
format!("sha256={}", hex_string)
}
pub async fn get_delivery_history(&self, limit: usize) -> Vec<WebhookDeliveryResult> {
let history = self.delivery_history.read().await;
history.iter().rev().take(limit).cloned().collect()
}
pub async fn get_statistics(&self) -> WebhookStatistics {
let history = self.delivery_history.read().await;
let total_deliveries = history.len();
let successful_deliveries = history.iter().filter(|d| d.success).count();
let failed_deliveries = total_deliveries - successful_deliveries;
let average_duration = if !history.is_empty() {
history.iter().map(|d| d.duration.as_millis()).sum::<u128>() / history.len() as u128
} else {
0
};
WebhookStatistics {
total_deliveries,
successful_deliveries,
failed_deliveries,
average_duration_ms: average_duration,
}
}
pub async fn list_webhooks(&self) -> Vec<WebhookConfig> {
let webhooks = self.webhooks.read().await;
webhooks.values().cloned().collect()
}
}
impl Default for WebhookManager {
fn default() -> Self {
Self::new().expect("Failed to create webhook manager")
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookStatistics {
pub total_deliveries: usize,
pub successful_deliveries: usize,
pub failed_deliveries: usize,
pub average_duration_ms: u128,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_webhook_registration() {
let manager = WebhookManager::new().expect("should succeed");
let config = WebhookConfig {
id: "test-webhook".to_string(),
url: "https://example.com/webhook".to_string(),
events: vec![WebhookEvent::MessageReceived],
..Default::default()
};
manager.register(config).await.expect("should succeed");
let webhooks = manager.list_webhooks().await;
assert_eq!(webhooks.len(), 1);
assert_eq!(webhooks[0].id, "test-webhook");
}
#[tokio::test]
async fn test_webhook_event_names() {
assert_eq!(WebhookEvent::MessageReceived.as_str(), "message.received");
assert_eq!(WebhookEvent::SessionCreated.as_str(), "session.created");
}
}