use bsql::Pool;
use tokio_stream::StreamExt;
async fn pool() -> Pool {
Pool::connect("postgres://bsql:bsql@localhost/bsql_test")
.await
.expect("Failed to connect to test database. Is PostgreSQL running?")
}
#[tokio::test]
async fn stream_multiple_rows() {
let pool = pool().await;
let mut stream = bsql::query!("SELECT id, login FROM users WHERE active = true ORDER BY id")
.fetch_stream(&pool)
.await
.unwrap();
let mut rows = Vec::new();
while let Some(row) = stream.next().await {
rows.push(row.unwrap());
}
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].login, "alice");
assert_eq!(rows[1].login, "bob");
}
#[tokio::test]
async fn stream_single_row() {
let pool = pool().await;
let id = 1i32;
let mut stream = bsql::query!("SELECT id, login FROM users WHERE id = $id: i32")
.fetch_stream(&pool)
.await
.unwrap();
let row = stream.next().await.unwrap().unwrap();
assert_eq!(row.id, 1);
assert_eq!(row.login, "alice");
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn stream_zero_rows() {
let pool = pool().await;
let login = "nonexistent_user_for_stream_test";
let mut stream = bsql::query!("SELECT id, login FROM users WHERE login = $login: &str")
.fetch_stream(&pool)
.await
.unwrap();
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn stream_with_nullable_column() {
let pool = pool().await;
let mut stream = bsql::query!("SELECT id, middle_name FROM users ORDER BY id")
.fetch_stream(&pool)
.await
.unwrap();
let row = stream.next().await.unwrap().unwrap();
assert_eq!(row.id, 1);
assert!(row.middle_name.is_none());
}
#[tokio::test]
async fn stream_connection_returns_to_pool_on_drop() {
let pool = pool().await;
let before = pool.status();
{
let mut stream = bsql::query!("SELECT id, login FROM users ORDER BY id")
.fetch_stream(&pool)
.await
.unwrap();
let _row = stream.next().await.unwrap().unwrap();
}
let after = pool.status();
assert!(
after.available >= before.available,
"connection not returned: before={}, after={}",
before.available,
after.available,
);
}
#[tokio::test]
async fn stream_collect_all() {
let pool = pool().await;
let stream = bsql::query!("SELECT id, login FROM users WHERE active = true ORDER BY id")
.fetch_stream(&pool)
.await
.unwrap();
let rows: Vec<_> = stream
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].login, "alice");
assert_eq!(rows[1].login, "bob");
}
#[tokio::test]
async fn stream_with_params_and_join() {
let pool = pool().await;
let id = 1i32;
let mut stream = bsql::query!(
"SELECT t.id as ticket_id, t.title, u.login as creator
FROM tickets t
JOIN users u ON u.id = t.created_by_user_id
WHERE t.id = $id: i32"
)
.fetch_stream(&pool)
.await
.unwrap();
let row = stream.next().await.unwrap().unwrap();
assert_eq!(row.ticket_id, 1);
assert_eq!(row.title, "Fix login bug");
assert_eq!(row.creator, "alice");
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn stream_with_optional_clause() {
let pool = pool().await;
let dept: Option<i32> = None;
let mut stream = bsql::query!(
"SELECT id, title FROM tickets
WHERE deleted_at IS NULL
[AND department_id = $dept: Option<i32>]
ORDER BY id"
)
.fetch_stream(&pool)
.await
.unwrap();
let mut count = 0;
while let Some(row) = stream.next().await {
let _ = row.unwrap();
count += 1;
}
assert!(count >= 2);
}