#![cfg(feature = "database")]
use pipeflow::common::types::Event;
use pipeflow::config::Config;
use pipeflow::engine::Engine;
use std::time::Duration;
mod common;
#[tokio::test]
async fn test_sql_sink_sqlite_memory() {
let yaml = r#"
pipeline:
transforms:
- id: pass_through
inputs: [source::system::event]
outputs: [sql_sink]
sinks:
- id: sql_sink
type: sql
config:
driver: sqlite
connection: ":memory:"
table: events
columns:
- name: id
value: "$UUID"
- name: timestamp
value: "$NOW"
- name: source
from: "$.source"
- name: value
from: "$.count"
"#;
let config = Config::from_yaml(yaml).expect("Config parsing should succeed");
let mut engine = Engine::from_config(config).expect("Engine creation should succeed");
let result = engine.build().await;
assert!(result.is_ok(), "Build failed: {:?}", result.err());
}
#[tokio::test]
async fn test_sql_sink_config_validation_no_columns() {
let yaml = r#"
pipeline:
transforms:
- id: pass_through
inputs: [source::system::event]
outputs: [sql_sink]
sinks:
- id: sql_sink
type: sql
config:
driver: sqlite
connection: "test.db"
table: events
columns: []
"#;
let config = Config::from_yaml(yaml).expect("Config parsing should succeed");
let mut engine = Engine::from_config(config).expect("Engine creation should succeed");
let result = engine.build().await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("at least one column")
);
}
#[tokio::test]
async fn test_sql_sink_config_validation_invalid_column() {
let yaml = r#"
pipeline:
transforms:
- id: pass_through
inputs: [source::system::event]
outputs: [sql_sink]
sinks:
- id: sql_sink
type: sql
config:
driver: sqlite
connection: "test.db"
table: events
columns:
- name: bad_column
"#;
let config = Config::from_yaml(yaml).expect("Config parsing should succeed");
let mut engine = Engine::from_config(config).expect("Engine creation should succeed");
let result = engine.build().await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("from") || err_msg.contains("value"),
"Expected error about missing from/value, got: {}",
err_msg
);
}
#[tokio::test]
async fn test_sql_sink_sqlite_file() {
use tempfile::NamedTempFile;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let db_path = temp_file.path().to_str().unwrap();
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Failed to connect to SQLite file");
sqlx::query(
"CREATE TABLE test_events (
id TEXT PRIMARY KEY,
message TEXT,
count INTEGER
)",
)
.execute(&pool)
.await
.expect("Failed to create table");
pool.close().await;
let yaml = format!(
r#"
pipeline:
transforms:
- id: pass_through
inputs: [source::system::event]
outputs: [sql_sink]
sinks:
- id: sql_sink
type: sql
config:
driver: sqlite
connection: "{}"
table: test_events
columns:
- name: id
value: "$UUID"
- name: message
from: "$.message"
- name: count
from: "$.count"
"#,
db_path
);
let config = Config::from_yaml(&yaml).expect("Config parsing should succeed");
let mut engine = Engine::from_config(config).expect("Engine creation should succeed");
let result = engine.build().await;
assert!(result.is_ok(), "Build failed: {:?}", result.err());
}
#[tokio::test]
async fn test_sql_sink_upsert_insert_only_with_hash_transform() {
use tempfile::NamedTempFile;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let db_path = temp_file.path().to_str().unwrap();
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Failed to connect to SQLite file");
sqlx::query(
"CREATE TABLE test_events (
id TEXT PRIMARY KEY,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
title TEXT
)",
)
.execute(&pool)
.await
.expect("Failed to create table");
pool.close().await;
let yaml = format!(
r#"
pipeline:
transforms:
- id: add_id
inputs: [source::system::event]
outputs: [sql_sink]
steps:
- type: hash
config:
mappings:
- algo: sha256
from: "$.payload.title"
to: "$.id"
sinks:
- id: sql_sink
type: sql
config:
driver: sqlite
connection: "{}"
table: test_events
upsert:
conflict_columns: ["id"]
columns:
- name: id
from: "$.id"
- name: created_at
value: "$NOW"
insert_only: true
- name: updated_at
value: "$NOW"
- name: title
from: "$.payload.title"
"#,
db_path
);
async fn run_once(yaml: &str) {
let config = Config::from_yaml(yaml).expect("Config parsing should succeed");
let mut engine = Engine::from_config(config).expect("Engine creation should succeed");
engine.build().await.expect("Engine build should succeed");
let channels = engine
.system_channels()
.expect("System channels should exist")
.clone();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let engine_handle = tokio::spawn(async move {
engine
.run_with_signal(async move {
let _ = shutdown_rx.await;
})
.await
});
let event = Event::new("upsert", serde_json::json!({"title": "hello"}));
channels.event.send(event).await.expect("Send event");
tokio::time::sleep(Duration::from_millis(300)).await;
let _ = shutdown_tx.send(());
let result = engine_handle.await.expect("Engine task panicked");
assert!(
result.is_ok(),
"Engine should shutdown cleanly: {:?}",
result.err()
);
}
run_once(&yaml).await;
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Reconnect to SQLite file");
let (id1, created1, updated1, title1): (String, String, String, String) =
sqlx::query_as("SELECT id, created_at, updated_at, title FROM test_events LIMIT 1")
.fetch_one(&pool)
.await
.expect("Row should exist after first insert");
pool.close().await;
tokio::time::sleep(Duration::from_millis(1100)).await;
run_once(&yaml).await;
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Reconnect to SQLite file");
let (id2, created2, updated2, title2): (String, String, String, String) =
sqlx::query_as("SELECT id, created_at, updated_at, title FROM test_events LIMIT 1")
.fetch_one(&pool)
.await
.expect("Row should exist after upsert");
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM test_events")
.fetch_one(&pool)
.await
.unwrap();
pool.close().await;
assert_eq!(count, 1, "Upsert should not create duplicate rows");
assert_eq!(title1, "hello");
assert_eq!(title2, "hello");
assert_eq!(id1, id2, "Hash should produce stable id");
assert_eq!(
created1, created2,
"insert_only column should not be updated"
);
assert_ne!(updated1, updated2, "updated_at should be updated on upsert");
}