use super::*;
use anyhow::Context;
use easy_macros::always_context;
use futures::StreamExt;
use easy_sql_macros::{query, query_lazy};
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_select_basic() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_test_data(&mut conn, default_expr_test_data()).await?;
let mut lazy_query =
query_lazy!(SELECT ExprTestData FROM ExprTestTable WHERE ExprTestTable.id = 1)?;
let mut result_stream = lazy_query.fetch(&mut conn);
let result_option = result_stream.next().await;
let result_result = result_option.context("Expected at least one result")?;
let result = result_result.context("Failed to fetch result")?;
assert_eq!(result.int_field, 42);
assert_eq!(result.str_field, "test");
drop(result_stream);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_with_sqlx_pool_connection() -> anyhow::Result<()> {
let pool_resource = setup_sqlx_pool_for_testing::<ExprTestTable>().await?;
let mut pool = pool_resource.pool();
let data = default_expr_test_data();
query!(pool, INSERT INTO ExprTestTable VALUES {data}).await?;
let mut lazy_query =
query_lazy!(SELECT ExprTestData FROM ExprTestTable WHERE ExprTestTable.id = 1)?;
let mut result_stream = lazy_query.fetch(pool);
let result_option = result_stream.next().await;
let result_result = result_option.context("Expected at least one result")?;
let result = result_result.context("Failed to fetch result")?;
assert_eq!(result.int_field, 42);
assert_eq!(result.str_field, "test");
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_con_not_borrowed_forever_check() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_multiple_test_data(
&mut conn,
vec![
expr_test_data(10, "first", true, None),
expr_test_data(20, "second", false, None),
],
)
.await?;
let mut lazy_query1 =
query_lazy!(SELECT ExprTestData FROM ExprTestTable WHERE ExprTestTable.id = 1)?;
let mut lazy_query2 =
query_lazy!(SELECT ExprTestData FROM ExprTestTable WHERE ExprTestTable.id = 1)?;
let result1 = {
let mut stream1 = lazy_query1.fetch(&mut conn);
let result1_option = stream1.next().await;
let result1_result = result1_option.context("Expected result 1")?;
result1_result.context("Failed to fetch result 1")?
};
let result2 = {
let mut stream2 = lazy_query2.fetch(&mut conn);
let result2_option = stream2.next().await;
let result2_result = result2_option.context("Expected result 2")?;
result2_result.context("Failed to fetch result 2")?
};
assert_eq!(result1.int_field, result2.int_field);
assert_eq!(result1.int_field, 10);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_select_multiple() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_multiple_test_data(
&mut conn,
vec![
expr_test_data(10, "a", true, None),
expr_test_data(20, "b", false, None),
expr_test_data(30, "c", true, None),
],
)
.await?;
let mut lazy_query = query_lazy!(
SELECT ExprTestData FROM ExprTestTable WHERE int_field > 5
)?;
let mut results = Vec::new();
{
let mut stream = lazy_query.fetch(&mut conn);
while let Some(result) = stream.next().await {
let data = result.context("Failed to fetch row")?;
results.push(data);
}
}
assert_eq!(results.len(), 3);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_select_complex() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_multiple_test_data(
&mut conn,
vec![
expr_test_data(30, "c", true, None),
expr_test_data(10, "a", false, None),
expr_test_data(20, "b", true, None),
],
)
.await?;
let mut lazy_query = query_lazy!(
SELECT ExprTestData FROM ExprTestTable
ORDER BY int_field DESC
LIMIT 2
)?;
let mut results = Vec::new();
{
let mut stream = lazy_query.fetch(&mut conn);
while let Some(result) = stream.next().await {
let data = result.context("Failed to fetch row")?;
results.push(data);
}
}
assert_eq!(results.len(), 2);
assert_eq!(results[0].int_field, 30);
assert_eq!(results[1].int_field, 20);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_insert() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
let data = default_expr_test_data();
let mut lazy_insert =
query_lazy!(INSERT INTO ExprTestTable VALUES {data} RETURNING ExprTestData)?;
let returned = {
let mut stream = lazy_insert.fetch(&mut conn);
let returned_option = stream.next().await;
let returned_result = returned_option.context("Expected INSERT to return row")?;
returned_result.context("Failed to get INSERT result")?
};
assert_eq!(returned.int_field, 42);
let results: Vec<ExprTestData> = query!(&mut conn,
SELECT Vec<ExprTestData> FROM ExprTestTable
)
.await?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].int_field, 42);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_update() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_test_data(&mut conn, default_expr_test_data()).await?;
let new_value = 100;
let mut lazy_update = query_lazy!(
UPDATE ExprTestTable
SET int_field = {new_value}
WHERE id = 1
RETURNING ExprTestData
)?;
let updated = {
let mut stream = lazy_update.fetch(&mut conn);
let result_option = stream.next().await;
let result_result = result_option.context("Expected UPDATE to return row")?;
result_result.context("Failed to get UPDATE result")?
};
assert_eq!(updated.int_field, 100);
assert_eq!(updated.str_field, "test");
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_delete() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_multiple_test_data(
&mut conn,
vec![
expr_test_data(10, "a", true, None),
expr_test_data(20, "b", false, None),
expr_test_data(30, "c", true, None),
],
)
.await?;
let mut lazy_delete = query_lazy!(
DELETE FROM ExprTestTable
WHERE int_field > 15
RETURNING ExprTestData
)?;
let mut deleted_rows = Vec::new();
{
let mut stream = lazy_delete.fetch(&mut conn);
while let Some(result) = stream.next().await {
let data = result.context("Failed to fetch deleted row")?;
deleted_rows.push(data);
}
}
assert_eq!(deleted_rows.len(), 2);
assert!(deleted_rows.iter().any(|r| r.int_field == 20));
assert!(deleted_rows.iter().any(|r| r.int_field == 30));
let remaining: Vec<ExprTestData> = query!(&mut conn,
SELECT Vec<ExprTestData> FROM ExprTestTable
)
.await?;
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].int_field, 10);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_variable_capture() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_multiple_test_data(
&mut conn,
vec![
expr_test_data(10, "a", true, None),
expr_test_data(20, "b", false, None),
expr_test_data(30, "c", true, None),
],
)
.await?;
let threshold = 15;
let mut lazy_query = query_lazy!(
SELECT ExprTestData FROM ExprTestTable WHERE int_field > {threshold}
)?;
let mut results = Vec::new();
{
let mut stream = lazy_query.fetch(&mut conn);
while let Some(result) = stream.next().await {
let data = result.context("Failed to fetch row")?;
results.push(data);
}
}
assert_eq!(results.len(), 2);
assert_eq!(results[0].int_field, 20);
assert_eq!(results[1].int_field, 30);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_storage() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_test_data(&mut conn, default_expr_test_data()).await?;
let mut my_query =
query_lazy!(SELECT ExprTestData FROM ExprTestTable WHERE ExprTestTable.id = 1)?;
let result = {
let mut stream = my_query.fetch(&mut conn);
let result_option = stream.next().await;
let result_result = result_option.context("Expected result")?;
result_result.context("Failed to fetch result")?
};
assert_eq!(result.int_field, 42);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_complex_where() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_multiple_test_data(
&mut conn,
vec![
expr_test_data(10, "test", true, None),
expr_test_data(20, "test", false, None),
expr_test_data(30, "other", true, None),
],
)
.await?;
let mut lazy_query = query_lazy!(
SELECT ExprTestData FROM ExprTestTable
WHERE (int_field >= 10 AND int_field <= 30)
AND str_field = "test"
AND bool_field = true
)?;
let mut results = Vec::new();
{
let mut stream = lazy_query.fetch(&mut conn);
while let Some(result) = stream.next().await {
let data = result.context("Failed to fetch row")?;
results.push(data);
}
}
assert_eq!(results.len(), 1);
assert_eq!(results[0].int_field, 10);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_with_returning() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
let data = default_expr_test_data();
let mut lazy_insert = query_lazy!(
INSERT INTO ExprTestTable VALUES {data} RETURNING ExprTestData
)?;
let returned = {
let mut stream = lazy_insert.fetch(&mut conn);
let returned_option = stream.next().await;
let returned_result = returned_option.context("Expected INSERT to return row")?;
returned_result.context("Failed to get INSERT result")?
};
assert_eq!(returned.int_field, 42);
assert_eq!(returned.str_field, "test");
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_multiple_sequential() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
let data = default_expr_test_data();
query!(&mut conn, INSERT INTO ExprTestTable VALUES {data}).await?;
let new_value = 999;
let mut lazy_update = query_lazy!(
UPDATE ExprTestTable
SET int_field = {new_value}
WHERE id = 1
RETURNING ExprTestData
)?;
let updated = {
let mut stream = lazy_update.fetch(&mut conn);
let result_option = stream.next().await;
let result_result = result_option.context("Expected UPDATE to return row")?;
result_result.context("Failed to get UPDATE result")?
};
assert_eq!(updated.int_field, 999);
let mut lazy_select =
query_lazy!(SELECT ExprTestData FROM ExprTestTable WHERE ExprTestTable.id = 1)?;
let result = {
let mut stream = lazy_select.fetch(&mut conn);
let result_option = stream.next().await;
let result_result = result_option.context("Expected SELECT result")?;
result_result.context("Failed to fetch result")?
};
assert_eq!(result.int_field, 999);
conn.rollback().await?;
Ok(())
}
#[always_context(skip(!))]
#[tokio::test]
async fn test_query_lazy_cross_scope() -> anyhow::Result<()> {
let db = Database::setup_for_testing::<ExprTestTable>().await?;
let mut conn = db.transaction().await?;
insert_test_data(&mut conn, default_expr_test_data()).await?;
let filter_value = 1;
let mut lazy_query = query_lazy!(SELECT ExprTestData FROM ExprTestTable WHERE ExprTestTable.id = {filter_value})?;
let result = {
let mut stream = lazy_query.fetch(&mut conn);
let result_option = stream.next().await;
let result_result = result_option.context("Expected result")?;
result_result.context("Failed to fetch result")?
};
assert_eq!(result.int_field, 42);
conn.rollback().await?;
Ok(())
}