Skip to main content

hyperi_rustlib/dlq/
redis_dlq.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/redis_dlq.rs
3// Purpose:   Redis Streams DLQ backend variant
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Redis Streams backend variant for the DLQ enum.
10//!
11//! Writes failed messages to a Redis Stream via `XADD`. Supports
12//! optional `MAXLEN ~` trimming to bound stream size. The connection
13//! is a `MultiplexedConnection` -- async-native, no `spawn_blocking`
14//! needed.
15//!
16//! Single-batch sends are issued via Redis pipelining (one round-trip
17//! per batch) when the backend serves more than one entry per call.
18
19use serde::{Deserialize, Serialize};
20
21use super::entry::DlqEntry;
22use super::error::DlqError;
23
24/// Configuration for the Redis DLQ backend.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(default)]
27pub struct RedisDlqConfig {
28    /// Enable the Redis backend.
29    pub enabled: bool,
30
31    /// Redis connection URL (e.g. `redis://localhost:6379`).
32    pub url: String,
33
34    /// Stream key to write DLQ entries to.
35    pub stream_key: String,
36
37    /// Optional maximum stream length (approximate trimming via `MAXLEN ~`).
38    pub max_len: Option<usize>,
39}
40
41impl Default for RedisDlqConfig {
42    fn default() -> Self {
43        Self {
44            enabled: false,
45            url: "redis://127.0.0.1:6379".into(),
46            stream_key: "dlq".into(),
47            max_len: None,
48        }
49    }
50}
51
52/// Redis backend -- internal variant carried by [`super::DlqBackend::Redis`].
53pub struct RedisDlqInner {
54    conn: redis::aio::MultiplexedConnection,
55    stream_key: String,
56    max_len: Option<usize>,
57}
58
59impl std::fmt::Debug for RedisDlqInner {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("RedisDlqInner")
62            .field("stream_key", &self.stream_key)
63            .field("max_len", &self.max_len)
64            .finish_non_exhaustive()
65    }
66}
67
68impl RedisDlqInner {
69    /// Build the Redis backend. Opens a multiplexed async connection.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the Redis connection cannot be established.
74    pub async fn new(config: &RedisDlqConfig) -> Result<Self, DlqError> {
75        let client = redis::Client::open(config.url.as_str())
76            .map_err(|e| DlqError::BackendError(format!("Redis DLQ open: {e}")))?;
77        let conn = client
78            .get_multiplexed_async_connection()
79            .await
80            .map_err(|e| DlqError::BackendError(format!("Redis DLQ connect: {e}")))?;
81        Ok(Self {
82            conn,
83            stream_key: config.stream_key.clone(),
84            max_len: config.max_len,
85        })
86    }
87
88    /// Send a batch via pipelined `XADD`s (single round-trip).
89    pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
90        if batch.is_empty() {
91            return Ok(());
92        }
93
94        let mut pipe = redis::pipe();
95        for entry in batch {
96            let json =
97                serde_json::to_string(entry).map_err(|e| DlqError::Serialization(e.to_string()))?;
98            let mut cmd = redis::cmd("XADD");
99            cmd.arg(&self.stream_key);
100            if let Some(max_len) = self.max_len {
101                cmd.arg("MAXLEN").arg("~").arg(max_len);
102            }
103            cmd.arg("*").arg("data").arg(&json);
104            pipe.add_command(cmd);
105        }
106
107        pipe.query_async::<()>(&mut self.conn)
108            .await
109            .map_err(|e| DlqError::BackendError(format!("Redis XADD batch: {e}")))?;
110
111        #[cfg(feature = "metrics")]
112        metrics::counter!("dfe_dlq_sent_total", "backend" => "redis").increment(batch.len() as u64);
113
114        Ok(())
115    }
116}
117
118#[cfg(test)]
119mod tests {
120    use super::*;
121
122    #[test]
123    fn config_defaults() {
124        let config = RedisDlqConfig::default();
125        assert!(!config.enabled);
126        assert_eq!(config.url, "redis://127.0.0.1:6379");
127        assert_eq!(config.stream_key, "dlq");
128        assert!(config.max_len.is_none());
129    }
130
131    #[test]
132    fn config_deserialise() {
133        let json = r#"{"enabled":true,"url":"redis://redis:6379","stream_key":"failed_events","max_len":10000}"#;
134        let config: RedisDlqConfig = serde_json::from_str(json).unwrap();
135        assert!(config.enabled);
136        assert_eq!(config.url, "redis://redis:6379");
137        assert_eq!(config.stream_key, "failed_events");
138        assert_eq!(config.max_len, Some(10000));
139    }
140}