use std::time::Duration;
use async_trait::async_trait;
use redis::AsyncCommands;
use redis::aio::ConnectionManager;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::Mutex;
use crate::common::message::{Message, SharedMessage};
use crate::error::{Error, Result};
use crate::sink::Sink;
use crate::transform::value::ValueSource;
fn json_value_to_string(v: &Value) -> String {
if let Some(s) = v.as_str() {
s.to_string()
} else {
v.to_string()
}
}
/// Configuration for one resolvable value; used for the sink's `key`, `value`
/// and `items` settings.
///
/// Both fields are optional and are handed together to `ValueSource::compile`,
/// which decides which combinations are acceptable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisValueConfig {
/// Forwarded as the first argument of `ValueSource::compile`. Presumably an
/// expression selecting data from the message (the tests use `"k"` and `"$"`)
/// — confirm against `ValueSource`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub from: Option<String>,
/// Forwarded as the second argument of `ValueSource::compile`. Presumably a
/// literal JSON value — confirm against `ValueSource`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value: Option<Value>,
}
impl RedisValueConfig {
    /// Builds the `ValueSource` described by this configuration.
    ///
    /// Borrows `from` as `Option<&str>` and `value` as `Option<&Value>` and
    /// delegates to `ValueSource::compile`; any error it reports is returned
    /// unchanged.
    fn compile(&self) -> Result<ValueSource> {
        let from = self.from.as_deref();
        let literal = self.value.as_ref();
        ValueSource::compile(from, literal)
    }
}
/// TTL setting for keys written by the sink.
///
/// Deserialized untagged: a plain string is read as `Static`, otherwise the
/// `Dynamic` shape (`RedisTtlOptions`) is tried.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RedisTtlConfig {
/// A fixed duration string (e.g. `"60s"`), parsed with
/// `humantime::parse_duration` during validation.
Static(String),
/// A TTL resolved per message, with an optional fallback.
Dynamic(RedisTtlOptions),
}
/// Options for a per-message (dynamic) TTL.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisTtlOptions {
/// Source expression compiled via `ValueSource::compile(Some(from), None)`;
/// validation rejects a blank value.
pub from: String,
/// Optional fallback duration string (humantime format, at least 1s) used
/// when the resolved TTL is a skippable null or a blank string.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub default: Option<String>,
}
/// User-facing configuration of the Redis sink.
///
/// Validation requires either a `key` + `value` pair (single write per
/// message) or `items` (batch write per message), but not both.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisSinkConfig {
/// Redis connection URL handed to `redis::Client::open`; must not be blank.
pub url: String,
/// Source of the Redis key in single-write mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub key: Option<RedisValueConfig>,
/// Source of the Redis value in single-write mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value: Option<RedisValueConfig>,
/// Source of a JSON object whose entries are written as key/value pairs in
/// batch mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub items: Option<RedisValueConfig>,
/// Optional expiry applied to every key written; no expiry when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ttl: Option<RedisTtlConfig>,
}
impl RedisSinkConfig {
fn validate_and_normalize(&mut self) -> Result<()> {
if self.url.trim().is_empty() {
return Err(Error::config("redis sink requires 'url' in config"));
}
let has_single = self.key.is_some() && self.value.is_some();
let has_batch = self.items.is_some();
if !has_single && !has_batch {
return Err(Error::config(
"redis sink requires either 'key'/'value' pair OR 'items'",
));
}
if has_single && has_batch {
return Err(Error::config(
"redis sink cannot have both 'key'/'value' AND 'items'",
));
}
if let Some(ttl_config) = &self.ttl {
match ttl_config {
RedisTtlConfig::Static(s) => {
let d = humantime::parse_duration(s).map_err(|e| {
Error::config(format!("redis sink has invalid 'ttl': {}", e))
})?;
if d.as_secs() == 0 {
return Err(Error::config("redis sink requires 'ttl' to be at least 1s"));
}
}
RedisTtlConfig::Dynamic(opts) => {
if opts.from.trim().is_empty() {
return Err(Error::config(
"redis sink 'ttl.from' cannot be empty in dynamic mode",
));
}
if let Some(def) = &opts.default {
let d = humantime::parse_duration(def).map_err(|e| {
Error::config(format!("redis sink has invalid 'ttl.default': {}", e))
})?;
if d.as_secs() == 0 {
return Err(Error::config(
"redis sink requires 'ttl.default' to be at least 1s",
));
}
}
}
}
}
Ok(())
}
}
/// Runtime form of the sink's TTL configuration.
struct CompiledTtl {
/// Where the per-message TTL string is read from.
source: ValueSource,
/// Value returned by `resolve` when the source yields a skippable null or a
/// blank string; `None` means no expiry in that case.
default: Option<Duration>,
}
impl CompiledTtl {
fn resolve(&self, msg: &SharedMessage) -> Result<Option<Duration>> {
let value = self.source.resolve(msg.as_ref());
if (value.is_null() && self.source.should_skip_null())
|| (value.is_string() && value.as_str().unwrap().trim().is_empty())
{
return Ok(self.default);
}
let s = if let Some(s) = value.as_str() {
s
} else {
return Err(Error::sink(format!(
"TTL resolved to non-string value: {}",
value
)));
};
if s.trim().is_empty() {
return Ok(self.default);
}
let d = humantime::parse_duration(s)
.map_err(|e| Error::sink(format!("Failed to parse TTL duration '{}': {}", s, e)))?;
if d.as_secs() == 0 {
return Err(Error::sink("TTL must be at least 1s"));
}
Ok(Some(d))
}
}
/// Sink that writes message-derived data to Redis, either as a single
/// key/value pair or as a batch of pairs taken from a JSON object.
pub struct RedisSink {
/// Identifier returned by `Sink::id`.
id: String,
/// Key source for single-write mode.
key: Option<ValueSource>,
/// Value source for single-write mode.
value: Option<ValueSource>,
/// Source of the JSON object used in batch mode; takes precedence over
/// `key`/`value` in `process`.
items: Option<ValueSource>,
/// Optional expiry resolver; `None` means keys are written without expiry.
ttl: Option<CompiledTtl>,
/// Redis connection, locked for the duration of each write.
connection: Mutex<ConnectionManager>,
}
impl RedisSink {
pub async fn new(id: impl Into<String>, config: RedisSinkConfig) -> Result<Self> {
let id = id.into();
let mut config = config;
config.validate_and_normalize()?;
let key = config.key.map(|k| k.compile()).transpose()?;
let value = config.value.map(|v| v.compile()).transpose()?;
let items = config.items.map(|i| i.compile()).transpose()?;
let ttl = if let Some(ttl_config) = config.ttl {
match ttl_config {
RedisTtlConfig::Static(s) => {
let d = humantime::parse_duration(&s).unwrap();
Some(CompiledTtl {
source: ValueSource::Static(Value::String(s)),
default: Some(d),
})
}
RedisTtlConfig::Dynamic(opts) => {
let source = ValueSource::compile(Some(&opts.from), None)?;
let default = opts
.default
.map(|def| humantime::parse_duration(&def).unwrap());
Some(CompiledTtl { source, default })
}
}
} else {
None
};
let client = redis::Client::open(config.url.clone())
.map_err(|e| Error::config(format!("redis sink has invalid 'url': {}", e)))?;
let connection = ConnectionManager::new(client)
.await
.map_err(|e| Error::sink(format!("Failed to connect to redis: {}", e)))?;
Ok(Self {
id,
key,
value,
items,
ttl,
connection: Mutex::new(connection),
})
}
fn resolve_string(
source: &ValueSource,
msg: &Message,
label: &str,
allow_empty: bool,
) -> Result<String> {
let value = source.resolve(msg);
if value.is_null() && source.should_skip_null() {
return Err(Error::sink(format!(
"redis sink missing '{}' from message payload",
label
)));
}
let resolved = if let Some(s) = value.as_str() {
s.to_string()
} else {
value.to_string()
};
if !allow_empty && resolved.trim().is_empty() {
return Err(Error::sink(format!(
"redis sink resolved empty '{}' value",
label
)));
}
Ok(resolved)
}
}
#[async_trait]
impl Sink for RedisSink {
    /// Returns the identifier this sink was created with.
    fn id(&self) -> &str {
        &self.id
    }

    /// Writes the data derived from `msg` to Redis.
    ///
    /// Batch mode (`items` configured): the source must resolve to a JSON
    /// object. An empty object is a no-op. With a TTL each entry is sent as
    /// SETEX through one pipeline; without a TTL all entries are sent as a
    /// single MSET.
    ///
    /// Single mode (`key` and `value` configured): one SETEX (with TTL) or SET.
    ///
    /// TTLs are sent in whole seconds (`Duration::as_secs`).
    ///
    /// The connection mutex is taken only once every input has been resolved,
    /// so resolution errors, empty batches and payload building never block
    /// other callers waiting for the connection.
    ///
    /// # Errors
    /// Returns `Error::sink` when TTL/key/value/items resolution fails or when
    /// the Redis command fails.
    async fn process(&self, msg: SharedMessage) -> Result<()> {
        let msg_ref = msg.as_ref();

        let ttl_duration = match &self.ttl {
            Some(ttl) => ttl.resolve(&msg)?,
            None => None,
        };

        if let Some(items_source) = &self.items {
            let items_val = items_source.resolve(msg_ref);
            let items_obj = items_val
                .as_object()
                .ok_or_else(|| Error::sink("redis sink 'items' must resolve to a JSON object"))?;
            if items_obj.is_empty() {
                return Ok(());
            }

            if let Some(ttl) = ttl_duration {
                let ttl_secs = ttl.as_secs();
                // NOTE(review): the pipeline is built without `.atomic()`, so the
                // SETEX commands are presumably not wrapped in MULTI/EXEC —
                // confirm whether all-or-nothing semantics are required here.
                let mut pipe = redis::pipe();
                for (k, v) in items_obj {
                    pipe.set_ex(k, json_value_to_string(v), ttl_secs);
                }
                let mut conn = self.connection.lock().await;
                let _: () = pipe
                    .query_async(&mut *conn)
                    .await
                    .map_err(|e| Error::sink(format!("redis pipeline SETEX failed: {}", e)))?;
            } else {
                let kv_pairs: Vec<(String, String)> = items_obj
                    .iter()
                    .map(|(k, v)| (k.clone(), json_value_to_string(v)))
                    .collect();
                let mut conn = self.connection.lock().await;
                let _: () = conn
                    .mset(&kv_pairs)
                    .await
                    .map_err(|e| Error::sink(format!("redis MSET failed: {}", e)))?;
            }
        } else if let (Some(key_source), Some(value_source)) = (&self.key, &self.value) {
            let key = Self::resolve_string(key_source, msg_ref, "key", false)?;
            let value = Self::resolve_string(value_source, msg_ref, "value", true)?;

            let mut conn = self.connection.lock().await;
            if let Some(ttl) = ttl_duration {
                let _: () = conn
                    .set_ex(key, value, ttl.as_secs())
                    .await
                    .map_err(|e| Error::sink(format!("redis SETEX failed: {}", e)))?;
            } else {
                let _: () = conn
                    .set(key, value)
                    .await
                    .map_err(|e| Error::sink(format!("redis SET failed: {}", e)))?;
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Connection URL shared by every fixture; validation never connects.
    const URL: &str = "redis://localhost:6379/0";

    /// Builds a value config that only sets `from`.
    fn sourced(from: &str) -> Option<RedisValueConfig> {
        Some(RedisValueConfig {
            from: Some(from.into()),
            value: None,
        })
    }

    /// Single-write fixture: `key` from "k", `value` from "v", no `items`.
    fn single_config(ttl: Option<RedisTtlConfig>) -> RedisSinkConfig {
        RedisSinkConfig {
            url: URL.to_string(),
            key: sourced("k"),
            value: sourced("v"),
            items: None,
            ttl,
        }
    }

    /// Batch fixture: `items` from "$", no `key`/`value`.
    fn batch_config(ttl: Option<RedisTtlConfig>) -> RedisSinkConfig {
        RedisSinkConfig {
            url: URL.to_string(),
            key: None,
            value: None,
            items: sourced("$"),
            ttl,
        }
    }

    #[test]
    fn test_redis_sink_config_single() {
        let mut config = single_config(None);
        config.validate_and_normalize().unwrap();
    }

    #[test]
    fn test_redis_sink_config_batch() {
        let mut config = batch_config(Some(RedisTtlConfig::Static("60s".to_string())));
        config.validate_and_normalize().unwrap();
    }

    #[test]
    fn test_redis_sink_config_missing_required() {
        let mut config = RedisSinkConfig {
            url: URL.to_string(),
            key: None,
            value: None,
            items: None,
            ttl: None,
        };
        assert!(config.validate_and_normalize().is_err());
    }

    #[test]
    fn test_redis_sink_config_conflict() {
        let mut config = RedisSinkConfig {
            items: sourced("$"),
            ..single_config(None)
        };
        assert!(config.validate_and_normalize().is_err());
    }

    #[test]
    fn test_redis_sink_config_rejects_zero_ttl() {
        let mut config = batch_config(Some(RedisTtlConfig::Static("0s".to_string())));
        let err = config.validate_and_normalize().unwrap_err();
        assert!(err.to_string().contains("redis sink requires 'ttl'"));
    }

    #[test]
    fn test_redis_sink_config_dynamic_ttl_valid() {
        let dynamic = RedisTtlConfig::Dynamic(RedisTtlOptions {
            from: "{{ $.ttl }}".into(),
            default: Some("60s".into()),
        });
        let mut config = single_config(Some(dynamic));
        config.validate_and_normalize().unwrap();
    }
}