use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
/// A registered webhook endpoint together with its delivery counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
/// Identifier; `WebhookRegistry::register_with_template` assigns `wh_<n>`.
pub id: String,
/// Display name supplied at registration.
pub name: String,
/// Target URL supplied at registration (stored as given, not validated here).
pub url: String,
/// Topic filters: `*`, a `prefix.*` pattern, or an exact topic name.
pub events: Vec<String>,
/// Optional signing secret; excluded from serialized output, still accepted on deserialization.
#[serde(skip_serializing)]
pub secret: Option<String>,
/// Inactive webhooks are skipped by `WebhookRegistry::match_topic`.
pub active: bool,
/// Registration time in seconds since the Unix epoch.
pub created_at: u64,
/// Incremented once per matching `dispatch` call.
pub delivery_count: u64,
/// Incremented when a non-successful, non-pending delivery is recorded.
pub failure_count: u64,
/// Time of the most recent dispatch, in seconds since the Unix epoch.
pub last_delivery: Option<u64>,
/// Optional payload template with `{{...}}` placeholders; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub template: Option<String>,
}
/// One entry in the registry's delivery log.
#[derive(Debug, Clone, Serialize)]
pub struct WebhookDelivery {
/// Id of the webhook this delivery was addressed to.
pub webhook_id: String,
/// Topic of the event being delivered.
pub topic: String,
/// Status code of the attempt; `dispatch` writes `0` for its placeholder entries.
pub status_code: u16,
/// Whether the attempt counted as successful.
pub success: bool,
/// Time the entry was recorded, in seconds since the Unix epoch.
pub timestamp: u64,
/// Reported latency of the attempt in milliseconds.
pub latency_ms: u64,
/// Error text, if any; `dispatch` uses the marker `"pending"`.
pub error: Option<String>,
/// Attempt number supplied by the caller (`dispatch` writes `0`).
pub attempt: u32,
}
/// Listing view of a webhook, as produced by `WebhookRegistry::list`.
///
/// Mirrors `WebhookConfig` but carries `has_secret` instead of the secret itself
/// and has no template field.
#[derive(Debug, Clone, Serialize)]
pub struct WebhookSummary {
/// Webhook identifier.
pub id: String,
/// Display name.
pub name: String,
/// Target URL.
pub url: String,
/// Topic filters.
pub events: Vec<String>,
/// `true` when the underlying config has a secret set.
pub has_secret: bool,
/// Whether the webhook is currently active.
pub active: bool,
/// Registration time in seconds since the Unix epoch.
pub created_at: u64,
/// Number of dispatches that matched this webhook.
pub delivery_count: u64,
/// Number of recorded failed deliveries.
pub failure_count: u64,
/// Time of the most recent dispatch, if any.
pub last_delivery: Option<u64>,
}
/// Outcome of `WebhookRegistry::dispatch`.
#[derive(Debug, Clone, Serialize)]
pub struct DispatchResult {
/// Number of webhooks whose filters matched the topic (equals `webhook_ids.len()`).
pub matched: usize,
/// Ids of the matched webhooks.
pub webhook_ids: Vec<String>,
}
/// Aggregate counters returned by `WebhookRegistry::stats`.
#[derive(Debug, Clone, Serialize)]
pub struct WebhookStats {
/// Number of registered webhooks.
pub total_webhooks: usize,
/// Number of registered webhooks that are active.
pub active_webhooks: usize,
/// Sum of `delivery_count` over all registered webhooks.
pub total_deliveries: u64,
/// Sum of `failure_count` over all registered webhooks.
pub total_failures: u64,
/// Up to ten most recent delivery log entries, newest first.
pub recent_deliveries: Vec<WebhookDelivery>,
}
/// A delivery waiting in the retry queue.
#[derive(Debug, Clone, Serialize)]
pub struct RetryEntry {
/// Id of the webhook to retry.
pub webhook_id: String,
/// Topic of the event being retried.
pub topic: String,
/// Attempt number of the upcoming retry (`enqueue_retry` stores its `attempt` argument plus one).
pub attempt: u32,
/// Value of `max_retry_attempts` at the time the entry was queued.
pub max_attempts: u32,
/// Earliest time the retry is due, in seconds since the Unix epoch.
pub next_retry_at: u64,
/// Error text passed to `enqueue_retry`.
pub original_error: String,
/// Time the entry was queued, in seconds since the Unix epoch.
pub enqueued_at: u64,
}
/// A delivery that was given up on after reaching the retry limit.
#[derive(Debug, Clone, Serialize)]
pub struct DeadLetterEntry {
/// Id of the webhook the delivery was addressed to.
pub webhook_id: String,
/// Topic of the abandoned event.
pub topic: String,
/// Attempt count passed to `enqueue_retry` when the entry was dead-lettered.
pub attempts: u32,
/// Error text passed to `enqueue_retry`.
pub last_error: String,
/// Time the entry was dead-lettered, in seconds since the Unix epoch.
pub dead_at: u64,
}
/// In-memory store of webhooks plus a bounded delivery log, a retry queue and
/// a dead-letter list.
pub struct WebhookRegistry {
// Registered webhooks keyed by id.
webhooks: HashMap<String, WebhookConfig>,
// Delivery log, oldest entry first.
deliveries: Vec<WebhookDelivery>,
// Maximum number of entries kept in `deliveries` (`new` sets 500).
max_deliveries: usize,
// Numeric part of the next id to hand out (`new` starts at 1).
next_id: u64,
// Deliveries waiting to be retried.
retry_queue: Vec<RetryEntry>,
// Deliveries that reached the retry limit.
dead_letters: Vec<DeadLetterEntry>,
/// Attempt count at which `enqueue_retry` dead-letters instead of queueing (`new` sets 5).
pub max_retry_attempts: u32,
}
impl WebhookRegistry {
pub fn new() -> Self {
WebhookRegistry {
webhooks: HashMap::new(),
deliveries: Vec::new(),
max_deliveries: 500,
next_id: 1,
retry_queue: Vec::new(),
dead_letters: Vec::new(),
max_retry_attempts: 5,
}
}
/// Registers a webhook without a payload template and returns its new id.
///
/// Shorthand for `register_with_template` with `template` set to `None`.
pub fn register(
    &mut self,
    name: &str,
    url: &str,
    events: Vec<String>,
    secret: Option<String>,
) -> String {
    let no_template: Option<String> = None;
    self.register_with_template(name, url, events, secret, no_template)
}
/// Registers a webhook and returns its freshly assigned id (`wh_<n>`).
///
/// The webhook starts active, with zeroed counters and the current time as
/// `created_at`. `events` holds the topic filters, `secret` and `template`
/// are stored as given.
pub fn register_with_template(
    &mut self,
    name: &str,
    url: &str,
    events: Vec<String>,
    secret: Option<String>,
    template: Option<String>,
) -> String {
    // Reserve the sequence number first, then advance the counter.
    let sequence = self.next_id;
    self.next_id += 1;
    let id = format!("wh_{}", sequence);

    self.webhooks.insert(
        id.clone(),
        WebhookConfig {
            id: id.clone(),
            name: name.to_string(),
            url: url.to_string(),
            events,
            secret,
            active: true,
            created_at: now_secs(),
            delivery_count: 0,
            failure_count: 0,
            last_delivery: None,
            template,
        },
    );
    id
}
/// Removes the webhook with the given id.
///
/// Returns `true` if a webhook was removed, `false` if the id was unknown.
pub fn unregister(&mut self, id: &str) -> bool {
    let removed = self.webhooks.remove(id);
    removed.is_some()
}
/// Looks up a webhook's full configuration by id; `None` if the id is unknown.
pub fn get(&self, id: &str) -> Option<&WebhookConfig> {
self.webhooks.get(id)
}
/// Flips the `active` flag of a webhook.
///
/// Returns the new state, or `None` if the id is unknown.
pub fn toggle(&mut self, id: &str) -> Option<bool> {
    let wh = self.webhooks.get_mut(id)?;
    wh.active = !wh.active;
    Some(wh.active)
}
/// Returns the topic filters of a webhook, or `None` if the id is unknown.
pub fn get_filters(&self, id: &str) -> Option<&Vec<String>> {
    let wh = self.webhooks.get(id)?;
    Some(&wh.events)
}
/// Replaces the topic filters of a webhook.
///
/// Returns `true` on success, `false` if the id is unknown (in which case
/// `events` is dropped).
pub fn set_filters(&mut self, id: &str, events: Vec<String>) -> bool {
    if let Some(wh) = self.webhooks.get_mut(id) {
        wh.events = events;
        return true;
    }
    false
}
/// Returns a summary of every registered webhook, ordered by id in
/// registration order (`wh_1`, `wh_2`, …, `wh_10`, …).
///
/// The summary exposes only whether a secret is set, never the secret itself.
pub fn list(&self) -> Vec<WebhookSummary> {
    let mut result: Vec<WebhookSummary> = self
        .webhooks
        .values()
        .map(|wh| WebhookSummary {
            id: wh.id.clone(),
            name: wh.name.clone(),
            url: wh.url.clone(),
            events: wh.events.clone(),
            has_secret: wh.secret.is_some(),
            active: wh.active,
            created_at: wh.created_at,
            delivery_count: wh.delivery_count,
            failure_count: wh.failure_count,
            last_delivery: wh.last_delivery,
        })
        .collect();
    // Ids are `wh_<n>` with no leading zeros, so comparing (length, text)
    // yields numeric order. A plain string comparison would put `wh_10`
    // before `wh_2`.
    result.sort_by(|a, b| (a.id.len(), &a.id).cmp(&(b.id.len(), &b.id)));
    result
}
/// Returns the ids of all active webhooks with a filter matching `topic`.
///
/// The ids come out in the map's iteration order, which is unspecified.
pub fn match_topic(&self, topic: &str) -> Vec<String> {
    let mut matched = Vec::new();
    for wh in self.webhooks.values() {
        if !wh.active {
            continue;
        }
        if topic_matches(&wh.events, topic) {
            matched.push(wh.id.clone());
        }
    }
    matched
}
/// Marks a dispatch of `topic` against every matching active webhook.
///
/// For each match this bumps `delivery_count`, stamps `last_delivery`, and
/// appends a placeholder log entry (status `0`, not successful, error
/// `"pending"`, attempt `0`). The payload and source are not used here.
/// Returns the number and ids of the matched webhooks.
pub fn dispatch(&mut self, topic: &str, _payload: &serde_json::Value, _source: &str) -> DispatchResult {
    let webhook_ids = self.match_topic(topic);
    for id in webhook_ids.iter() {
        if let Some(wh) = self.webhooks.get_mut(id) {
            wh.delivery_count += 1;
            wh.last_delivery = Some(now_secs());
        }
        // The "pending" marker keeps this entry from counting as a failure.
        self.record_delivery(WebhookDelivery {
            webhook_id: id.clone(),
            topic: topic.to_string(),
            status_code: 0,
            success: false,
            timestamp: now_secs(),
            latency_ms: 0,
            error: Some("pending".to_string()),
            attempt: 0,
        });
    }
    let matched = webhook_ids.len();
    DispatchResult { matched, webhook_ids }
}
/// Appends a delivery to the log and trims the log to `max_deliveries`.
///
/// A delivery that is not successful and whose error is not the `"pending"`
/// marker also increments the owning webhook's `failure_count` (if that
/// webhook is still registered).
pub fn record_delivery(&mut self, delivery: WebhookDelivery) {
    let is_pending = delivery.error.as_deref() == Some("pending");
    if !delivery.success && !is_pending {
        if let Some(wh) = self.webhooks.get_mut(&delivery.webhook_id) {
            wh.failure_count += 1;
        }
    }
    self.deliveries.push(delivery);
    // Drop all surplus oldest entries in one shift instead of calling
    // `remove(0)` once per evicted entry.
    let excess = self.deliveries.len().saturating_sub(self.max_deliveries);
    if excess > 0 {
        self.deliveries.drain(..excess);
    }
}
/// Records the outcome of a finished delivery attempt.
///
/// The attempt counts as successful when `status_code` is in `200..300`;
/// any other status increments the webhook's `failure_count` (if the webhook
/// is still registered). The entry is appended to the delivery log, which is
/// then trimmed to `max_deliveries`.
pub fn record_completed(
    &mut self,
    webhook_id: &str,
    topic: &str,
    status_code: u16,
    latency_ms: u64,
    error: Option<String>,
    attempt: u32,
) {
    let success = (200..300).contains(&status_code);
    if !success {
        if let Some(wh) = self.webhooks.get_mut(webhook_id) {
            wh.failure_count += 1;
        }
    }
    self.deliveries.push(WebhookDelivery {
        webhook_id: webhook_id.to_string(),
        topic: topic.to_string(),
        status_code,
        success,
        timestamp: now_secs(),
        latency_ms,
        error,
        attempt,
    });
    // Drop all surplus oldest entries in one shift instead of calling
    // `remove(0)` once per evicted entry.
    let excess = self.deliveries.len().saturating_sub(self.max_deliveries);
    if excess > 0 {
        self.deliveries.drain(..excess);
    }
}
/// Returns up to `limit` delivery log entries, newest first.
///
/// When `webhook_id` is `Some`, only that webhook's entries are considered.
pub fn recent_deliveries(&self, limit: usize, webhook_id: Option<&str>) -> Vec<&WebhookDelivery> {
    let newest_first = self.deliveries.iter().rev();
    newest_first
        .filter(|d| webhook_id.map_or(true, |id| d.webhook_id == id))
        .take(limit)
        .collect()
}
/// Builds aggregate statistics over all registered webhooks.
///
/// Totals are summed from the per-webhook counters; `recent_deliveries`
/// holds clones of up to ten log entries, newest first.
pub fn stats(&self) -> WebhookStats {
    let mut active_webhooks = 0usize;
    let mut total_deliveries = 0u64;
    let mut total_failures = 0u64;
    for wh in self.webhooks.values() {
        if wh.active {
            active_webhooks += 1;
        }
        total_deliveries += wh.delivery_count;
        total_failures += wh.failure_count;
    }

    let newest_first = self.deliveries.iter().rev();
    WebhookStats {
        total_webhooks: self.webhooks.len(),
        active_webhooks,
        total_deliveries,
        total_failures,
        recent_deliveries: newest_first.take(10).cloned().collect(),
    }
}
/// Number of registered webhooks, active or not.
pub fn count(&self) -> usize {
self.webhooks.len()
}
/// Number of registered webhooks whose `active` flag is set.
pub fn active_count(&self) -> usize {
    let mut active = 0;
    for wh in self.webhooks.values() {
        if wh.active {
            active += 1;
        }
    }
    active
}
/// Schedules a retry for a failed delivery, or dead-letters it.
///
/// `attempt` is the number of attempts already made. When it has reached
/// `max_retry_attempts` the delivery is appended to the dead-letter list
/// (kept to at most 200 entries, oldest dropped first) and `false` is
/// returned. Otherwise a retry entry is queued with an exponential backoff of
/// `2^attempt` seconds and `true` is returned.
pub fn enqueue_retry(&mut self, webhook_id: &str, topic: &str, error: &str, attempt: u32) -> bool {
    let now = now_secs();
    if attempt >= self.max_retry_attempts {
        self.dead_letters.push(DeadLetterEntry {
            webhook_id: webhook_id.to_string(),
            topic: topic.to_string(),
            attempts: attempt,
            last_error: error.to_string(),
            dead_at: now,
        });
        if self.dead_letters.len() > 200 {
            self.dead_letters.remove(0);
        }
        return false;
    }
    // `max_retry_attempts` is a public field, so `attempt` can reach 64 or
    // more. A plain `1u64 << attempt` would then panic in debug builds and
    // wrap to a tiny backoff in release builds; saturate instead.
    let backoff_secs = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    self.retry_queue.push(RetryEntry {
        webhook_id: webhook_id.to_string(),
        topic: topic.to_string(),
        // Cannot overflow: `attempt < max_retry_attempts <= u32::MAX`.
        attempt: attempt + 1,
        max_attempts: self.max_retry_attempts,
        next_retry_at: now.saturating_add(backoff_secs),
        original_error: error.to_string(),
        enqueued_at: now,
    });
    true
}
/// Removes and returns every queued retry whose `next_retry_at` has passed.
///
/// Entries that are not yet due stay in the queue; relative order is kept in
/// both the returned list and the remaining queue.
pub fn drain_due_retries(&mut self) -> Vec<RetryEntry> {
    let cutoff = now_secs();
    let queued = std::mem::take(&mut self.retry_queue);
    let mut due = Vec::new();
    for entry in queued {
        if entry.next_retry_at <= cutoff {
            due.push(entry);
        } else {
            self.retry_queue.push(entry);
        }
    }
    due
}
/// Read-only view of the retry queue, in insertion order.
pub fn retry_queue(&self) -> &[RetryEntry] {
    self.retry_queue.as_slice()
}
/// Read-only view of the dead-letter list, oldest entry first.
pub fn dead_letters(&self) -> &[DeadLetterEntry] {
    self.dead_letters.as_slice()
}
/// Number of entries currently waiting in the retry queue.
pub fn retry_queue_len(&self) -> usize {
self.retry_queue.len()
}
/// Number of entries currently held in the dead-letter list.
pub fn dead_letters_len(&self) -> usize {
self.dead_letters.len()
}
/// Computes the signature string for `payload` under `secret`.
///
/// The result is `sha256=` followed by 16 lowercase hex digits.
///
/// NOTE(review): despite the `sha256=` prefix this is a 64-bit FNV-1a hash
/// over the secret bytes followed by the payload bytes. It is neither
/// SHA-256 nor a keyed MAC and should not be relied on to authenticate
/// payloads. Switching to a real HMAC would change the output and needs a
/// crypto dependency, so it is only flagged here.
pub fn compute_signature(secret: &str, payload: &[u8]) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    let digest = secret
        .as_bytes()
        .iter()
        .chain(payload.iter())
        .fold(FNV_OFFSET_BASIS, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    format!("sha256={:016x}", digest)
}
/// Sets or clears the payload template of a webhook.
///
/// Returns `true` on success, `false` if the id is unknown.
pub fn set_template(&mut self, id: &str, template: Option<String>) -> bool {
    if let Some(wh) = self.webhooks.get_mut(id) {
        wh.template = template;
        return true;
    }
    false
}
/// Returns the payload template of a webhook.
///
/// The outer `Option` is `None` when the id is unknown; the inner one is
/// `None` when the webhook exists but has no template.
pub fn get_template(&self, id: &str) -> Option<Option<&str>> {
    let wh = self.webhooks.get(id)?;
    Some(wh.template.as_deref())
}
/// Builds the JSON body to send to a webhook for the given event.
///
/// * Unknown webhook: `{topic, payload, source}`.
/// * Known webhook without a template: `{topic, payload, source, timestamp}`.
/// * Known webhook with a template: the template is rendered and parsed as
///   JSON; if parsing fails the raw text is wrapped as `{"rendered": ...}`.
pub fn render_payload(&self, webhook_id: &str, topic: &str, payload: &serde_json::Value, source: &str) -> serde_json::Value {
    let wh = match self.webhooks.get(webhook_id) {
        Some(found) => found,
        None => {
            return serde_json::json!({
                "topic": topic,
                "payload": payload,
                "source": source,
            });
        }
    };

    if let Some(tmpl) = wh.template.as_deref() {
        let rendered = render_template(tmpl, topic, payload, source, &wh.name, &wh.id);
        return match serde_json::from_str::<serde_json::Value>(&rendered) {
            Ok(value) => value,
            Err(_) => serde_json::json!({
                "rendered": rendered,
            }),
        };
    }

    serde_json::json!({
        "topic": topic,
        "payload": payload,
        "source": source,
        "timestamp": now_secs(),
    })
}
}
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
/// Expands the placeholders in a payload template.
///
/// Recognised placeholders are `{{topic}}`, `{{timestamp}}` (seconds since
/// the Unix epoch), `{{source}}`, `{{payload}}` (the payload serialized as
/// JSON, or empty if serialization fails), `{{webhook_name}}` and
/// `{{webhook_id}}`. Anything else, including unknown `{{...}}` sequences,
/// is copied through unchanged.
///
/// The template is scanned once, left to right, and substituted values are
/// never rescanned. Placeholder-like text inside `topic`, `source` or the
/// payload therefore stays literal instead of being expanded by a later
/// substitution.
///
/// Values are inserted verbatim: `topic`, `source` and `webhook_name` are
/// not JSON-escaped, so a JSON template must account for that itself.
pub fn render_template(
    template: &str,
    topic: &str,
    payload: &serde_json::Value,
    source: &str,
    webhook_name: &str,
    webhook_id: &str,
) -> String {
    let payload_str = serde_json::to_string(payload).unwrap_or_default();
    let timestamp = now_secs().to_string();
    let substitutions: [(&str, &str); 6] = [
        ("{{topic}}", topic),
        ("{{timestamp}}", timestamp.as_str()),
        ("{{source}}", source),
        ("{{payload}}", payload_str.as_str()),
        ("{{webhook_name}}", webhook_name),
        ("{{webhook_id}}", webhook_id),
    ];

    let mut rendered = String::with_capacity(template.len() + payload_str.len());
    let mut rest = template;
    while let Some(open) = rest.find("{{") {
        rendered.push_str(&rest[..open]);
        let tail = &rest[open..];
        match substitutions
            .iter()
            .find(|(placeholder, _)| tail.starts_with(*placeholder))
        {
            Some((placeholder, value)) => {
                rendered.push_str(value);
                rest = &tail[placeholder.len()..];
            }
            None => {
                // Not a known placeholder: emit one brace and rescan from the
                // next one, so `{{{topic}}` still expands the inner token.
                rendered.push('{');
                rest = &tail[1..];
            }
        }
    }
    rendered.push_str(rest);
    rendered
}
/// Returns `true` when at least one filter accepts `topic`.
///
/// * `*` accepts every topic.
/// * `prefix.*` accepts `prefix` itself and any topic starting with
///   `prefix.`, but not other topics that merely share the prefix.
/// * Any other filter must equal the topic exactly.
fn topic_matches(filters: &[String], topic: &str) -> bool {
    for filter in filters {
        let accepted = if filter == "*" {
            true
        } else {
            match filter.strip_suffix(".*") {
                Some(prefix) => match topic.strip_prefix(prefix) {
                    // Either the bare prefix or a dotted sub-topic.
                    Some(rest) => rest.is_empty() || rest.starts_with('.'),
                    None => false,
                },
                None => filter == topic,
            }
        };
        if accepted {
            return true;
        }
    }
    false
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned filter list from string literals.
    fn filters(items: &[&str]) -> Vec<String> {
        items.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn register_and_list() {
        let mut registry = WebhookRegistry::new();
        let id = registry.register("deploy-notify", "https://example.com/hook", filters(&["deploy"]), None);
        assert_eq!(id, "wh_1");
        assert_eq!(registry.count(), 1);

        let summaries = registry.list();
        assert_eq!(summaries.len(), 1);
        let first = &summaries[0];
        assert_eq!(first.name, "deploy-notify");
        assert_eq!(first.url, "https://example.com/hook");
        assert!(!first.has_secret);
        assert!(first.active);
    }

    #[test]
    fn register_with_secret() {
        let mut registry = WebhookRegistry::new();
        registry.register("secure", "https://example.com", filters(&["*"]), Some("mysecret".into()));

        let summaries = registry.list();
        assert!(summaries[0].has_secret);
        // The serialized summary must not carry a `secret` key.
        let json = serde_json::to_value(&summaries[0]).unwrap();
        assert!(json.get("secret").is_none());
    }

    #[test]
    fn unregister() {
        let mut registry = WebhookRegistry::new();
        let id = registry.register("temp", "https://temp.com", filters(&["*"]), None);
        assert_eq!(registry.count(), 1);
        assert!(registry.unregister(&id));
        assert_eq!(registry.count(), 0);
        // A second removal of the same id reports that nothing was removed.
        assert!(!registry.unregister(&id));
    }

    #[test]
    fn toggle_active() {
        let mut registry = WebhookRegistry::new();
        let id = registry.register("toggler", "https://t.com", filters(&["*"]), None);
        assert_eq!(registry.toggle(&id), Some(false));
        assert_eq!(registry.toggle(&id), Some(true));
        assert_eq!(registry.toggle("nonexistent"), None);
    }

    #[test]
    fn topic_matching_exact() {
        let mut registry = WebhookRegistry::new();
        registry.register("deploy-only", "https://d.com", filters(&["deploy"]), None);
        assert_eq!(registry.match_topic("deploy").len(), 1);
        assert_eq!(registry.match_topic("deploy.success").len(), 0);
        assert_eq!(registry.match_topic("other").len(), 0);
    }

    #[test]
    fn topic_matching_prefix() {
        let mut registry = WebhookRegistry::new();
        registry.register("daemon-watcher", "https://d.com", filters(&["daemon.*"]), None);
        assert_eq!(registry.match_topic("daemon.started").len(), 1);
        assert_eq!(registry.match_topic("daemon.stopped").len(), 1);
        // The bare prefix itself is accepted by a `prefix.*` filter.
        assert_eq!(registry.match_topic("daemon").len(), 1);
        assert_eq!(registry.match_topic("deploy").len(), 0);
    }

    #[test]
    fn topic_matching_wildcard() {
        let mut registry = WebhookRegistry::new();
        registry.register("catch-all", "https://a.com", filters(&["*"]), None);
        for topic in ["deploy", "daemon.crashed", "anything"] {
            assert_eq!(registry.match_topic(topic).len(), 1);
        }
    }

    #[test]
    fn topic_matching_multiple_filters() {
        let mut registry = WebhookRegistry::new();
        registry.register("multi", "https://m.com", filters(&["deploy", "config.*"]), None);
        assert_eq!(registry.match_topic("deploy").len(), 1);
        assert_eq!(registry.match_topic("config.updated").len(), 1);
        assert_eq!(registry.match_topic("daemon.started").len(), 0);
    }

    #[test]
    fn inactive_webhook_not_matched() {
        let mut registry = WebhookRegistry::new();
        let id = registry.register("inactive", "https://i.com", filters(&["*"]), None);
        registry.toggle(&id);
        assert_eq!(registry.match_topic("deploy").len(), 0);
    }

    #[test]
    fn dispatch_records_deliveries() {
        let mut registry = WebhookRegistry::new();
        registry.register("a", "https://a.com", filters(&["deploy"]), None);
        registry.register("b", "https://b.com", filters(&["*"]), None);

        let outcome = registry.dispatch("deploy", &serde_json::json!({"flow": "F1"}), "server");
        assert_eq!(outcome.matched, 2);
        assert_eq!(outcome.webhook_ids.len(), 2);

        for summary in registry.list().iter() {
            assert_eq!(summary.delivery_count, 1);
            assert!(summary.last_delivery.is_some());
        }
    }

    #[test]
    fn dispatch_non_matching_topic() {
        let mut registry = WebhookRegistry::new();
        registry.register("deploy-only", "https://d.com", filters(&["deploy"]), None);

        let outcome = registry.dispatch("config.updated", &serde_json::json!({}), "server");
        assert_eq!(outcome.matched, 0);
        assert!(outcome.webhook_ids.is_empty());
    }

    #[test]
    fn record_completed_delivery() {
        let mut registry = WebhookRegistry::new();
        let id = registry.register("test", "https://t.com", filters(&["*"]), None);
        registry.record_completed(&id, "deploy", 200, 45, None, 0);
        registry.record_completed(&id, "deploy", 500, 120, Some("server error".into()), 0);

        let recent = registry.recent_deliveries(10, Some(&id));
        assert_eq!(recent.len(), 2);
        // Newest first: the 500 precedes the 200.
        assert!(!recent[0].success);
        assert!(recent[1].success);

        let stats = registry.stats();
        assert_eq!(stats.total_failures, 1);
    }

    #[test]
    fn recent_deliveries_filtered() {
        let mut registry = WebhookRegistry::new();
        let first = registry.register("a", "https://a.com", filters(&["*"]), None);
        let second = registry.register("b", "https://b.com", filters(&["*"]), None);
        registry.record_completed(&first, "deploy", 200, 10, None, 0);
        registry.record_completed(&second, "config", 200, 20, None, 0);
        registry.record_completed(&first, "deploy", 201, 15, None, 0);

        assert_eq!(registry.recent_deliveries(10, None).len(), 3);
        assert_eq!(registry.recent_deliveries(10, Some(&first)).len(), 2);
        assert_eq!(registry.recent_deliveries(10, Some(&second)).len(), 1);
    }

    #[test]
    fn stats_aggregation() {
        let mut registry = WebhookRegistry::new();
        let id = registry.register("stats-test", "https://s.com", filters(&["*"]), None);
        registry.dispatch("deploy", &serde_json::json!({}), "server");
        registry.dispatch("config", &serde_json::json!({}), "server");
        registry.record_completed(&id, "error", 500, 100, Some("fail".into()), 0);

        let stats = registry.stats();
        assert_eq!(stats.total_webhooks, 1);
        assert_eq!(stats.active_webhooks, 1);
        assert_eq!(stats.total_deliveries, 2);
        assert_eq!(stats.total_failures, 1);
        assert!(stats.recent_deliveries.len() >= 2);
    }

    #[test]
    fn auto_increment_ids() {
        let mut registry = WebhookRegistry::new();
        let first = registry.register("a", "https://a.com", filters(&["*"]), None);
        let second = registry.register("b", "https://b.com", filters(&["*"]), None);
        let third = registry.register("c", "https://c.com", filters(&["*"]), None);
        assert_eq!(first, "wh_1");
        assert_eq!(second, "wh_2");
        assert_eq!(third, "wh_3");
    }

    #[test]
    fn compute_signature_deterministic() {
        let baseline = WebhookRegistry::compute_signature("secret", b"payload");
        let repeat = WebhookRegistry::compute_signature("secret", b"payload");
        assert_eq!(baseline, repeat);
        assert!(baseline.starts_with("sha256="));

        let other_secret = WebhookRegistry::compute_signature("other", b"payload");
        assert_ne!(baseline, other_secret);
    }

    #[test]
    fn summary_serializes() {
        let mut registry = WebhookRegistry::new();
        registry.register("ser-test", "https://s.com", filters(&["deploy", "config.*"]), Some("secret".into()));

        let summaries = registry.list();
        let json = serde_json::to_value(&summaries[0]).unwrap();
        assert_eq!(json["name"], "ser-test");
        assert_eq!(json["url"], "https://s.com");
        assert_eq!(json["events"].as_array().unwrap().len(), 2);
        assert_eq!(json["has_secret"], true);
        assert_eq!(json["active"], true);
        // Only the `has_secret` flag is exposed, never the secret value.
        assert!(json.get("secret").is_none());
    }

    #[test]
    fn delivery_log_trimmed() {
        let mut registry = WebhookRegistry::new();
        registry.max_deliveries = 5;
        let id = registry.register("trim", "https://t.com", filters(&["*"]), None);
        for i in 0..10 {
            registry.record_completed(&id, &format!("event_{i}"), 200, 10, None, 0);
        }
        assert_eq!(registry.deliveries.len(), 5);
        assert_eq!(registry.deliveries.last().unwrap().topic, "event_9");
    }
}