pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
#![cfg(feature = "database")]

use std::time::Duration;

use pipeflow::source::{
    MessageSender, Source,
    sql::{SqlDriver, SqlSource, SqlSourceConfig},
};
use serde_json::json;
use tokio::sync::broadcast;

/// Checks that a SQLite-backed `SqlSource` emits one message per result row,
/// tagged with the source id and carrying the row's columns as a JSON object.
///
/// The two seeded rows store `active` as `1` and `0`; the assertions expect
/// those values to arrive as JSON `true` and `false`.
#[tokio::test]
async fn test_sql_source_sqlite_rows() {
    use tempfile::NamedTempFile;

    // Seed a throwaway on-disk database through a separate, short-lived pool.
    let db_file = NamedTempFile::new().expect("Failed to create temp file");
    let db_path = db_file.path().to_str().unwrap();

    let url = format!("sqlite:{}", db_path);
    let pool = sqlx::SqlitePool::connect(&url)
        .await
        .expect("Failed to connect to SQLite file");

    sqlx::query(
        "CREATE TABLE events (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            active BOOLEAN NOT NULL
        )",
    )
    .execute(&pool)
    .await
    .expect("Failed to create table");

    sqlx::query("INSERT INTO events (id, name, active) VALUES (1, 'alpha', 1), (2, 'beta', 0)")
        .execute(&pool)
        .await
        .expect("Failed to insert rows");

    // Release the seeding pool before the source opens the same file.
    pool.close().await;

    let source = SqlSource::new(
        "sql_source",
        SqlSourceConfig {
            driver: SqlDriver::Sqlite,
            connection: db_path.to_owned(),
            query: String::from("SELECT id, name, active FROM events ORDER BY id"),
            interval: Duration::from_millis(100),
            schedule: None,
        },
    )
    .expect("Source should build");

    let (raw_tx, mut rx) = broadcast::channel(16);
    let sender = MessageSender::new(raw_tx, None);
    let (stop_tx, stop_rx) = broadcast::channel(1);

    let task = tokio::spawn(async move { source.run(sender, stop_rx).await });

    // The query is ordered by id, so the rows are expected in insertion order.
    let recv_deadline = Duration::from_secs(2);
    let first_row = tokio::time::timeout(recv_deadline, rx.recv())
        .await
        .expect("Timed out waiting for row")
        .expect("Failed to receive message");
    let second_row = tokio::time::timeout(recv_deadline, rx.recv())
        .await
        .expect("Timed out waiting for row")
        .expect("Failed to receive message");

    // Ask the source to stop and require a clean exit within two seconds.
    let _ = stop_tx.send(());
    let joined = tokio::time::timeout(Duration::from_secs(2), task).await;
    let run_outcome: pipeflow::Result<()> = joined
        .expect("Timed out waiting for SQL source to shut down")
        .expect("SQL source task panicked");
    assert!(run_outcome.is_ok(), "SQL source should shut down cleanly");

    for row in [&first_row, &second_row] {
        assert_eq!(row.meta.source_node, "sql_source");
    }

    let expected_first = json!({"id": 1, "name": "alpha", "active": true});
    assert_eq!(first_row.payload, expected_first);

    let expected_second = json!({"id": 2, "name": "beta", "active": false});
    assert_eq!(second_row.payload, expected_second);
}

/// Checks that a query returning no rows produces no messages, and that the
/// source still shuts down cleanly afterwards.
#[tokio::test]
async fn test_sql_source_empty_results() {
    use tempfile::NamedTempFile;

    // Create the table but insert nothing, so every poll yields an empty set.
    let db_file = NamedTempFile::new().expect("Failed to create temp file");
    let db_path = db_file.path().to_str().unwrap();

    let url = format!("sqlite:{}", db_path);
    let pool = sqlx::SqlitePool::connect(&url)
        .await
        .expect("Failed to connect to SQLite file");

    let create_table = sqlx::query("CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT)");
    create_table
        .execute(&pool)
        .await
        .expect("Failed to create table");

    pool.close().await;

    let source = SqlSource::new(
        "sql_source",
        SqlSourceConfig {
            driver: SqlDriver::Sqlite,
            connection: db_path.to_owned(),
            query: String::from("SELECT id, name FROM events"),
            interval: Duration::from_millis(100),
            schedule: None,
        },
    )
    .expect("Source should build");

    let (raw_tx, mut rx) = broadcast::channel(16);
    let sender = MessageSender::new(raw_tx, None);
    let (stop_tx, stop_rx) = broadcast::channel(1);

    let task = tokio::spawn(async move { source.run(sender, stop_rx).await });

    // Give the source a head start so at least one poll has a chance to run.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // With an empty table the receive must time out rather than yield a message.
    let recv_attempt = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await;
    assert!(recv_attempt.is_err(), "Should timeout waiting for messages");

    // Ask the source to stop and require a clean exit within two seconds.
    let _ = stop_tx.send(());
    let joined = tokio::time::timeout(Duration::from_secs(2), task).await;
    let run_outcome: pipeflow::Result<()> = joined
        .expect("Timed out waiting for SQL source to shut down")
        .expect("SQL source task panicked");
    assert!(run_outcome.is_ok(), "SQL source should shut down cleanly");
}

/// Checks that a failing query does not terminate the source task: 150 ms
/// after start-up the spawned task must still be running.
#[tokio::test]
async fn test_sql_source_invalid_query() {
    use tempfile::NamedTempFile;

    // The database only contains `events`; the source will query a table
    // that does not exist.
    let db_file = NamedTempFile::new().expect("Failed to create temp file");
    let db_path = db_file.path().to_str().unwrap();

    let url = format!("sqlite:{}", db_path);
    let pool = sqlx::SqlitePool::connect(&url)
        .await
        .expect("Failed to connect to SQLite file");

    let create_table = sqlx::query("CREATE TABLE events (id INTEGER PRIMARY KEY)");
    create_table
        .execute(&pool)
        .await
        .expect("Failed to create table");

    pool.close().await;

    let source = SqlSource::new(
        "sql_source",
        SqlSourceConfig {
            driver: SqlDriver::Sqlite,
            connection: db_path.to_owned(),
            query: String::from("SELECT * FROM nonexistent_table"),
            interval: Duration::from_millis(100),
            schedule: None,
        },
    )
    .expect("Source should build");

    // Both unused channel halves are bound to named variables so they stay
    // alive until the end of the test instead of being dropped immediately.
    let (raw_tx, _rx) = broadcast::channel(16);
    let sender = MessageSender::new(raw_tx, None);
    let (_stop_tx, stop_rx) = broadcast::channel(1);

    let task = tokio::spawn(async move { source.run(sender, stop_rx).await });

    // Leave enough time for the bad query to be attempted.
    tokio::time::sleep(Duration::from_millis(150)).await;

    // A query error must not have ended the task.
    let still_running = !task.is_finished();
    assert!(
        still_running,
        "Source should continue running after query error"
    );

    // No graceful shutdown here; the task is simply cancelled.
    task.abort();
}

/// Checks how SQL `NULL` and `BLOB` columns appear in the emitted payload:
/// `NULL` is expected as JSON `null`, and the blob `X'DEADBEEF'` as the
/// string `"0xdeadbeef"`.
#[tokio::test]
async fn test_sql_source_null_and_blob_values() {
    use tempfile::NamedTempFile;

    // Seed a single row holding a NULL text column and a four-byte blob.
    let db_file = NamedTempFile::new().expect("Failed to create temp file");
    let db_path = db_file.path().to_str().unwrap();

    let url = format!("sqlite:{}", db_path);
    let pool = sqlx::SqlitePool::connect(&url)
        .await
        .expect("Failed to connect to SQLite file");

    sqlx::query(
        "CREATE TABLE data (
            id INTEGER PRIMARY KEY,
            optional_text TEXT,
            blob_data BLOB
        )",
    )
    .execute(&pool)
    .await
    .expect("Failed to create table");

    sqlx::query("INSERT INTO data (id, optional_text, blob_data) VALUES (1, NULL, X'DEADBEEF')")
        .execute(&pool)
        .await
        .expect("Failed to insert row");

    pool.close().await;

    let source = SqlSource::new(
        "sql_source",
        SqlSourceConfig {
            driver: SqlDriver::Sqlite,
            connection: db_path.to_owned(),
            query: String::from("SELECT id, optional_text, blob_data FROM data"),
            interval: Duration::from_millis(100),
            schedule: None,
        },
    )
    .expect("Source should build");

    let (raw_tx, mut rx) = broadcast::channel(16);
    let sender = MessageSender::new(raw_tx, None);
    let (stop_tx, stop_rx) = broadcast::channel(1);

    let task = tokio::spawn(async move { source.run(sender, stop_rx).await });

    let row = tokio::time::timeout(Duration::from_secs(2), rx.recv())
        .await
        .expect("Timed out waiting for row")
        .expect("Failed to receive message");

    // Ask the source to stop and require a clean exit within two seconds.
    let _ = stop_tx.send(());
    let joined = tokio::time::timeout(Duration::from_secs(2), task).await;
    let run_outcome: pipeflow::Result<()> = joined
        .expect("Timed out waiting for SQL source to shut down")
        .expect("SQL source task panicked");
    assert!(run_outcome.is_ok(), "SQL source should shut down cleanly");

    let payload = &row.payload;
    assert_eq!(payload["id"], json!(1));
    assert_eq!(payload["optional_text"], json!(null));
    assert_eq!(payload["blob_data"], json!("0xdeadbeef"));
}

/// Checks that the configured polling interval is respected: with a single
/// row and a 200 ms interval, the second message must not arrive earlier than
/// 200 ms after the clock was started (just before the source task is spawned).
#[tokio::test]
async fn test_sql_source_polling_interval() {
    use tempfile::NamedTempFile;

    // One row in the table means one message per poll.
    let db_file = NamedTempFile::new().expect("Failed to create temp file");
    let db_path = db_file.path().to_str().unwrap();

    let url = format!("sqlite:{}", db_path);
    let pool = sqlx::SqlitePool::connect(&url)
        .await
        .expect("Failed to connect to SQLite file");

    let create_table = sqlx::query("CREATE TABLE events (id INTEGER PRIMARY KEY)");
    create_table
        .execute(&pool)
        .await
        .expect("Failed to create table");

    let insert_row = sqlx::query("INSERT INTO events (id) VALUES (1)");
    insert_row
        .execute(&pool)
        .await
        .expect("Failed to insert row");

    pool.close().await;

    // The same duration drives the source and serves as the lower bound below.
    let poll_every = Duration::from_millis(200);
    let source = SqlSource::new(
        "sql_source",
        SqlSourceConfig {
            driver: SqlDriver::Sqlite,
            connection: db_path.to_owned(),
            query: String::from("SELECT id FROM events"),
            interval: poll_every,
            schedule: None,
        },
    )
    .expect("Source should build");

    let (raw_tx, mut rx) = broadcast::channel(16);
    let sender = MessageSender::new(raw_tx, None);
    let (stop_tx, stop_rx) = broadcast::channel(1);

    let started = std::time::Instant::now();
    let task = tokio::spawn(async move { source.run(sender, stop_rx).await });

    // Drain the first poll's message, then the second poll's message, which
    // should only show up once the interval has passed.
    for (on_timeout, on_recv_error) in [
        (
            "Timed out waiting for first poll",
            "Failed to receive first message",
        ),
        (
            "Timed out waiting for second poll",
            "Failed to receive second message",
        ),
    ] {
        let _ = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect(on_timeout)
            .expect(on_recv_error);
    }

    let elapsed = started.elapsed();

    // Ask the source to stop and require a clean exit within two seconds.
    let _ = stop_tx.send(());
    let joined = tokio::time::timeout(Duration::from_secs(2), task).await;
    let run_outcome: pipeflow::Result<()> = joined
        .expect("Timed out waiting for SQL source to shut down")
        .expect("SQL source task panicked");
    assert!(run_outcome.is_ok(), "SQL source should shut down cleanly");

    // By the time the second message arrived, a full interval must have passed.
    assert!(
        elapsed >= poll_every,
        "Polling interval not respected: elapsed {:?}",
        elapsed
    );
}

/// Checks that a source configured with a cron-style `schedule` delivers the
/// seeded row within the two-second receive timeout and then shuts down
/// cleanly.
#[tokio::test]
async fn test_sql_source_with_schedule() {
    use tempfile::NamedTempFile;

    // Seed a single row with id 42 for the scheduled query to pick up.
    let db_file = NamedTempFile::new().expect("Failed to create temp file");
    let db_path = db_file.path().to_str().unwrap();

    let url = format!("sqlite:{}", db_path);
    let pool = sqlx::SqlitePool::connect(&url)
        .await
        .expect("Failed to connect to SQLite file");

    let create_table = sqlx::query("CREATE TABLE events (id INTEGER PRIMARY KEY)");
    create_table
        .execute(&pool)
        .await
        .expect("Failed to create table");

    let insert_row = sqlx::query("INSERT INTO events (id) VALUES (42)");
    insert_row
        .execute(&pool)
        .await
        .expect("Failed to insert row");

    pool.close().await;

    let source = SqlSource::new(
        "sql_source",
        SqlSourceConfig {
            driver: SqlDriver::Sqlite,
            connection: db_path.to_owned(),
            query: String::from("SELECT id FROM events"),
            // Left at zero: the interval is meant to be ignored once a
            // schedule is supplied.
            interval: Duration::ZERO,
            // "* * * * * *" is intended to fire once every second.
            schedule: Some(String::from("* * * * * *")),
        },
    )
    .expect("Source should build");

    let (raw_tx, mut rx) = broadcast::channel(16);
    let sender = MessageSender::new(raw_tx, None);
    let (stop_tx, stop_rx) = broadcast::channel(1);

    let task = tokio::spawn(async move { source.run(sender, stop_rx).await });

    // The next scheduled tick should be about a second away at most, so two
    // seconds is a generous bound for the first message.
    let scheduled_row = tokio::time::timeout(Duration::from_secs(2), rx.recv())
        .await
        .expect("Timed out waiting for first scheduled poll")
        .expect("Failed to receive message");

    assert_eq!(scheduled_row.payload["id"], serde_json::json!(42));

    // Ask the source to stop and require a clean exit within two seconds.
    let _ = stop_tx.send(());
    let joined = tokio::time::timeout(Duration::from_secs(2), task).await;
    let run_outcome: pipeflow::Result<()> = joined
        .expect("Timed out waiting for SQL source to shut down")
        .expect("SQL source task panicked");
    assert!(run_outcome.is_ok(), "SQL source should shut down cleanly");
}