pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
#[cfg(feature = "redis")]
use std::sync::Arc;
#[cfg(feature = "redis")]
use std::time::Duration;

#[cfg(feature = "redis")]
use serde_json::json;

#[cfg(feature = "redis")]
use pipeflow::Message;
#[cfg(feature = "redis")]
use pipeflow::sink::Sink;
#[cfg(feature = "redis")]
use pipeflow::sink::redis::{RedisSink, RedisSinkConfig, RedisTtlConfig, RedisValueConfig};
#[cfg(feature = "redis")]
use pipeflow::source::redis::RedisSourceConfig;
#[cfg(feature = "redis")]
use pipeflow::source::{MessageSender, Source};

#[cfg(feature = "redis")]
use redis::AsyncCommands;
#[cfg(feature = "redis")]
use redis::aio::ConnectionManager;

/// Round-trip integration test for the Redis sink and source.
///
/// Sends one message through a `RedisSink` configured with a fixed key, then
/// runs a `RedisSource` against that same key and checks that the emitted
/// payload carries the key and the original value. Finally it verifies
/// directly against Redis that the key has a positive TTL and deletes it.
///
/// The test needs a live server: when `REDIS_URL` is unset it prints a notice
/// and returns `Ok(())` without exercising anything.
#[cfg(feature = "redis")]
#[tokio::test]
async fn test_redis_sink_source_roundtrip() -> anyhow::Result<()> {
    // No endpoint configured: report it and let the test pass as a no-op.
    let Ok(redis_url) = std::env::var("REDIS_URL") else {
        eprintln!("REDIS_URL not set; skipping redis integration test");
        return Ok(());
    };

    // The UUIDv7 suffix keeps this run's key distinct from other runs.
    let key = format!("pipeflow:test:{}", uuid::Uuid::now_v7());

    // Sink side: the key is given as a literal value, while the stored value
    // is looked up via `$.value` (presumably a path into the message payload;
    // the "hello" assertion below depends on that).
    let key_spec = RedisValueConfig {
        from: None,
        value: Some(json!(key)),
    };
    let value_spec = RedisValueConfig {
        from: Some(String::from("$.value")),
        value: None,
    };
    let sink = RedisSink::new(
        "redis_sink",
        RedisSinkConfig {
            url: redis_url.clone(),
            key: Some(key_spec),
            value: Some(value_spec),
            items: None,
            ttl: Some(RedisTtlConfig::Static(String::from("30s"))),
        },
    )
    .await?;

    let outgoing = Arc::new(Message::new("test_source", json!({ "value": "hello" })));
    sink.process(outgoing).await?;

    // Source side: point at the same key in "oneshot" mode.
    let source = pipeflow::source::redis::RedisSource::new(
        "redis_source",
        RedisSourceConfig {
            url: redis_url.clone(),
            key: Some(key.clone()),
            keys: None,
            interval: Duration::from_secs(5),
            schedule: None,
            mode: String::from("oneshot"),
            parse_json: true,
        },
    )?;

    // `_shutdown_tx` is bound to a name so the shutdown channel stays open
    // for the duration of the run.
    let (_shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
    let (msg_tx, mut msg_rx) = tokio::sync::broadcast::channel(2);

    // `run` is awaited to completion before receiving, so the test relies on
    // the source returning on its own in this mode.
    source
        .run(MessageSender::new(msg_tx, None), shutdown_rx)
        .await?;
    let received = msg_rx.recv().await.expect("message received");

    assert_eq!(received.payload["key"], key);
    assert_eq!(received.payload["value"], "hello");

    // Inspect Redis directly: the sink's TTL setting should have left an
    // expiry on the key. Remove the key afterwards.
    let mut conn = ConnectionManager::new(redis::Client::open(redis_url)?).await?;
    let ttl: i64 = conn.ttl(&key).await?;
    assert!(ttl > 0, "expected TTL to be set for key");
    let _: () = conn.del(&key).await?;

    Ok(())
}