#[cfg(feature = "redis")]
mod redis_e2e {
use std::time::Duration;
use anyhow::Result;
use pipeflow::config::Config;
use pipeflow::engine::Engine;
use redis::AsyncCommands;
use redis::aio::ConnectionManager;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_redis_get_set_with_ttl() -> Result<()> {
let redis_url = match std::env::var("REDIS_URL") {
Ok(url) => url,
Err(_) => {
eprintln!("REDIS_URL not set; skipping redis e2e test");
return Ok(());
}
};
let input_key = format!("pipeflow:e2e:in:{}", uuid::Uuid::now_v7());
let output_key = format!("pipeflow:e2e:out:{}", uuid::Uuid::now_v7());
let client = redis::Client::open(redis_url.clone())?;
let mut conn = ConnectionManager::new(client).await?;
let _: () = conn.set(&input_key, "hello").await?;
let yaml = format!(
r#"
pipeline:
sources:
- id: redis_in
type: redis
config:
url: "{redis_url}"
key: "{input_key}"
mode: oneshot
parse_json: false
transforms:
- id: pass
inputs: [redis_in]
outputs: [redis_out]
sinks:
- id: redis_out
type: redis
config:
url: "{redis_url}"
key:
value: "{output_key}"
value:
from: "$.value"
ttl: "30s"
"#
);
let config = Config::from_yaml(&yaml)?;
let mut engine = Engine::from_config(config)?;
engine.build().await?;
let run = engine.run_with_signal(async {
tokio::time::sleep(Duration::from_secs(5)).await;
});
tokio::time::timeout(Duration::from_secs(5), run)
.await
.expect("engine timeout")?;
let value: String = conn.get(&output_key).await?;
assert_eq!(value, "hello");
let ttl: i64 = conn.ttl(&output_key).await?;
assert!(ttl > 0, "expected TTL to be set for key");
let _: () = conn.del(&input_key).await?;
let _: () = conn.del(&output_key).await?;
Ok(())
}
}