1use sqlx::postgres::PgPoolOptions;
4use sqlx::PgPool;
5use std::collections::HashMap;
6use std::time::Duration;
7
/// Returns the Postgres connection URL used by the helpers in this file.
///
/// Reads the `DATABASE_URL` environment variable. When the variable cannot be
/// read (unset, or not valid Unicode) a hard-coded local URL is returned
/// instead.
pub fn database_url() -> String {
    const FALLBACK_URL: &str = "postgres://postgres:test@localhost:15432/awa_test";

    match std::env::var("DATABASE_URL") {
        Ok(configured) => configured,
        Err(_) => FALLBACK_URL.to_string(),
    }
}
13
14pub fn database_url_with_app_name(app_name: &str) -> String {
16 let mut url = database_url();
17 let sep = if url.contains('?') { '&' } else { '?' };
18 url.push(sep);
19 url.push_str("application_name=");
20 url.push_str(app_name);
21 url
22}
23
24pub async fn pool(max_connections: u32) -> PgPool {
26 PgPoolOptions::new()
27 .max_connections(max_connections)
28 .connect(&database_url())
29 .await
30 .expect("Failed to connect to database")
31}
32
33pub async fn pool_with_url(url: &str, max_connections: u32) -> PgPool {
35 PgPoolOptions::new()
36 .max_connections(max_connections)
37 .connect(url)
38 .await
39 .expect("Failed to connect to database")
40}
41
42pub async fn setup(max_connections: u32) -> PgPool {
44 let pool = pool(max_connections).await;
45 awa_model::migrations::run(&pool)
46 .await
47 .expect("Failed to run migrations");
48 pool
49}
50
51pub async fn clean_queue(pool: &PgPool, queue: &str) {
53 sqlx::query("DELETE FROM awa.jobs WHERE queue = $1")
54 .bind(queue)
55 .execute(pool)
56 .await
57 .expect("Failed to clean queue jobs");
58 sqlx::query("DELETE FROM awa.queue_meta WHERE queue = $1")
59 .bind(queue)
60 .execute(pool)
61 .await
62 .expect("Failed to clean queue meta");
63}
64
65pub async fn queue_state_counts(pool: &PgPool, queue: &str) -> HashMap<String, i64> {
67 let rows: Vec<(String, i64)> = sqlx::query_as(
68 r#"
69 SELECT state::text, count(*)::bigint
70 FROM awa.jobs
71 WHERE queue = $1
72 GROUP BY state
73 "#,
74 )
75 .bind(queue)
76 .fetch_all(pool)
77 .await
78 .expect("Failed to query state counts");
79
80 rows.into_iter().collect()
81}
82
/// Looks up the count recorded for `state` in `counts`.
///
/// Returns `0` when `counts` has no entry for `state`.
pub fn state_count(counts: &HashMap<String, i64>, state: &str) -> i64 {
    match counts.get(state) {
        Some(&found) => found,
        None => 0,
    }
}
87
88pub async fn wait_for_counts(
90 pool: &PgPool,
91 queue: &str,
92 predicate: impl Fn(&HashMap<String, i64>) -> bool,
93 timeout: Duration,
94) -> HashMap<String, i64> {
95 let start = std::time::Instant::now();
96 loop {
97 let counts = queue_state_counts(pool, queue).await;
98 if predicate(&counts) {
99 return counts;
100 }
101 assert!(
102 start.elapsed() < timeout,
103 "Timed out waiting for queue {queue} counts; last counts: {counts:?}"
104 );
105 tokio::time::sleep(Duration::from_millis(50)).await;
106 }
107}