webhook-dispatcher 0.1.2

In-process webhook delivery engine with retries, DLQ, signatures, rate limits, and pluggable storage.
Documentation
#[cfg(feature = "redis")]
use async_trait::async_trait;
#[cfg(feature = "redis")]
use redis::AsyncCommands;

#[cfg(feature = "redis")]
use crate::error::DeliveryOutcome;
#[cfg(feature = "redis")]
use crate::storage::Storage;
#[cfg(feature = "redis")]
use crate::types::{DlqEntry, DeliveryState, IdempotencyKey, EventId, EndpointId, TenantId};
#[cfg(feature = "redis")]
use crate::worker::Task;

#[cfg(feature = "redis")]
pub struct RedisStorage {
    client: redis::Client,
    prefix: String,
}

#[cfg(feature = "redis")]
impl RedisStorage {
    pub fn new(client: redis::Client, prefix: impl Into<String>) -> Self {
        Self {
            client,
            prefix: prefix.into(),
        }
    }

    fn pending_key(&self) -> String {
        format!("{}:pending", self.prefix)
    }

    fn dlq_key(&self) -> String {
        format!("{}:dlq", self.prefix)
    }

    fn status_key(&self) -> String {
        format!("{}:status", self.prefix)
    }

    fn id_key(key: &IdempotencyKey) -> String {
        let tenant = key.tenant_id.as_ref().map(|t| t.0.as_str()).unwrap_or("");
        format!("{}|{}|{}", key.event_id.0, key.endpoint_id.0, tenant)
    }
}

#[cfg(feature = "redis")]
#[async_trait]
impl Storage for RedisStorage {
    async fn record_enqueue(&self, task: &Task) {
        let mut conn = match self.client.get_tokio_connection().await {
            Ok(c) => c,
            Err(_) => return,
        };
        let _ = conn
            .rpush(self.pending_key(), serde_json::to_string(task).unwrap_or_default())
            .await;
    }

    async fn record_delivery(&self, task: &Task, _outcome: &DeliveryOutcome) {
        let mut conn = match self.client.get_tokio_connection().await {
            Ok(c) => c,
            Err(_) => return,
        };
        let payload = serde_json::to_string(task).unwrap_or_default();
        let _ = conn.lrem(self.pending_key(), 1, payload).await;
    }

    async fn record_dlq(&self, entry: &DlqEntry) {
        let mut conn = match self.client.get_tokio_connection().await {
            Ok(c) => c,
            Err(_) => return,
        };
        let _ = conn
            .rpush(self.dlq_key(), serde_json::to_string(entry).unwrap_or_default())
            .await;
    }

    async fn load_pending(&self) -> Vec<Task> {
        let mut conn = match self.client.get_tokio_connection().await {
            Ok(c) => c,
            Err(_) => return Vec::new(),
        };
        let values: Vec<String> = conn.lrange(self.pending_key(), 0, -1).await.unwrap_or_default();
        values
            .into_iter()
            .filter_map(|v| serde_json::from_str::<Task>(&v).ok())
            .collect()
    }

    async fn record_status(&self, key: &IdempotencyKey, state: &DeliveryState) {
        let mut conn = match self.client.get_tokio_connection().await {
            Ok(c) => c,
            Err(_) => return,
        };
        let payload = serde_json::to_string(state).unwrap_or_default();
        let _ = conn.hset(self.status_key(), Self::id_key(key), payload).await;
    }

    async fn load_status(&self) -> std::collections::HashMap<IdempotencyKey, DeliveryState> {
        let mut conn = match self.client.get_tokio_connection().await {
            Ok(c) => c,
            Err(_) => return std::collections::HashMap::new(),
        };
        let map: std::collections::HashMap<String, String> =
            conn.hgetall(self.status_key()).await.unwrap_or_default();
        map.into_iter()
            .filter_map(|(id, value)| {
                let mut parts = id.split('|');
                let event = parts.next()?.to_string();
                let endpoint = parts.next()?.to_string();
                let tenant = parts.next().unwrap_or("").to_string();
                let tenant_id = if tenant.is_empty() { None } else { Some(TenantId(tenant)) };
                let state = serde_json::from_str::<DeliveryState>(&value).ok()?;
                Some((IdempotencyKey::new(EventId(event), EndpointId(endpoint), tenant_id), state))
            })
            .collect()
    }

    async fn remove_dlq(&self, key: &IdempotencyKey) {
        let mut conn = match self.client.get_tokio_connection().await {
            Ok(c) => c,
            Err(_) => return,
        };
        let payloads: Vec<String> = conn.lrange(self.dlq_key(), 0, -1).await.unwrap_or_default();
        for payload in payloads {
            if let Ok(entry) = serde_json::from_str::<DlqEntry>(&payload) {
                if &entry.key == key {
                    let _ = conn.lrem(self.dlq_key(), 1, payload).await;
                    break;
                }
            }
        }
    }
}