use bsql::Pool;
async fn pool() -> Pool {
Pool::connect("postgres://bsql:bsql@localhost/bsql_test")
.await
.expect("Failed to connect to test database. Is PostgreSQL running?")
}
#[tokio::test]
async fn singleflight_fetch_one_works() {
let pool = pool().await;
let id = 1i32;
let user = bsql::query!("SELECT id, login FROM users WHERE id = $id: i32")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(user.id, 1);
assert_eq!(user.login, "alice");
}
#[tokio::test]
async fn singleflight_fetch_all_works() {
let pool = pool().await;
let users = bsql::query!("SELECT id, login FROM users ORDER BY id")
.fetch_all(&pool)
.await
.unwrap();
assert!(users.len() >= 2);
assert_eq!(users[0].login, "alice");
}
#[tokio::test]
async fn concurrent_identical_queries_all_succeed() {
let pool = pool().await;
let mut handles = Vec::new();
for _ in 0..10 {
let pool_ref = &pool;
handles.push(async move {
bsql::query!("SELECT id, login FROM users ORDER BY id")
.fetch_all(pool_ref)
.await
});
}
let results = futures_core_join_all(handles).await;
for result in results {
let users = result.unwrap();
assert!(users.len() >= 2);
assert_eq!(users[0].login, "alice");
}
}
#[tokio::test]
async fn parameterized_query_works_with_singleflight() {
let pool = pool().await;
let id = 1i32;
let user = bsql::query!("SELECT id, login FROM users WHERE id = $id: i32")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(user.id, 1);
let id = 2i32;
let user = bsql::query!("SELECT id, login FROM users WHERE id = $id: i32")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(user.id, 2);
}
#[tokio::test]
async fn transaction_queries_are_not_coalesced() {
let pool = pool().await;
let txn = pool.begin().await.unwrap();
let users = bsql::query!("SELECT id, login FROM users ORDER BY id")
.fetch_all(&txn)
.await
.unwrap();
assert!(users.len() >= 2);
txn.rollback().await.unwrap();
}
#[tokio::test]
async fn pool_connection_queries_not_coalesced() {
let pool = pool().await;
let conn = pool.acquire().await.unwrap();
let users = bsql::query!("SELECT id, login FROM users ORDER BY id")
.fetch_all(&conn)
.await
.unwrap();
assert!(users.len() >= 2);
}
#[tokio::test]
async fn execute_not_affected_by_singleflight() {
let pool = pool().await;
let desc = "singleflight-test-desc";
let id = 1i32;
let affected = bsql::query!("UPDATE tickets SET description = $desc: &str WHERE id = $id: i32")
.execute(&pool)
.await
.unwrap();
assert_eq!(affected, 1);
}
async fn futures_core_join_all<F, T>(futures: Vec<F>) -> Vec<T>
where
F: std::future::Future<Output = T>,
{
let mut results = Vec::with_capacity(futures.len());
for fut in futures {
results.push(fut.await);
}
results
}