use crate::config::register_plugin;
use crate::config::ItemType;
use crate::config::{ConfigSpec, ExecutionType};
use crate::{BatchingPolicy, Closer, Error, Message, MessageBatch, Output, OutputBatch};
use async_trait::async_trait;
use fiddler_macros::fiddler_registration_func;
use redis::aio::ConnectionManager;
use redis::{AsyncCommands, Client, ErrorKind};
use serde::Deserialize;
use serde_yaml::Value;
use std::time::Duration;
use tracing::{debug, warn};
/// Returns `true` when `e` looks like a Redis authentication failure.
///
/// An error qualifies if its kind is `ErrorKind::AuthenticationFailed`, or if
/// its lower-cased display text contains `noauth`, `wrongpass` or
/// `invalid password`.
fn is_auth_error(e: &redis::RedisError) -> bool {
    // Substrings (compared case-insensitively) that mark an auth failure.
    const AUTH_MARKERS: [&str; 3] = ["noauth", "wrongpass", "invalid password"];

    // Cheap check first: no need to format the error when the kind already matches.
    if matches!(e.kind(), ErrorKind::AuthenticationFailed) {
        return true;
    }

    // Format and lower-case the message once instead of once per marker.
    let message = e.to_string().to_lowercase();
    AUTH_MARKERS.iter().any(|marker| message.contains(*marker))
}
fn redis_error_to_fiddler_error(e: redis::RedisError, context: &str) -> Error {
if is_auth_error(&e) {
Error::ConfigFailedValidation(format!("Redis authentication failed: {}", e))
} else {
Error::ExecutionError(format!("{}: {}", context, e))
}
}
/// Value returned by `default_mode`, the serde default for `RedisOutputConfig::mode`.
const DEFAULT_MODE: &str = "list";
/// Value returned by `default_list_command`, the serde default for
/// `RedisOutputConfig::list_command`.
const DEFAULT_LIST_COMMAND: &str = "rpush";
/// Configuration for the Redis output, deserialized from the plugin's YAML settings.
///
/// Which of `key`, `channel` and `stream` is needed depends on `mode`.
#[derive(Deserialize, Clone)]
pub struct RedisOutputConfig {
/// Redis connection URL handed to `Client::open`.
pub url: String,
/// Operation mode: `"list"`, `"pubsub"` or `"stream"`. Defaults to `"list"`.
#[serde(default = "default_mode")]
pub mode: String,
/// List key; required in list mode.
pub key: Option<String>,
/// Pub/Sub channel; required in pubsub mode.
pub channel: Option<String>,
/// Stream name; required in stream mode.
pub stream: Option<String>,
/// When set, stream entries are added with `MAXLEN ~ <max_len>`.
pub max_len: Option<usize>,
/// List push command, `"lpush"` or `"rpush"` (compared case-insensitively).
/// Defaults to `"rpush"`.
#[serde(default = "default_list_command")]
pub list_command: String,
/// Optional batching policy. Read by the list and stream outputs; pubsub mode
/// logs a warning and ignores it.
#[serde(default)]
pub batch: Option<BatchingPolicy>,
}
/// Serde default for `RedisOutputConfig::mode`: an owned copy of `DEFAULT_MODE`.
fn default_mode() -> String {
    String::from(DEFAULT_MODE)
}
/// Serde default for `RedisOutputConfig::list_command`: an owned copy of
/// `DEFAULT_LIST_COMMAND`.
fn default_list_command() -> String {
    String::from(DEFAULT_LIST_COMMAND)
}
/// Batched output that pushes message payloads onto a Redis list.
pub struct RedisListOutput {
// Connection used for every pipeline sent by `write_batch`.
conn: ConnectionManager,
// Name of the list that receives the payloads.
key: String,
// `true` pushes with LPUSH, `false` with RPUSH.
use_lpush: bool,
// Value reported by `OutputBatch::batch_size`.
batch_size: usize,
// Value reported by `OutputBatch::interval`.
interval: Duration,
// Value reported by `OutputBatch::max_batch_bytes`.
max_batch_bytes: usize,
}
impl RedisListOutput {
pub async fn new(config: RedisOutputConfig) -> Result<Self, Error> {
let key = config
.key
.ok_or_else(|| Error::ConfigFailedValidation("key required for list mode".into()))?;
let use_lpush = config.list_command.to_lowercase() == "lpush";
let client = Client::open(config.url.as_str())
.map_err(|e| redis_error_to_fiddler_error(e, "Invalid Redis URL"))?;
let conn = ConnectionManager::new(client)
.await
.map_err(|e| redis_error_to_fiddler_error(e, "Failed to connect to Redis"))?;
let batch_size = config.batch.as_ref().map_or(500, |b| b.effective_size());
let interval = config
.batch
.as_ref()
.map_or(Duration::from_secs(10), |b| b.effective_duration());
let max_batch_bytes = config
.batch
.as_ref()
.map_or(10_485_760, |b| b.effective_max_batch_bytes());
debug!(key = %key, use_lpush = use_lpush, "Redis list output initialized");
Ok(Self {
conn,
key,
use_lpush,
batch_size,
interval,
max_batch_bytes,
})
}
}
#[async_trait]
impl OutputBatch for RedisListOutput {
async fn write_batch(&mut self, messages: MessageBatch) -> Result<(), Error> {
if messages.is_empty() {
return Ok(());
}
let mut pipe = redis::pipe();
for msg in &messages {
if self.use_lpush {
pipe.lpush(&self.key, msg.bytes.as_slice());
} else {
pipe.rpush(&self.key, msg.bytes.as_slice());
}
}
pipe.query_async::<()>(&mut self.conn).await.map_err(|e| {
if is_auth_error(&e) {
Error::UnRetryable(format!("Redis authentication failed: {}", e))
} else {
Error::OutputError(format!("Redis push failed: {}", e))
}
})?;
debug!(count = messages.len(), key = %self.key, "Pushed batch to Redis");
Ok(())
}
async fn batch_size(&self) -> usize {
self.batch_size
}
async fn interval(&self) -> Duration {
self.interval
}
async fn max_batch_bytes(&self) -> usize {
self.max_batch_bytes
}
}
#[async_trait]
impl Closer for RedisListOutput {
    /// Logs a debug message and returns `Ok(())`; no other work is done here.
    async fn close(&mut self) -> Result<(), Error> {
        debug!("Redis list output closing");
        Ok(())
    }
}
/// Output that publishes each message payload to a Redis Pub/Sub channel.
pub struct RedisPubSubOutput {
// Connection used for every PUBLISH issued by `write`.
conn: ConnectionManager,
// Channel that receives the payloads.
channel: String,
}
impl RedisPubSubOutput {
pub async fn new(config: RedisOutputConfig) -> Result<Self, Error> {
let channel = config.channel.ok_or_else(|| {
Error::ConfigFailedValidation("channel required for pubsub mode".into())
})?;
let client = Client::open(config.url.as_str())
.map_err(|e| redis_error_to_fiddler_error(e, "Invalid Redis URL"))?;
let conn = ConnectionManager::new(client)
.await
.map_err(|e| redis_error_to_fiddler_error(e, "Failed to connect to Redis"))?;
debug!(channel = %channel, "Redis pub/sub output initialized");
Ok(Self { conn, channel })
}
}
#[async_trait]
impl Output for RedisPubSubOutput {
async fn write(&mut self, message: Message) -> Result<(), Error> {
self.conn
.publish::<_, _, ()>(&self.channel, message.bytes.as_slice())
.await
.map_err(|e| {
if is_auth_error(&e) {
Error::UnRetryable(format!("Redis authentication failed: {}", e))
} else {
Error::OutputError(format!("Redis publish failed: {}", e))
}
})?;
Ok(())
}
}
#[async_trait]
impl Closer for RedisPubSubOutput {
    /// Logs a debug message and returns `Ok(())`; no other work is done here.
    async fn close(&mut self) -> Result<(), Error> {
        debug!("Redis pub/sub output closing");
        Ok(())
    }
}
/// Batched output that appends messages to a Redis stream with XADD.
pub struct RedisStreamOutput {
// Connection used for every pipeline sent by `write_batch`.
conn: ConnectionManager,
// Name of the stream that receives the entries.
stream: String,
// When set, each XADD carries `MAXLEN ~ <max_len>`.
max_len: Option<usize>,
// Value reported by `OutputBatch::batch_size`.
batch_size: usize,
// Value reported by `OutputBatch::interval`.
interval: Duration,
// Value reported by `OutputBatch::max_batch_bytes`.
max_batch_bytes: usize,
}
impl RedisStreamOutput {
pub async fn new(config: RedisOutputConfig) -> Result<Self, Error> {
let stream = config.stream.ok_or_else(|| {
Error::ConfigFailedValidation("stream required for stream mode".into())
})?;
let client = Client::open(config.url.as_str())
.map_err(|e| redis_error_to_fiddler_error(e, "Invalid Redis URL"))?;
let conn = ConnectionManager::new(client)
.await
.map_err(|e| redis_error_to_fiddler_error(e, "Failed to connect to Redis"))?;
let batch_size = config.batch.as_ref().map_or(500, |b| b.effective_size());
let interval = config
.batch
.as_ref()
.map_or(Duration::from_secs(10), |b| b.effective_duration());
let max_batch_bytes = config
.batch
.as_ref()
.map_or(10_485_760, |b| b.effective_max_batch_bytes());
let max_len = config.max_len;
debug!(stream = %stream, max_len = ?max_len, "Redis stream output initialized");
Ok(Self {
conn,
stream,
max_len,
batch_size,
interval,
max_batch_bytes,
})
}
fn build_xadd_cmd(stream: &str, max_len: Option<usize>, message: &Message) -> redis::Cmd {
let mut cmd = redis::cmd("XADD");
cmd.arg(stream);
if let Some(len) = max_len {
cmd.arg("MAXLEN").arg("~").arg(len);
}
cmd.arg("*");
cmd.arg("data").arg(message.bytes.as_slice());
for (key, value) in &message.metadata {
if let Some(s) = value.as_str() {
cmd.arg(format!("meta:{}", key)).arg(s);
}
}
cmd
}
}
#[async_trait]
impl OutputBatch for RedisStreamOutput {
async fn write_batch(&mut self, messages: MessageBatch) -> Result<(), Error> {
if messages.is_empty() {
return Ok(());
}
let mut pipe = redis::pipe();
for msg in &messages {
pipe.add_command(Self::build_xadd_cmd(&self.stream, self.max_len, msg));
}
pipe.query_async::<()>(&mut self.conn).await.map_err(|e| {
if is_auth_error(&e) {
Error::UnRetryable(format!("Redis authentication failed: {}", e))
} else {
Error::OutputError(format!("Redis XADD failed: {}", e))
}
})?;
debug!(count = messages.len(), stream = %self.stream, "XADD batch to Redis stream");
Ok(())
}
async fn batch_size(&self) -> usize {
self.batch_size
}
async fn interval(&self) -> Duration {
self.interval
}
async fn max_batch_bytes(&self) -> usize {
self.max_batch_bytes
}
}
#[async_trait]
impl Closer for RedisStreamOutput {
    /// Logs a debug message and returns `Ok(())`; no other work is done here.
    async fn close(&mut self) -> Result<(), Error> {
        debug!("Redis stream output closing");
        Ok(())
    }
}
// Plugin factory: deserializes the YAML settings, validates the fields the
// selected mode needs, and constructs the matching output.
//
// "list" and "stream" produce `ExecutionType::OutputBatch`; "pubsub" produces
// `ExecutionType::Output`. Validation failures are reported as
// `Error::ConfigFailedValidation`.
#[fiddler_registration_func]
fn create_redis_output(conf: Value) -> Result<ExecutionType, Error> {
    let config: RedisOutputConfig = serde_yaml::from_value(conf)?;

    if config.url.is_empty() {
        return Err(Error::ConfigFailedValidation("url is required".into()));
    }

    match config.mode.as_str() {
        "list" => {
            if config.key.is_none() {
                return Err(Error::ConfigFailedValidation(
                    "key is required for list mode".into(),
                ));
            }
            // The command name is accepted in any letter case.
            let command = config.list_command.to_lowercase();
            if command != "lpush" && command != "rpush" {
                return Err(Error::ConfigFailedValidation(
                    "list_command must be 'lpush' or 'rpush'".into(),
                ));
            }
            let output = RedisListOutput::new(config).await?;
            Ok(ExecutionType::OutputBatch(Box::new(output)))
        }
        "pubsub" => {
            if config.channel.is_none() {
                return Err(Error::ConfigFailedValidation(
                    "channel is required for pubsub mode".into(),
                ));
            }
            // Pub/Sub publishes one message per write, so a batch policy has no effect.
            if config.batch.is_some() {
                warn!("batch configuration is ignored in pubsub mode");
            }
            let output = RedisPubSubOutput::new(config).await?;
            Ok(ExecutionType::Output(Box::new(output)))
        }
        "stream" => {
            if config.stream.is_none() {
                return Err(Error::ConfigFailedValidation(
                    "stream is required for stream mode".into(),
                ));
            }
            let output = RedisStreamOutput::new(config).await?;
            Ok(ExecutionType::OutputBatch(Box::new(output)))
        }
        _ => Err(Error::ConfigFailedValidation(
            "mode must be 'list', 'pubsub', or 'stream'".into(),
        )),
    }
}
/// Registers the `redis` output plugin under both `ItemType::Output` and
/// `ItemType::OutputBatch`, using the same schema and factory for each.
///
/// # Errors
///
/// Propagates any error from `ConfigSpec::from_schema` or `register_plugin`.
pub(crate) fn register_redis() -> Result<(), Error> {
    // YAML nesting is significant: each property's attributes must be indented
    // beneath that property, otherwise they collapse into repeated top-level
    // keys and `properties` ends up empty.
    let config = r#"type: object
required:
  - url
properties:
  url:
    type: string
    description: "Redis connection URL (redis://host:port/db)"
  mode:
    type: string
    enum: ["list", "pubsub", "stream"]
    default: "list"
    description: "Operation mode"
  key:
    type: string
    description: "List key name (required for list mode)"
  channel:
    type: string
    description: "Pub/Sub channel name (required for pubsub mode)"
  stream:
    type: string
    description: "Stream name (required for stream mode)"
  max_len:
    type: integer
    description: "Approximate maximum stream length for trimming (stream mode only)"
  list_command:
    type: string
    enum: ["lpush", "rpush"]
    default: "rpush"
    description: "List operation (for list mode)"
  batch:
    type: object
    properties:
      size:
        type: integer
        description: "Batch size (default: 500)"
      duration:
        type: string
        description: "Flush interval (default: 10s)"
      max_batch_bytes:
        type: integer
        default: 10485760
        description: "Maximum cumulative byte size per batch (default: 10MB)"
    description: "Batching policy (list and stream modes; ignored in pubsub mode)"
"#;
    let conf_spec = ConfigSpec::from_schema(config)?;

    // The factory returns either an `Output` or an `OutputBatch` depending on
    // `mode`, so the plugin is registered under both item types.
    register_plugin(
        "redis".into(),
        ItemType::Output,
        conf_spec.clone(),
        create_redis_output,
    )?;
    register_plugin(
        "redis".into(),
        ItemType::OutputBatch,
        conf_spec,
        create_redis_output,
    )
}
#[cfg(test)]
mod tests {
    use super::*;

    /// List-mode settings, including a nested batch policy, deserialize as written.
    #[test]
    fn test_config_deserialization_list() {
        // `size` must be nested under `batch`; at the top level it would leave
        // `batch` empty.
        let yaml = r#"
            url: "redis://localhost:6379/0"
            mode: "list"
            key: "events"
            list_command: "rpush"
            batch:
              size: 1000
        "#;
        let config: RedisOutputConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.url, "redis://localhost:6379/0");
        assert_eq!(config.mode, "list");
        assert_eq!(config.key, Some("events".to_string()));
        assert_eq!(config.list_command, "rpush");
        assert!(config.batch.is_some());
    }

    /// Pubsub-mode settings deserialize as written.
    #[test]
    fn test_config_deserialization_pubsub() {
        let yaml = r#"
            url: "redis://localhost:6379"
            mode: "pubsub"
            channel: "events.stream"
        "#;
        let config: RedisOutputConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.url, "redis://localhost:6379");
        assert_eq!(config.mode, "pubsub");
        assert_eq!(config.channel, Some("events.stream".to_string()));
    }

    /// Omitted `mode`, `list_command` and `batch` fall back to their defaults.
    #[test]
    fn test_config_defaults() {
        let yaml = r#"
            url: "redis://localhost:6379"
            key: "test"
        "#;
        let config: RedisOutputConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.mode, "list");
        assert_eq!(config.list_command, "rpush");
        assert!(config.batch.is_none());
    }

    /// Stream-mode settings with trimming and a nested batch policy deserialize as written.
    #[test]
    fn test_config_deserialization_stream() {
        // `size` must be nested under `batch`; at the top level it would leave
        // `batch` empty.
        let yaml = r#"
            url: "redis://localhost:6379/0"
            mode: "stream"
            stream: "my_stream"
            max_len: 10000
            batch:
              size: 1000
        "#;
        let config: RedisOutputConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.url, "redis://localhost:6379/0");
        assert_eq!(config.mode, "stream");
        assert_eq!(config.stream, Some("my_stream".to_string()));
        assert_eq!(config.max_len, Some(10000));
        assert!(config.batch.is_some());
    }

    /// Stream mode without `max_len` or `batch` leaves both unset.
    #[test]
    fn test_config_deserialization_stream_no_trim() {
        let yaml = r#"
            url: "redis://localhost:6379"
            mode: "stream"
            stream: "events"
        "#;
        let config: RedisOutputConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.mode, "stream");
        assert_eq!(config.stream, Some("events".to_string()));
        assert_eq!(config.max_len, None);
        assert!(config.batch.is_none());
    }

    /// Deserialization alone does not enforce `stream`; the field is simply `None`.
    #[test]
    fn test_stream_mode_requires_stream_field() {
        let yaml = r#"
            url: "redis://localhost:6379"
            mode: "stream"
        "#;
        let config: RedisOutputConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(config.stream.is_none());
    }

    /// Registration succeeds, or reports a duplicate name if the plugin was
    /// already registered.
    #[test]
    fn test_register_redis() {
        let result = register_redis();
        assert!(result.is_ok() || matches!(result, Err(Error::DuplicateRegisteredName(_))));
    }
}