use serde::{Deserialize, Serialize};
use super::entry::DlqEntry;
use super::error::DlqError;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisDlqConfig {
pub enabled: bool,
pub url: String,
pub stream_key: String,
pub max_len: Option<usize>,
}
impl Default for RedisDlqConfig {
fn default() -> Self {
Self {
enabled: false,
url: "redis://127.0.0.1:6379".into(),
stream_key: "dlq".into(),
max_len: None,
}
}
}
pub struct RedisDlqInner {
conn: redis::aio::MultiplexedConnection,
stream_key: String,
max_len: Option<usize>,
}
impl std::fmt::Debug for RedisDlqInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisDlqInner")
.field("stream_key", &self.stream_key)
.field("max_len", &self.max_len)
.finish_non_exhaustive()
}
}
impl RedisDlqInner {
pub async fn new(config: &RedisDlqConfig) -> Result<Self, DlqError> {
let client = redis::Client::open(config.url.as_str())
.map_err(|e| DlqError::BackendError(format!("Redis DLQ open: {e}")))?;
let conn = client
.get_multiplexed_async_connection()
.await
.map_err(|e| DlqError::BackendError(format!("Redis DLQ connect: {e}")))?;
Ok(Self {
conn,
stream_key: config.stream_key.clone(),
max_len: config.max_len,
})
}
pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
if batch.is_empty() {
return Ok(());
}
let mut pipe = redis::pipe();
for entry in batch {
let json =
serde_json::to_string(entry).map_err(|e| DlqError::Serialization(e.to_string()))?;
let mut cmd = redis::cmd("XADD");
cmd.arg(&self.stream_key);
if let Some(max_len) = self.max_len {
cmd.arg("MAXLEN").arg("~").arg(max_len);
}
cmd.arg("*").arg("data").arg(&json);
pipe.add_command(cmd);
}
pipe.query_async::<()>(&mut self.conn)
.await
.map_err(|e| DlqError::BackendError(format!("Redis XADD batch: {e}")))?;
#[cfg(feature = "metrics")]
metrics::counter!("dfe_dlq_sent_total", "backend" => "redis").increment(batch.len() as u64);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn config_defaults() {
let config = RedisDlqConfig::default();
assert!(!config.enabled);
assert_eq!(config.url, "redis://127.0.0.1:6379");
assert_eq!(config.stream_key, "dlq");
assert!(config.max_len.is_none());
}
#[test]
fn config_deserialise() {
let json = r#"{"enabled":true,"url":"redis://redis:6379","stream_key":"failed_events","max_len":10000}"#;
let config: RedisDlqConfig = serde_json::from_str(json).unwrap();
assert!(config.enabled);
assert_eq!(config.url, "redis://redis:6379");
assert_eq!(config.stream_key, "failed_events");
assert_eq!(config.max_len, Some(10000));
}
}