use crate::{ZoeyError, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WebhookEventType {
MessageReceived,
MessageSent,
AgentStarted,
AgentStopped,
Error,
ActionExecuted,
MemoryCreated,
Custom,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
pub id: String,
pub url: String,
pub events: Vec<WebhookEventType>,
#[serde(skip_serializing)]
pub secret: Option<String>,
pub headers: HashMap<String, String>,
pub retry_policy: RetryPolicy,
pub enabled: bool,
pub description: Option<String>,
}
impl WebhookConfig {
pub fn new(id: impl Into<String>, url: impl Into<String>) -> Self {
Self {
id: id.into(),
url: url.into(),
events: vec![],
secret: None,
headers: HashMap::new(),
retry_policy: RetryPolicy::default(),
enabled: true,
description: None,
}
}
pub fn with_events(mut self, events: Vec<WebhookEventType>) -> Self {
self.events = events;
self
}
pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
self.secret = Some(secret.into());
self
}
pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.headers.insert(key.into(), value.into());
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
pub max_retries: u32,
pub initial_delay_ms: u64,
pub max_delay_ms: u64,
pub multiplier: f64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_retries: 3,
initial_delay_ms: 1000,
max_delay_ms: 60000,
multiplier: 2.0,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookPayload {
pub event: WebhookEventType,
pub event_id: String,
pub timestamp: String,
pub agent_id: Option<String>,
pub data: serde_json::Value,
}
impl WebhookPayload {
pub fn new(event: WebhookEventType, data: serde_json::Value) -> Self {
Self {
event,
event_id: uuid::Uuid::new_v4().to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
agent_id: None,
data,
}
}
pub fn with_agent_id(mut self, agent_id: impl Into<String>) -> Self {
self.agent_id = Some(agent_id.into());
self
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeliveryStatus {
Pending,
InProgress,
Delivered,
Failed,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookDelivery {
pub id: String,
pub webhook_id: String,
pub payload: WebhookPayload,
pub status: DeliveryStatus,
pub attempts: u32,
pub last_attempt: Option<String>,
pub last_error: Option<String>,
pub response_code: Option<u16>,
pub created_at: String,
}
pub struct WebhookManager {
webhooks: Arc<RwLock<HashMap<String, WebhookConfig>>>,
delivery_tx: mpsc::Sender<WebhookDelivery>,
history: Arc<RwLock<Vec<WebhookDelivery>>>,
client: reqwest::Client,
max_history: usize,
}
impl WebhookManager {
pub fn new(buffer_size: usize) -> (Self, mpsc::Receiver<WebhookDelivery>) {
let (tx, rx) = mpsc::channel(buffer_size);
let manager = Self {
webhooks: Arc::new(RwLock::new(HashMap::new())),
delivery_tx: tx,
history: Arc::new(RwLock::new(Vec::new())),
client: reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_default(),
max_history: 1000,
};
(manager, rx)
}
pub fn start_worker(self: Arc<Self>, mut rx: mpsc::Receiver<WebhookDelivery>) {
tokio::spawn(async move {
while let Some(mut delivery) = rx.recv().await {
self.process_delivery(&mut delivery).await;
}
});
}
pub fn register(&self, config: WebhookConfig) {
info!("Registering webhook: {} -> {}", config.id, config.url);
self.webhooks
.write()
.unwrap()
.insert(config.id.clone(), config);
}
pub fn unregister(&self, webhook_id: &str) {
info!("Unregistering webhook: {}", webhook_id);
self.webhooks.write().unwrap().remove(webhook_id);
}
pub fn list_webhooks(&self) -> Vec<WebhookConfig> {
self.webhooks.read().unwrap().values().cloned().collect()
}
pub async fn trigger(&self, payload: WebhookPayload) -> Result<Vec<String>> {
let webhooks = self.webhooks.read().unwrap();
let mut delivery_ids = Vec::new();
for webhook in webhooks.values() {
if !webhook.enabled {
continue;
}
if webhook.events.is_empty() || webhook.events.contains(&payload.event) {
let delivery = WebhookDelivery {
id: uuid::Uuid::new_v4().to_string(),
webhook_id: webhook.id.clone(),
payload: payload.clone(),
status: DeliveryStatus::Pending,
attempts: 0,
last_attempt: None,
last_error: None,
response_code: None,
created_at: chrono::Utc::now().to_rfc3339(),
};
delivery_ids.push(delivery.id.clone());
if let Err(e) = self.delivery_tx.send(delivery).await {
error!("Failed to queue webhook delivery: {}", e);
}
}
}
Ok(delivery_ids)
}
async fn process_delivery(&self, delivery: &mut WebhookDelivery) {
let webhook = {
let webhooks = self.webhooks.read().unwrap();
match webhooks.get(&delivery.webhook_id) {
Some(w) => w.clone(),
None => {
warn!(
"Webhook {} not found, skipping delivery",
delivery.webhook_id
);
return;
}
}
};
delivery.status = DeliveryStatus::InProgress;
let retry_policy = &webhook.retry_policy;
let mut delay = Duration::from_millis(retry_policy.initial_delay_ms);
for attempt in 0..=retry_policy.max_retries {
delivery.attempts = attempt + 1;
delivery.last_attempt = Some(chrono::Utc::now().to_rfc3339());
debug!(
"Attempting webhook delivery {} (attempt {})",
delivery.id, delivery.attempts
);
match self.send_webhook(&webhook, &delivery.payload).await {
Ok(status_code) => {
delivery.status = DeliveryStatus::Delivered;
delivery.response_code = Some(status_code);
delivery.last_error = None;
info!(
"Webhook delivered successfully: {} -> {} (status: {})",
delivery.id, webhook.url, status_code
);
break;
}
Err(e) => {
delivery.last_error = Some(e.to_string());
if attempt < retry_policy.max_retries {
warn!(
"Webhook delivery failed (attempt {}): {}. Retrying in {:?}",
delivery.attempts, e, delay
);
tokio::time::sleep(delay).await;
delay = Duration::from_millis(
((delay.as_millis() as f64) * retry_policy.multiplier)
.min(retry_policy.max_delay_ms as f64)
as u64,
);
} else {
delivery.status = DeliveryStatus::Failed;
error!(
"Webhook delivery failed after {} attempts: {}",
delivery.attempts, e
);
}
}
}
}
self.record_history(delivery.clone());
}
async fn send_webhook(&self, webhook: &WebhookConfig, payload: &WebhookPayload) -> Result<u16> {
let body = serde_json::to_string(payload)
.map_err(|e| ZoeyError::other(format!("Failed to serialize payload: {}", e)))?;
let mut request = self
.client
.post(&webhook.url)
.header("Content-Type", "application/json")
.header("User-Agent", "LauraAI-Webhook/1.0");
for (key, value) in &webhook.headers {
request = request.header(key, value);
}
if let Some(secret) = &webhook.secret {
let signature = self.compute_signature(secret, &body);
request = request.header("X-Webhook-Signature", signature);
}
let response = request
.body(body)
.send()
.await
.map_err(|e| ZoeyError::other(format!("HTTP request failed: {}", e)))?;
let status = response.status().as_u16();
if status >= 200 && status < 300 {
Ok(status)
} else {
Err(ZoeyError::other(format!(
"Webhook returned status {}",
status
)))
}
}
fn compute_signature(&self, secret: &str, body: &str) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(secret.as_bytes());
hasher.update(body.as_bytes());
let result = hasher.finalize();
format!("sha256={}", hex::encode(result))
}
fn record_history(&self, delivery: WebhookDelivery) {
let mut history = self.history.write().unwrap();
if history.len() >= self.max_history {
history.remove(0);
}
history.push(delivery);
}
pub fn get_history(&self, limit: usize) -> Vec<WebhookDelivery> {
let history = self.history.read().unwrap();
history.iter().rev().take(limit).cloned().collect()
}
pub fn get_delivery(&self, id: &str) -> Option<WebhookDelivery> {
self.history
.read()
.unwrap()
.iter()
.find(|d| d.id == id)
.cloned()
}
}
pub struct WebhookEvents;
impl WebhookEvents {
pub fn message_received(
message_id: &str,
content: &str,
sender: &str,
room_id: &str,
) -> WebhookPayload {
WebhookPayload::new(
WebhookEventType::MessageReceived,
serde_json::json!({
"message_id": message_id,
"content": content,
"sender": sender,
"room_id": room_id,
}),
)
}
pub fn message_sent(message_id: &str, content: &str, room_id: &str) -> WebhookPayload {
WebhookPayload::new(
WebhookEventType::MessageSent,
serde_json::json!({
"message_id": message_id,
"content": content,
"room_id": room_id,
}),
)
}
pub fn agent_started(agent_id: &str, agent_name: &str) -> WebhookPayload {
WebhookPayload::new(
WebhookEventType::AgentStarted,
serde_json::json!({
"agent_name": agent_name,
}),
)
.with_agent_id(agent_id)
}
pub fn agent_stopped(agent_id: &str, reason: &str) -> WebhookPayload {
WebhookPayload::new(
WebhookEventType::AgentStopped,
serde_json::json!({
"reason": reason,
}),
)
.with_agent_id(agent_id)
}
pub fn error(
error_type: &str,
message: &str,
details: Option<serde_json::Value>,
) -> WebhookPayload {
WebhookPayload::new(
WebhookEventType::Error,
serde_json::json!({
"error_type": error_type,
"message": message,
"details": details,
}),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_webhook_config_builder() {
let config = WebhookConfig::new("test", "https://example.com/webhook")
.with_events(vec![WebhookEventType::MessageReceived])
.with_secret("my_secret")
.with_header("Authorization", "Bearer token");
assert_eq!(config.id, "test");
assert_eq!(config.url, "https://example.com/webhook");
assert!(config.events.contains(&WebhookEventType::MessageReceived));
assert_eq!(config.secret, Some("my_secret".to_string()));
}
#[test]
fn test_webhook_payload_creation() {
let payload = WebhookPayload::new(
WebhookEventType::MessageReceived,
serde_json::json!({"test": "data"}),
)
.with_agent_id("agent-123");
assert_eq!(payload.event, WebhookEventType::MessageReceived);
assert_eq!(payload.agent_id, Some("agent-123".to_string()));
}
#[tokio::test]
async fn test_webhook_manager_registration() {
let (manager, _rx) = WebhookManager::new(10);
let config = WebhookConfig::new("test", "https://example.com/webhook");
manager.register(config);
let webhooks = manager.list_webhooks();
assert_eq!(webhooks.len(), 1);
assert_eq!(webhooks[0].id, "test");
}
}