use bsql::{Listener, Pool};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Connection URL used by the tests in this file: user `bsql`, password
/// `bsql`, host `localhost`, database `bsql_test`.
const DB_URL: &str = "postgres://bsql:bsql@localhost/bsql_test";
/// Opens a connection pool against [`DB_URL`] for use by a test.
///
/// # Panics
///
/// Panics with a hint about PostgreSQL if the connection attempt fails.
async fn pool() -> Pool {
    let connected = Pool::connect(DB_URL).await;
    connected.expect("Failed to connect to test database. Is PostgreSQL running?")
}
/// Builds a channel name of the form `async_<prefix>_<n>`.
///
/// `n` comes from a function-local atomic counter that starts at 0 and is
/// incremented on every call, so each call within one process yields a
/// different name even when the same `prefix` is reused.
fn unique_channel(prefix: &str) -> String {
    static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);

    // `fetch_add` returns the value held *before* the increment.
    let suffix = NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed);
    format!("async_{}_{}", prefix, suffix)
}
/// Verifies that `Pool::connect` can be awaited and yields a usable pool.
///
/// The test checks that the reported `max_size` is positive, then runs a
/// single-row query and expects the row with `id = 1` to have the login
/// `"alice"`.
///
/// # Panics
///
/// Panics if connecting fails, if the query fails, or if any assertion does
/// not hold.
#[tokio::test]
async fn async_pool_connect() {
    // `expect` (instead of `assert!(is_ok())` followed by `unwrap()`) keeps
    // the underlying connection error in the failure output.
    let pool = Pool::connect(DB_URL)
        .await
        .expect("Pool::connect().await should succeed");

    let status = pool.status();
    assert!(status.max_size > 0, "pool should have positive max_size");

    // NOTE(review): `id` appears to be picked up by name through the `$id`
    // placeholder in the query text; do not rename it without checking the
    // macro.
    let id = 1i32;
    let user = bsql::query!("SELECT id, login FROM users WHERE id = $id: i32")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(user.id, 1);
    assert_eq!(user.login, "alice");
}
/// Exercises one shared pool from five spawned tasks, ten queries per task,
/// and then issues one more query from the test task itself.
///
/// Every successful query in the spawned tasks must return user `1` /
/// `"alice"`; the final query must return user `2` / `"bob"`.
///
/// # Panics
///
/// Panics if a spawned task panics, if a query fails with anything other
/// than `BsqlError::Pool`, or if any assertion does not hold.
#[tokio::test]
async fn async_concurrent_queries() {
    let pool = Arc::new(pool().await);

    // `collect` drives the iterator, so all five tasks are spawned before any
    // of them is awaited below.
    let workers: Vec<_> = (0..5u32)
        .map(|task_id| {
            let pool = Arc::clone(&pool);
            tokio::spawn(async move {
                for query_idx in 0..10u32 {
                    let id = 1i32;
                    let outcome =
                        bsql::query!("SELECT id, login FROM users WHERE id = $id: i32")
                            .fetch_one(pool.as_ref())
                            .await;

                    let user = match outcome {
                        Ok(user) => user,
                        // Pool-level errors are skipped rather than failing
                        // the test. NOTE(review): presumably this tolerates
                        // pool exhaustion under contention — confirm.
                        Err(bsql::BsqlError::Pool(_)) => continue,
                        Err(e) => {
                            panic!("task {task_id} query {query_idx}: unexpected error: {e}")
                        }
                    };

                    assert_eq!(user.id, 1, "task {task_id} query {query_idx}: id mismatch");
                    assert_eq!(
                        user.login, "alice",
                        "task {task_id} query {query_idx}: login mismatch"
                    );
                }
                task_id
            })
        })
        .collect();

    // Each task returns its own id; awaiting also surfaces task panics.
    for worker in workers {
        let task_id = worker.await.expect("task panicked");
        assert!(task_id < 5);
    }

    // The pool must still serve queries once the concurrent work is done.
    let id = 2i32;
    let user = bsql::query!("SELECT id, login FROM users WHERE id = $id: i32")
        .fetch_one(pool.as_ref())
        .await
        .unwrap();
    assert_eq!(user.id, 2);
    assert_eq!(user.login, "bob");
}
/// Verifies that a row inserted inside a transaction is visible through the
/// pool after `tx.commit().await`.
///
/// The test inserts a ticket inside a transaction, commits, reads the ticket
/// back by id through the pool, deletes it again, and only then asserts on
/// what was read.
///
/// # Panics
///
/// Panics if any database call fails, if the committed row cannot be found,
/// or if its title differs from the inserted one.
#[tokio::test]
async fn async_transaction_commit_await() {
    let pool = pool().await;
    let mut tx = pool.begin().await.unwrap();

    let title = "async_commit_test";
    let uid = 1i32;
    // The continuation lines of the SQL literal are deliberately flush-left
    // so the text of the literal is unchanged.
    let ticket = bsql::query!(
        "INSERT INTO tickets (title, status, created_by_user_id)
VALUES ($title: &str, 'new', $uid: i32)
RETURNING id"
    )
    .fetch_one(&mut tx)
    .await
    .unwrap();
    let ticket_id = ticket.id;
    tx.commit().await.unwrap();

    let found = bsql::query!("SELECT id, title FROM tickets WHERE id = $ticket_id: i32")
        .fetch_optional(&pool)
        .await
        .unwrap();

    // Clean up *before* asserting: a failing assertion must not leave the
    // committed row behind in the test database.
    bsql::query!("DELETE FROM tickets WHERE id = $ticket_id: i32")
        .execute(&pool)
        .await
        .unwrap();

    let row = found.expect("committed row should persist after tx.commit().await");
    assert_eq!(row.title, "async_commit_test");
}
/// Verifies that a row inserted inside a transaction is gone after
/// `tx.rollback().await`.
///
/// The test inserts a ticket inside a transaction, remembers the returned
/// id, rolls back, and then looks the id up through the pool.
///
/// # Panics
///
/// Panics if any database call fails or if the row is still found.
#[tokio::test]
async fn async_transaction_rollback_await() {
    let db = pool().await;
    let mut txn = db.begin().await.unwrap();

    let (title, uid) = ("async_rollback_test", 1i32);
    // The continuation lines of the SQL literal are deliberately flush-left
    // so the text of the literal is unchanged.
    let inserted = bsql::query!(
        "INSERT INTO tickets (title, status, created_by_user_id)
VALUES ($title: &str, 'new', $uid: i32)
RETURNING id"
    )
    .fetch_one(&mut txn)
    .await
    .unwrap();
    let ticket_id = inserted.id;

    txn.rollback().await.unwrap();

    let lookup = bsql::query!("SELECT id FROM tickets WHERE id = $ticket_id: i32")
        .fetch_optional(&db)
        .await
        .unwrap();
    assert!(
        lookup.is_none(),
        "rolled-back row should NOT persist after tx.rollback().await"
    );
}
/// Verifies that `fetch_stream` yields rows one at a time via
/// `stream.next().await`.
///
/// All users are streamed in `id` order and collected as `(id, login)`
/// pairs; the test expects at least two rows, the first two being `"alice"`
/// and `"bob"`.
///
/// # Panics
///
/// Panics if opening the stream or fetching a row fails, or if any assertion
/// does not hold.
#[tokio::test]
async fn async_fetch_stream() {
    let db = pool().await;
    let mut stream = bsql::query!("SELECT id, login FROM users ORDER BY id")
        .fetch_stream(&db)
        .await
        .unwrap();

    let mut collected = Vec::new();
    loop {
        // `None` marks the end of the stream; an `Err` aborts the test.
        match stream.next().await.unwrap() {
            Some(row) => collected.push((row.id, row.login.clone())),
            None => break,
        }
    }

    let total = collected.len();
    assert!(total >= 2, "expected at least 2 users, got {}", total);
    assert_eq!(collected[0].1, "alice");
    assert_eq!(collected[1].1, "bob");
}
/// Verifies that a `Listener` receives a notification it sent to a channel
/// it is listening on.
///
/// A fresh channel name is generated per run, the listener subscribes to it,
/// sends one notification, and then awaits `recv()`; the received channel
/// and payload must match what was sent.
///
/// # Panics
///
/// Panics if connecting, listening, notifying, or receiving fails, or if the
/// received notification does not match.
#[tokio::test]
async fn async_listener_recv() {
    let channel = unique_channel("listener_test");
    let payload = "async_hello";

    let mut listener = Listener::connect(DB_URL).await.unwrap();
    listener.listen(&channel).await.unwrap();
    listener.notify(&channel, payload).await.unwrap();

    let received = listener.recv().await.unwrap();
    assert_eq!(received.channel(), channel);
    assert_eq!(received.payload(), payload);
}