pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! E2E tests for Redis GET -> SET pipeline
//!
//! Requires a Redis server. Set REDIS_URL to enable the test.

#[cfg(feature = "redis")]
mod redis_e2e {
    use std::time::Duration;

    use anyhow::Result;
    use pipeflow::config::Config;
    use pipeflow::engine::Engine;
    use redis::AsyncCommands;
    use redis::aio::ConnectionManager;

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_redis_get_set_with_ttl() -> Result<()> {
        let redis_url = match std::env::var("REDIS_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!("REDIS_URL not set; skipping redis e2e test");
                return Ok(());
            }
        };

        let input_key = format!("pipeflow:e2e:in:{}", uuid::Uuid::now_v7());
        let output_key = format!("pipeflow:e2e:out:{}", uuid::Uuid::now_v7());

        let client = redis::Client::open(redis_url.clone())?;
        let mut conn = ConnectionManager::new(client).await?;
        let _: () = conn.set(&input_key, "hello").await?;

        let yaml = format!(
            r#"
pipeline:
  sources:
    - id: redis_in
      type: redis
      config:
        url: "{redis_url}"
        key: "{input_key}"
        mode: oneshot
        parse_json: false
  transforms:
    - id: pass
      inputs: [redis_in]
      outputs: [redis_out]
  sinks:
    - id: redis_out
      type: redis
      config:
        url: "{redis_url}"
        key:
          value: "{output_key}"
        value:
          from: "$.value"
        ttl: "30s"
"#
        );

        let config = Config::from_yaml(&yaml)?;
        let mut engine = Engine::from_config(config)?;
        engine.build().await?;

        let run = engine.run_with_signal(async {
            tokio::time::sleep(Duration::from_secs(5)).await;
        });
        tokio::time::timeout(Duration::from_secs(5), run)
            .await
            .expect("engine timeout")?;

        let value: String = conn.get(&output_key).await?;
        assert_eq!(value, "hello");

        let ttl: i64 = conn.ttl(&output_key).await?;
        assert!(ttl > 0, "expected TTL to be set for key");

        let _: () = conn.del(&input_key).await?;
        let _: () = conn.del(&output_key).await?;

        Ok(())
    }
}