#![cfg(feature = "database")]
use std::time::Duration;
use pipeflow::source::{
MessageSender, Source,
sql::{SqlDriver, SqlSource, SqlSourceConfig},
};
use serde_json::json;
use tokio::sync::broadcast;
#[tokio::test]
async fn test_sql_source_sqlite_rows() {
use tempfile::NamedTempFile;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let db_path = temp_file.path().to_str().unwrap();
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Failed to connect to SQLite file");
sqlx::query(
"CREATE TABLE events (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
active BOOLEAN NOT NULL
)",
)
.execute(&pool)
.await
.expect("Failed to create table");
sqlx::query("INSERT INTO events (id, name, active) VALUES (1, 'alpha', 1), (2, 'beta', 0)")
.execute(&pool)
.await
.expect("Failed to insert rows");
pool.close().await;
let config = SqlSourceConfig {
driver: SqlDriver::Sqlite,
connection: db_path.to_string(),
query: "SELECT id, name, active FROM events ORDER BY id".to_string(),
interval: Duration::from_millis(100),
schedule: None,
};
let source = SqlSource::new("sql_source", config).expect("Source should build");
let (sender, mut receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
let msg1 = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for row")
.expect("Failed to receive message");
let msg2 = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for row")
.expect("Failed to receive message");
let _ = shutdown_tx.send(());
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for SQL source to shut down")
.expect("SQL source task panicked");
assert!(result.is_ok(), "SQL source should shut down cleanly");
assert_eq!(msg1.meta.source_node, "sql_source");
assert_eq!(msg2.meta.source_node, "sql_source");
assert_eq!(
msg1.payload,
json!({"id": 1, "name": "alpha", "active": true})
);
assert_eq!(
msg2.payload,
json!({"id": 2, "name": "beta", "active": false})
);
}
#[tokio::test]
async fn test_sql_source_empty_results() {
use tempfile::NamedTempFile;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let db_path = temp_file.path().to_str().unwrap();
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Failed to connect to SQLite file");
sqlx::query("CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT)")
.execute(&pool)
.await
.expect("Failed to create table");
pool.close().await;
let config = SqlSourceConfig {
driver: SqlDriver::Sqlite,
connection: db_path.to_string(),
query: "SELECT id, name FROM events".to_string(),
interval: Duration::from_millis(100),
schedule: None,
};
let source = SqlSource::new("sql_source", config).expect("Source should build");
let (sender, mut receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
tokio::time::sleep(Duration::from_millis(50)).await;
let result = tokio::time::timeout(Duration::from_millis(200), receiver.recv()).await;
assert!(result.is_err(), "Should timeout waiting for messages");
let _ = shutdown_tx.send(());
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for SQL source to shut down")
.expect("SQL source task panicked");
assert!(result.is_ok(), "SQL source should shut down cleanly");
}
#[tokio::test]
async fn test_sql_source_invalid_query() {
use tempfile::NamedTempFile;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let db_path = temp_file.path().to_str().unwrap();
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Failed to connect to SQLite file");
sqlx::query("CREATE TABLE events (id INTEGER PRIMARY KEY)")
.execute(&pool)
.await
.expect("Failed to create table");
pool.close().await;
let config = SqlSourceConfig {
driver: SqlDriver::Sqlite,
connection: db_path.to_string(),
query: "SELECT * FROM nonexistent_table".to_string(),
interval: Duration::from_millis(100),
schedule: None,
};
let source = SqlSource::new("sql_source", config).expect("Source should build");
let (sender, _receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
tokio::time::sleep(Duration::from_millis(150)).await;
assert!(
!handle.is_finished(),
"Source should continue running after query error"
);
handle.abort();
}
#[tokio::test]
async fn test_sql_source_null_and_blob_values() {
use tempfile::NamedTempFile;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let db_path = temp_file.path().to_str().unwrap();
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Failed to connect to SQLite file");
sqlx::query(
"CREATE TABLE data (
id INTEGER PRIMARY KEY,
optional_text TEXT,
blob_data BLOB
)",
)
.execute(&pool)
.await
.expect("Failed to create table");
sqlx::query("INSERT INTO data (id, optional_text, blob_data) VALUES (1, NULL, X'DEADBEEF')")
.execute(&pool)
.await
.expect("Failed to insert row");
pool.close().await;
let config = SqlSourceConfig {
driver: SqlDriver::Sqlite,
connection: db_path.to_string(),
query: "SELECT id, optional_text, blob_data FROM data".to_string(),
interval: Duration::from_millis(100),
schedule: None,
};
let source = SqlSource::new("sql_source", config).expect("Source should build");
let (sender, mut receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
let msg = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for row")
.expect("Failed to receive message");
let _ = shutdown_tx.send(());
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for SQL source to shut down")
.expect("SQL source task panicked");
assert!(result.is_ok(), "SQL source should shut down cleanly");
assert_eq!(msg.payload["id"], json!(1));
assert_eq!(msg.payload["optional_text"], json!(null));
assert_eq!(msg.payload["blob_data"], json!("0xdeadbeef"));
}
#[tokio::test]
async fn test_sql_source_polling_interval() {
use tempfile::NamedTempFile;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let db_path = temp_file.path().to_str().unwrap();
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Failed to connect to SQLite file");
sqlx::query("CREATE TABLE events (id INTEGER PRIMARY KEY)")
.execute(&pool)
.await
.expect("Failed to create table");
sqlx::query("INSERT INTO events (id) VALUES (1)")
.execute(&pool)
.await
.expect("Failed to insert row");
pool.close().await;
let config = SqlSourceConfig {
driver: SqlDriver::Sqlite,
connection: db_path.to_string(),
query: "SELECT id FROM events".to_string(),
interval: Duration::from_millis(200),
schedule: None,
};
let source = SqlSource::new("sql_source", config).expect("Source should build");
let (sender, mut receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let start = std::time::Instant::now();
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
let _ = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for first poll")
.expect("Failed to receive first message");
let _ = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for second poll")
.expect("Failed to receive second message");
let elapsed = start.elapsed();
let _ = shutdown_tx.send(());
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for SQL source to shut down")
.expect("SQL source task panicked");
assert!(result.is_ok(), "SQL source should shut down cleanly");
assert!(
elapsed >= Duration::from_millis(200),
"Polling interval not respected: elapsed {:?}",
elapsed
);
}
#[tokio::test]
async fn test_sql_source_with_schedule() {
use tempfile::NamedTempFile;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let db_path = temp_file.path().to_str().unwrap();
let pool = sqlx::SqlitePool::connect(&format!("sqlite:{}", db_path))
.await
.expect("Failed to connect to SQLite file");
sqlx::query("CREATE TABLE events (id INTEGER PRIMARY KEY)")
.execute(&pool)
.await
.expect("Failed to create table");
sqlx::query("INSERT INTO events (id) VALUES (42)")
.execute(&pool)
.await
.expect("Failed to insert row");
pool.close().await;
let config = SqlSourceConfig {
driver: SqlDriver::Sqlite,
connection: db_path.to_string(),
query: "SELECT id FROM events".to_string(),
interval: Duration::ZERO, schedule: Some("* * * * * *".to_string()),
};
let source = SqlSource::new("sql_source", config).expect("Source should build");
let (sender, mut receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
let msg = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for first scheduled poll")
.expect("Failed to receive message");
assert_eq!(msg.payload["id"], serde_json::json!(42));
let _ = shutdown_tx.send(());
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for SQL source to shut down")
.expect("SQL source task panicked");
assert!(result.is_ok(), "SQL source should shut down cleanly");
}