sqlx_core_oldapi/postgres/testing/
mod.rs

1use std::fmt::Write;
2use std::str::FromStr;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use futures_core::future::BoxFuture;
7
8use once_cell::sync::OnceCell;
9
10use crate::connection::Connection;
11
12use crate::error::Error;
13use crate::executor::Executor;
14use crate::pool::{Pool, PoolOptions};
15use crate::postgres::{PgConnectOptions, PgConnection, Postgres};
16use crate::query::query;
17use crate::query_scalar::query_scalar;
18use crate::testing::{FixtureSnapshot, TestArgs, TestContext, TestSupport};
19
// Pool of connections to the database named by `DATABASE_URL`, shared by every test in
// this binary. It is installed by the first `test_context()` call and read back by
// `cleanup_test()`.
//
// Using a blocking `OnceCell` here because the critical sections are short.
static MASTER_POOL: OnceCell<Pool<Postgres>> = OnceCell::new();
// Automatically delete any databases created before the start of the test binary.
// Starts out `true`; the first `test_context()` call swaps it to `false`, so the cleanup
// pass triggered from there runs at most once per process.
static DO_CLEANUP: AtomicBool = AtomicBool::new(true);
24
25impl TestSupport for Postgres {
26    fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
27        Box::pin(test_context(args))
28    }
29
30    fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
31        Box::pin(async move {
32            let mut conn = MASTER_POOL
33                .get()
34                .expect("cleanup_test() invoked outside `#[sqlx::test]")
35                .acquire()
36                .await?;
37
38            conn.execute(&format!("drop database if exists {0:?};", db_name)[..])
39                .await?;
40
41            query("delete from _sqlx_test.databases where db_name = $1")
42                .bind(db_name)
43                .execute(&mut conn)
44                .await?;
45
46            Ok(())
47        })
48    }
49
50    fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
51        Box::pin(async move {
52            let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
53
54            let mut conn = PgConnection::connect(&url).await?;
55            let num_deleted = do_cleanup(&mut conn).await?;
56            let _ = conn.close().await;
57            Ok(Some(num_deleted))
58        })
59    }
60
61    fn snapshot(
62        _conn: &mut Self::Connection,
63    ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
64        // TODO: I want to get the testing feature out the door so this will have to wait,
65        // but I'm keeping the code around for now because I plan to come back to it.
66        todo!()
67    }
68}
69
70async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
71    let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
72
73    let master_opts = PgConnectOptions::from_str(&url).expect("failed to parse DATABASE_URL");
74
75    let pool = PoolOptions::new()
76        // Postgres' normal connection limit is 100 plus 3 superuser connections
77        // We don't want to use the whole cap and there may be fuzziness here due to
78        // concurrently running tests anyway.
79        .max_connections(20)
80        // Immediately close master connections. Tokio's I/O streams don't like hopping runtimes.
81        .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
82        .connect_lazy_with(master_opts);
83
84    let master_pool = match MASTER_POOL.try_insert(pool) {
85        Ok(inserted) => inserted,
86        Err((existing, pool)) => {
87            // Sanity checks.
88            assert_eq!(
89                existing.connect_options().host,
90                pool.connect_options().host,
91                "DATABASE_URL changed at runtime, host differs"
92            );
93
94            assert_eq!(
95                existing.connect_options().database,
96                pool.connect_options().database,
97                "DATABASE_URL changed at runtime, database differs"
98            );
99
100            existing
101        }
102    };
103
104    let mut conn = master_pool.acquire().await?;
105
106    // language=PostgreSQL
107    conn.execute(
108        // Explicit lock avoids this latent bug: https://stackoverflow.com/a/29908840
109        // I couldn't find a bug on the mailing list for `CREATE SCHEMA` specifically,
110        // but a clearly related bug with `CREATE TABLE` has been known since 2007:
111        // https://www.postgresql.org/message-id/200710222037.l9MKbCJZ098744%40wwwmaster.postgresql.org
112        r#"
113        lock table pg_catalog.pg_namespace in share row exclusive mode;
114
115        create schema if not exists _sqlx_test;
116
117        create table if not exists _sqlx_test.databases (
118            db_name text primary key,
119            test_path text not null,
120            created_at timestamptz not null default now()
121        );
122
123        create index if not exists databases_created_at 
124            on _sqlx_test.databases(created_at);
125
126        create sequence if not exists _sqlx_test.database_ids;
127    "#,
128    )
129    .await?;
130
131    // Only run cleanup if the test binary just started.
132    if DO_CLEANUP.swap(false, Ordering::SeqCst) {
133        do_cleanup(&mut conn).await?;
134    }
135
136    let new_db_name: String = query_scalar(
137        r#"
138            insert into _sqlx_test.databases(db_name, test_path)
139            select '_sqlx_test_' || nextval('_sqlx_test.database_ids'), $1
140            returning db_name
141        "#,
142    )
143    .bind(args.test_path)
144    .fetch_one(&mut conn)
145    .await?;
146
147    conn.execute(&format!("create database {:?}", new_db_name)[..])
148        .await?;
149
150    Ok(TestContext {
151        pool_opts: PoolOptions::new()
152            // Don't allow a single test to take all the connections.
153            // Most tests shouldn't require more than 5 connections concurrently,
154            // or else they're likely doing too much in one test.
155            .max_connections(5)
156            // Close connections ASAP if left in the idle queue.
157            .idle_timeout(Some(Duration::from_secs(1)))
158            .parent(master_pool.clone()),
159        connect_opts: master_pool.connect_options().clone().database(&new_db_name),
160        db_name: new_db_name,
161    })
162}
163
164async fn do_cleanup(conn: &mut PgConnection) -> Result<usize, Error> {
165    let delete_db_names: Vec<String> =
166        query_scalar("select db_name from _sqlx_test.databases where created_at < now()")
167            .fetch_all(&mut *conn)
168            .await?;
169
170    if delete_db_names.is_empty() {
171        return Ok(0);
172    }
173
174    let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
175    let delete_db_names = delete_db_names.into_iter();
176
177    let mut command = String::new();
178
179    for db_name in delete_db_names {
180        command.clear();
181        writeln!(command, "drop database if exists {:?};", db_name).ok();
182        match conn.execute(&*command).await {
183            Ok(_deleted) => {
184                deleted_db_names.push(db_name);
185            }
186            // Assume a database error just means the DB is still in use.
187            Err(Error::Database(dbe)) => {
188                eprintln!("could not clean test database {:?}: {}", db_name, dbe)
189            }
190            // Bubble up other errors
191            Err(e) => return Err(e),
192        }
193    }
194
195    query("delete from _sqlx_test.databases where db_name = any($1::text[])")
196        .bind(&deleted_db_names)
197        .execute(&mut *conn)
198        .await?;
199
200    Ok(deleted_db_names.len())
201}