sqlx_core_oldapi/postgres/testing/
mod.rs1use std::fmt::Write;
2use std::str::FromStr;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use futures_core::future::BoxFuture;
7
8use once_cell::sync::OnceCell;
9
10use crate::connection::Connection;
11
12use crate::error::Error;
13use crate::executor::Executor;
14use crate::pool::{Pool, PoolOptions};
15use crate::postgres::{PgConnectOptions, PgConnection, Postgres};
16use crate::query::query;
17use crate::query_scalar::query_scalar;
18use crate::testing::{FixtureSnapshot, TestArgs, TestContext, TestSupport};
19
20static MASTER_POOL: OnceCell<Pool<Postgres>> = OnceCell::new();
22static DO_CLEANUP: AtomicBool = AtomicBool::new(true);
24
25impl TestSupport for Postgres {
26 fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
27 Box::pin(test_context(args))
28 }
29
30 fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
31 Box::pin(async move {
32 let mut conn = MASTER_POOL
33 .get()
34 .expect("cleanup_test() invoked outside `#[sqlx::test]")
35 .acquire()
36 .await?;
37
38 conn.execute(&format!("drop database if exists {0:?};", db_name)[..])
39 .await?;
40
41 query("delete from _sqlx_test.databases where db_name = $1")
42 .bind(db_name)
43 .execute(&mut conn)
44 .await?;
45
46 Ok(())
47 })
48 }
49
50 fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
51 Box::pin(async move {
52 let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
53
54 let mut conn = PgConnection::connect(&url).await?;
55 let num_deleted = do_cleanup(&mut conn).await?;
56 let _ = conn.close().await;
57 Ok(Some(num_deleted))
58 })
59 }
60
61 fn snapshot(
62 _conn: &mut Self::Connection,
63 ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
64 todo!()
67 }
68}
69
70async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
71 let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
72
73 let master_opts = PgConnectOptions::from_str(&url).expect("failed to parse DATABASE_URL");
74
75 let pool = PoolOptions::new()
76 .max_connections(20)
80 .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
82 .connect_lazy_with(master_opts);
83
84 let master_pool = match MASTER_POOL.try_insert(pool) {
85 Ok(inserted) => inserted,
86 Err((existing, pool)) => {
87 assert_eq!(
89 existing.connect_options().host,
90 pool.connect_options().host,
91 "DATABASE_URL changed at runtime, host differs"
92 );
93
94 assert_eq!(
95 existing.connect_options().database,
96 pool.connect_options().database,
97 "DATABASE_URL changed at runtime, database differs"
98 );
99
100 existing
101 }
102 };
103
104 let mut conn = master_pool.acquire().await?;
105
106 conn.execute(
108 r#"
113 lock table pg_catalog.pg_namespace in share row exclusive mode;
114
115 create schema if not exists _sqlx_test;
116
117 create table if not exists _sqlx_test.databases (
118 db_name text primary key,
119 test_path text not null,
120 created_at timestamptz not null default now()
121 );
122
123 create index if not exists databases_created_at
124 on _sqlx_test.databases(created_at);
125
126 create sequence if not exists _sqlx_test.database_ids;
127 "#,
128 )
129 .await?;
130
131 if DO_CLEANUP.swap(false, Ordering::SeqCst) {
133 do_cleanup(&mut conn).await?;
134 }
135
136 let new_db_name: String = query_scalar(
137 r#"
138 insert into _sqlx_test.databases(db_name, test_path)
139 select '_sqlx_test_' || nextval('_sqlx_test.database_ids'), $1
140 returning db_name
141 "#,
142 )
143 .bind(args.test_path)
144 .fetch_one(&mut conn)
145 .await?;
146
147 conn.execute(&format!("create database {:?}", new_db_name)[..])
148 .await?;
149
150 Ok(TestContext {
151 pool_opts: PoolOptions::new()
152 .max_connections(5)
156 .idle_timeout(Some(Duration::from_secs(1)))
158 .parent(master_pool.clone()),
159 connect_opts: master_pool.connect_options().clone().database(&new_db_name),
160 db_name: new_db_name,
161 })
162}
163
164async fn do_cleanup(conn: &mut PgConnection) -> Result<usize, Error> {
165 let delete_db_names: Vec<String> =
166 query_scalar("select db_name from _sqlx_test.databases where created_at < now()")
167 .fetch_all(&mut *conn)
168 .await?;
169
170 if delete_db_names.is_empty() {
171 return Ok(0);
172 }
173
174 let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
175 let delete_db_names = delete_db_names.into_iter();
176
177 let mut command = String::new();
178
179 for db_name in delete_db_names {
180 command.clear();
181 writeln!(command, "drop database if exists {:?};", db_name).ok();
182 match conn.execute(&*command).await {
183 Ok(_deleted) => {
184 deleted_db_names.push(db_name);
185 }
186 Err(Error::Database(dbe)) => {
188 eprintln!("could not clean test database {:?}: {}", db_name, dbe)
189 }
190 Err(e) => return Err(e),
192 }
193 }
194
195 query("delete from _sqlx_test.databases where db_name = any($1::text[])")
196 .bind(&deleted_db_names)
197 .execute(&mut *conn)
198 .await?;
199
200 Ok(deleted_db_names.len())
201}