#[cfg(feature = "redis")]
use std::sync::Arc;
#[cfg(feature = "redis")]
use std::time::Duration;
#[cfg(feature = "redis")]
use serde_json::json;
#[cfg(feature = "redis")]
use pipeflow::Message;
#[cfg(feature = "redis")]
use pipeflow::sink::Sink;
#[cfg(feature = "redis")]
use pipeflow::sink::redis::{RedisSink, RedisSinkConfig, RedisTtlConfig, RedisValueConfig};
#[cfg(feature = "redis")]
use pipeflow::source::redis::RedisSourceConfig;
#[cfg(feature = "redis")]
use pipeflow::source::{MessageSender, Source};
#[cfg(feature = "redis")]
use redis::AsyncCommands;
#[cfg(feature = "redis")]
use redis::aio::ConnectionManager;
#[cfg(feature = "redis")]
#[tokio::test]
async fn test_redis_sink_source_roundtrip() -> anyhow::Result<()> {
let redis_url = match std::env::var("REDIS_URL") {
Ok(url) => url,
Err(_) => {
eprintln!("REDIS_URL not set; skipping redis integration test");
return Ok(());
}
};
let key = format!("pipeflow:test:{}", uuid::Uuid::now_v7());
let sink_config = RedisSinkConfig {
url: redis_url.clone(),
key: Some(RedisValueConfig {
from: None,
value: Some(json!(key)),
}),
value: Some(RedisValueConfig {
from: Some("$.value".to_string()),
value: None,
}),
items: None,
ttl: Some(RedisTtlConfig::Static("30s".to_string())),
};
let sink = RedisSink::new("redis_sink", sink_config).await?;
let msg = Message::new("test_source", json!({ "value": "hello" }));
sink.process(Arc::new(msg)).await?;
let source_config = RedisSourceConfig {
url: redis_url.clone(),
key: Some(key.clone()),
keys: None,
interval: Duration::from_secs(5),
schedule: None,
mode: "oneshot".to_string(),
parse_json: true,
};
let source = pipeflow::source::redis::RedisSource::new("redis_source", source_config)?;
let (tx, mut rx) = tokio::sync::broadcast::channel(2);
let sender = MessageSender::new(tx, None);
let (_shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
source.run(sender, shutdown_rx).await?;
let received = rx.recv().await.expect("message received");
assert_eq!(received.payload["key"], key);
assert_eq!(received.payload["value"], "hello");
let client = redis::Client::open(redis_url)?;
let mut conn = ConnectionManager::new(client).await?;
let ttl: i64 = conn.ttl(&key).await?;
assert!(ttl > 0, "expected TTL to be set for key");
let _: () = conn.del(&key).await?;
Ok(())
}