Skip to main content

awa_testing/
setup.rs

1//! Common test setup utilities for Awa integration tests.
2
3use sqlx::postgres::PgPoolOptions;
4use sqlx::PgPool;
5use std::collections::HashMap;
6use std::time::Duration;
7
/// Resolve the database URL used by test runs.
///
/// Returns the value of the `DATABASE_URL` environment variable when it can
/// be read; otherwise (the variable is unset or not valid Unicode) falls back
/// to a fixed local URL.
pub fn database_url() -> String {
    const FALLBACK: &str = "postgres://postgres:test@localhost:15432/awa_test";

    match std::env::var("DATABASE_URL") {
        Ok(configured) => configured,
        Err(_) => FALLBACK.to_string(),
    }
}
13
14/// Database URL with a custom application_name parameter appended.
15pub fn database_url_with_app_name(app_name: &str) -> String {
16    let mut url = database_url();
17    let sep = if url.contains('?') { '&' } else { '?' };
18    url.push(sep);
19    url.push_str("application_name=");
20    url.push_str(app_name);
21    url
22}
23
24/// Create a connection pool.
25pub async fn pool(max_connections: u32) -> PgPool {
26    PgPoolOptions::new()
27        .max_connections(max_connections)
28        .connect(&database_url())
29        .await
30        .expect("Failed to connect to database")
31}
32
33/// Create a connection pool with a custom database URL.
34pub async fn pool_with_url(url: &str, max_connections: u32) -> PgPool {
35    PgPoolOptions::new()
36        .max_connections(max_connections)
37        .connect(url)
38        .await
39        .expect("Failed to connect to database")
40}
41
42/// Create a pool, run migrations, and return it.
43pub async fn setup(max_connections: u32) -> PgPool {
44    let pool = pool(max_connections).await;
45    awa_model::migrations::run(&pool)
46        .await
47        .expect("Failed to run migrations");
48    pool
49}
50
51/// Delete all jobs and queue metadata for a specific queue.
52pub async fn clean_queue(pool: &PgPool, queue: &str) {
53    sqlx::query("DELETE FROM awa.jobs WHERE queue = $1")
54        .bind(queue)
55        .execute(pool)
56        .await
57        .expect("Failed to clean queue jobs");
58    sqlx::query("DELETE FROM awa.queue_meta WHERE queue = $1")
59        .bind(queue)
60        .execute(pool)
61        .await
62        .expect("Failed to clean queue meta");
63}
64
65/// Query job state counts for a queue, returning a map of state -> count.
66pub async fn queue_state_counts(pool: &PgPool, queue: &str) -> HashMap<String, i64> {
67    let rows: Vec<(String, i64)> = sqlx::query_as(
68        r#"
69        SELECT state::text, count(*)::bigint
70        FROM awa.jobs
71        WHERE queue = $1
72        GROUP BY state
73        "#,
74    )
75    .bind(queue)
76    .fetch_all(pool)
77    .await
78    .expect("Failed to query state counts");
79
80    rows.into_iter().collect()
81}
82
/// Look up the count recorded for `state` in a state-counts map.
///
/// Returns `0` when the map has no entry for `state`.
pub fn state_count(counts: &HashMap<String, i64>, state: &str) -> i64 {
    match counts.get(state) {
        Some(&total) => total,
        None => 0,
    }
}
87
88/// Poll queue state counts until a predicate is satisfied, or panic on timeout.
89pub async fn wait_for_counts(
90    pool: &PgPool,
91    queue: &str,
92    predicate: impl Fn(&HashMap<String, i64>) -> bool,
93    timeout: Duration,
94) -> HashMap<String, i64> {
95    let start = std::time::Instant::now();
96    loop {
97        let counts = queue_state_counts(pool, queue).await;
98        if predicate(&counts) {
99            return counts;
100        }
101        assert!(
102            start.elapsed() < timeout,
103            "Timed out waiting for queue {queue} counts; last counts: {counts:?}"
104        );
105        tokio::time::sleep(Duration::from_millis(50)).await;
106    }
107}