pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
#![cfg(feature = "database")]

//! Integration tests for SQL sink

use pipeflow::common::types::Event;
use pipeflow::config::Config;
use pipeflow::engine::Engine;
use std::time::Duration;

mod common;

/// Verifies that a pipeline whose SQL sink targets an in-memory SQLite
/// database (`connection: ":memory:"`) parses and builds successfully.
#[tokio::test]
async fn test_sql_sink_sqlite_memory() {
    // Columns `id`/`timestamp` are declared with `value:` entries, while
    // `source`/`value` are declared with `from:` paths.
    const PIPELINE_YAML: &str = r#"
pipeline:
  transforms:
    - id: pass_through
      inputs: [source::system::event]
      outputs: [sql_sink]
  sinks:
    - id: sql_sink
      type: sql
      config:
        driver: sqlite
        connection: ":memory:"
        table: events
        columns:
          - name: id
            value: "$UUID"
          - name: timestamp
            value: "$NOW"
          - name: source
            from: "$.source"
          - name: value
            from: "$.count"
"#;

    let parsed = Config::from_yaml(PIPELINE_YAML).expect("Config parsing should succeed");
    let mut engine = Engine::from_config(parsed).expect("Engine creation should succeed");

    // Only the build step is exercised here; the pipeline is never run.
    let outcome = engine.build().await;
    assert!(outcome.is_ok(), "Build failed: {:?}", outcome.err());
}

/// Test SQL sink config validation - missing columns
///
/// A sink configured with `columns: []` must make the engine build fail, and
/// the resulting error text must mention "at least one column".
#[tokio::test]
async fn test_sql_sink_config_validation_no_columns() {
    let yaml = r#"
pipeline:
  transforms:
    - id: pass_through
      inputs: [source::system::event]
      outputs: [sql_sink]
  sinks:
    - id: sql_sink
      type: sql
      config:
        driver: sqlite
        connection: "test.db"
        table: events
        columns: []
"#;

    let config = Config::from_yaml(yaml).expect("Config parsing should succeed");
    let mut engine = Engine::from_config(config).expect("Engine creation should succeed");

    // Build should fail due to empty columns
    let result = engine.build().await;
    assert!(
        result.is_err(),
        "Build should fail when `columns` is empty"
    );

    // Bind the message first so a mismatch reports what was actually returned,
    // matching the diagnostics of the sibling invalid-column test.
    let err_msg = result.unwrap_err().to_string();
    assert!(
        err_msg.contains("at least one column"),
        "Expected error about requiring at least one column, got: {}",
        err_msg
    );
}

/// Verifies that a column entry declaring neither `from` nor `value` is
/// rejected when the engine is built.
#[tokio::test]
async fn test_sql_sink_config_validation_invalid_column() {
    const PIPELINE_YAML: &str = r#"
pipeline:
  transforms:
    - id: pass_through
      inputs: [source::system::event]
      outputs: [sql_sink]
  sinks:
    - id: sql_sink
      type: sql
      config:
        driver: sqlite
        connection: "test.db"
        table: events
        columns:
          - name: bad_column
"#;

    let parsed = Config::from_yaml(PIPELINE_YAML).expect("Config parsing should succeed");
    let mut engine = Engine::from_config(parsed).expect("Engine creation should succeed");

    // `bad_column` has a name only, so the build is expected to error out.
    let result = engine.build().await;
    assert!(result.is_err());

    // The error is accepted if it mentions either of the two missing keys.
    let message = result.unwrap_err().to_string();
    let mentions_missing_key = ["from", "value"]
        .iter()
        .any(|key| message.contains(*key));
    assert!(
        mentions_missing_key,
        "Expected error about missing from/value, got: {}",
        message
    );
}

/// Verifies that the SQL sink builds against a file-backed SQLite database
/// whose target table (`test_events`) was created before the engine starts.
#[tokio::test]
async fn test_sql_sink_sqlite_file() {
    use tempfile::NamedTempFile;

    // Schema the sink's three configured columns are written into.
    const CREATE_TABLE_SQL: &str = "CREATE TABLE test_events (
            id TEXT PRIMARY KEY,
            message TEXT,
            count INTEGER
        )";

    // Back the database with a temporary file.
    let db_file = NamedTempFile::new().expect("Failed to create temp file");
    let db_path = db_file.path().to_str().unwrap();

    // Prepare the schema through a short-lived sqlx pool.
    let db_url = format!("sqlite:{}", db_path);
    let setup_pool = sqlx::SqlitePool::connect(&db_url)
        .await
        .expect("Failed to connect to SQLite file");
    sqlx::query(CREATE_TABLE_SQL)
        .execute(&setup_pool)
        .await
        .expect("Failed to create table");

    // Release the setup pool before the sink opens its own connection.
    setup_pool.close().await;

    let pipeline_yaml = format!(
        r#"
pipeline:
  transforms:
    - id: pass_through
      inputs: [source::system::event]
      outputs: [sql_sink]
  sinks:
    - id: sql_sink
      type: sql
      config:
        driver: sqlite
        connection: "{}"
        table: test_events
        columns:
          - name: id
            value: "$UUID"
          - name: message
            from: "$.message"
          - name: count
            from: "$.count"
"#,
        db_path
    );

    let parsed = Config::from_yaml(&pipeline_yaml).expect("Config parsing should succeed");
    let mut engine = Engine::from_config(parsed).expect("Engine creation should succeed");

    // The table already exists, so the build is expected to pass.
    let outcome = engine.build().await;
    assert!(outcome.is_ok(), "Build failed: {:?}", outcome.err());
}

/// Hash -> SQL upsert test (insert_only should prevent updates)
///
/// Runs the same pipeline twice against one SQLite file, sending an identical
/// event each time. The hash step writes a sha256 of `$.payload.title` to
/// `$.id`, and the sink upserts on the `id` conflict column, so the second run
/// is expected to update the existing row rather than insert a new one:
/// `updated_at` must change while the `insert_only` column `created_at` must
/// stay the same.
#[tokio::test]
async fn test_sql_sink_upsert_insert_only_with_hash_transform() {
    use tempfile::NamedTempFile;

    // Create a temporary database file and table
    let temp_file = NamedTempFile::new().expect("Failed to create temp file");
    let db_path = temp_file
        .path()
        .to_str()
        .expect("Temp file path should be valid UTF-8");

    let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
        .await
        .expect("Failed to connect to SQLite file");
    sqlx::query(
        "CREATE TABLE test_events (
            id TEXT PRIMARY KEY,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            title TEXT
        )",
    )
    .execute(&pool)
    .await
    .expect("Failed to create table");
    pool.close().await;

    let yaml = format!(
        r#"
pipeline:
  transforms:
    - id: add_id
      inputs: [source::system::event]
      outputs: [sql_sink]
      steps:
        - type: hash
          config:
            mappings:
              - algo: sha256
                from: "$.payload.title"
                to: "$.id"

  sinks:
    - id: sql_sink
      type: sql
      config:
        driver: sqlite
        connection: "{}"
        table: test_events
        upsert:
          conflict_columns: ["id"]
        columns:
          - name: id
            from: "$.id"
          - name: created_at
            value: "$NOW"
            insert_only: true
          - name: updated_at
            value: "$NOW"
          - name: title
            from: "$.payload.title"
"#,
        db_path
    );

    /// Builds an engine from `yaml`, sends one event with title "hello" on the
    /// system event channel, waits briefly, then shuts the engine down and
    /// asserts that it stopped without error.
    async fn run_once(yaml: &str) {
        let config = Config::from_yaml(yaml).expect("Config parsing should succeed");
        let mut engine = Engine::from_config(config).expect("Engine creation should succeed");
        engine.build().await.expect("Engine build should succeed");

        // Clone the channels before the engine is moved into the spawned task.
        let channels = engine
            .system_channels()
            .expect("System channels should exist")
            .clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
        let engine_handle = tokio::spawn(async move {
            engine
                .run_with_signal(async move {
                    let _ = shutdown_rx.await;
                })
                .await
        });

        let event = Event::new("upsert", serde_json::json!({"title": "hello"}));
        channels.event.send(event).await.expect("Send event");

        // Give the pipeline time to process and write to SQLite
        tokio::time::sleep(Duration::from_millis(300)).await;

        let _ = shutdown_tx.send(());
        let result = engine_handle.await.expect("Engine task panicked");
        assert!(
            result.is_ok(),
            "Engine should shutdown cleanly: {:?}",
            result.err()
        );
    }

    /// Reads back one row of `test_events` as `(id, created_at, updated_at,
    /// title)` together with the table's total row count. A fresh pool is
    /// opened and closed on every call so no connection from the test stays
    /// open while the engine runs. `stage` labels the panic messages.
    async fn snapshot(db_path: &str, stage: &str) -> ((String, String, String, String), i64) {
        let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
            .await
            .unwrap_or_else(|e| panic!("Reconnect to SQLite file {}: {:?}", stage, e));

        let row: (String, String, String, String) =
            sqlx::query_as("SELECT id, created_at, updated_at, title FROM test_events LIMIT 1")
                .fetch_one(&pool)
                .await
                .unwrap_or_else(|e| panic!("Row should exist {}: {:?}", stage, e));

        let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM test_events")
            .fetch_one(&pool)
            .await
            .unwrap_or_else(|e| panic!("Row count query should succeed {}: {:?}", stage, e));

        pool.close().await;
        (row, count)
    }

    // First run: INSERT
    run_once(&yaml).await;
    let (first_row, first_count) = snapshot(db_path, "after first insert").await;
    let (id1, created1, updated1, title1) = first_row;
    assert_eq!(first_count, 1, "First run should insert exactly one row");

    // Ensure time moves forward for updated_at (need >1s since $NOW is second-precision)
    tokio::time::sleep(Duration::from_millis(1100)).await;

    // Second run: UPSERT (update should NOT change created_at)
    run_once(&yaml).await;
    let (second_row, second_count) = snapshot(db_path, "after upsert").await;
    let (id2, created2, updated2, title2) = second_row;

    assert_eq!(second_count, 1, "Upsert should not create duplicate rows");
    assert_eq!(title1, "hello", "Title should be stored on insert");
    assert_eq!(title2, "hello", "Title should be preserved on upsert");
    assert_eq!(id1, id2, "Hash should produce stable id");
    assert_eq!(
        created1, created2,
        "insert_only column should not be updated"
    );
    assert_ne!(updated1, updated2, "updated_at should be updated on upsert");
}