hyperi_rustlib/dlq/
redis_dlq.rs1use serde::{Deserialize, Serialize};
20
21use super::entry::DlqEntry;
22use super::error::DlqError;
23
/// Configuration for the Redis-backed dead-letter queue (DLQ).
///
/// Because of the container-level `#[serde(default)]`, any field missing from
/// the serialized form is filled in from the `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisDlqConfig {
    /// Whether the Redis DLQ backend is enabled. Defaults to `false`.
    ///
    /// NOTE(review): this flag is not read anywhere in this file; presumably
    /// the caller checks it before constructing the backend — confirm.
    pub enabled: bool,

    /// Connection URL handed to `redis::Client::open`.
    /// Defaults to `redis://127.0.0.1:6379`.
    pub url: String,

    /// Key of the Redis stream that entries are appended to with `XADD`.
    /// Defaults to `dlq`.
    pub stream_key: String,

    /// Optional stream length cap. When `Some(n)`, every `XADD` is issued with
    /// `MAXLEN ~ n`; when `None` (the default) no `MAXLEN` argument is sent.
    pub max_len: Option<usize>,
}
40
41impl Default for RedisDlqConfig {
42 fn default() -> Self {
43 Self {
44 enabled: false,
45 url: "redis://127.0.0.1:6379".into(),
46 stream_key: "dlq".into(),
47 max_len: None,
48 }
49 }
50}
51
/// Redis DLQ backend state: an open connection plus the stream settings
/// copied from [`RedisDlqConfig`] at construction time.
pub struct RedisDlqInner {
    /// Multiplexed async connection used to run the `XADD` pipeline.
    conn: redis::aio::MultiplexedConnection,
    /// Key of the stream that entries are appended to.
    stream_key: String,
    /// Optional length cap sent as `MAXLEN ~ n` on every `XADD`.
    max_len: Option<usize>,
}
58
59impl std::fmt::Debug for RedisDlqInner {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("RedisDlqInner")
62 .field("stream_key", &self.stream_key)
63 .field("max_len", &self.max_len)
64 .finish_non_exhaustive()
65 }
66}
67
68impl RedisDlqInner {
69 pub async fn new(config: &RedisDlqConfig) -> Result<Self, DlqError> {
75 let client = redis::Client::open(config.url.as_str())
76 .map_err(|e| DlqError::BackendError(format!("Redis DLQ open: {e}")))?;
77 let conn = client
78 .get_multiplexed_async_connection()
79 .await
80 .map_err(|e| DlqError::BackendError(format!("Redis DLQ connect: {e}")))?;
81 Ok(Self {
82 conn,
83 stream_key: config.stream_key.clone(),
84 max_len: config.max_len,
85 })
86 }
87
88 pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
90 if batch.is_empty() {
91 return Ok(());
92 }
93
94 let mut pipe = redis::pipe();
95 for entry in batch {
96 let json =
97 serde_json::to_string(entry).map_err(|e| DlqError::Serialization(e.to_string()))?;
98 let mut cmd = redis::cmd("XADD");
99 cmd.arg(&self.stream_key);
100 if let Some(max_len) = self.max_len {
101 cmd.arg("MAXLEN").arg("~").arg(max_len);
102 }
103 cmd.arg("*").arg("data").arg(&json);
104 pipe.add_command(cmd);
105 }
106
107 pipe.query_async::<()>(&mut self.conn)
108 .await
109 .map_err(|e| DlqError::BackendError(format!("Redis XADD batch: {e}")))?;
110
111 #[cfg(feature = "metrics")]
112 metrics::counter!("dfe_dlq_sent_total", "backend" => "redis").increment(batch.len() as u64);
113
114 Ok(())
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Default` impl yields a disabled backend pointing at local Redis,
    /// the `dlq` stream, and no length cap.
    #[test]
    fn config_defaults() {
        let RedisDlqConfig {
            enabled,
            url,
            stream_key,
            max_len,
        } = RedisDlqConfig::default();

        assert!(!enabled);
        assert_eq!(url, "redis://127.0.0.1:6379");
        assert_eq!(stream_key, "dlq");
        assert!(max_len.is_none());
    }

    /// A fully specified JSON document populates every field.
    #[test]
    fn config_deserialise() {
        let raw = r#"{"enabled":true,"url":"redis://redis:6379","stream_key":"failed_events","max_len":10000}"#;
        let parsed = serde_json::from_str::<RedisDlqConfig>(raw).unwrap();

        let RedisDlqConfig {
            enabled,
            url,
            stream_key,
            max_len,
        } = parsed;

        assert!(enabled);
        assert_eq!(url, "redis://redis:6379");
        assert_eq!(stream_key, "failed_events");
        assert_eq!(max_len, Some(10000));
    }
}