sqlx_core_oldapi/mysql/testing/
mod.rs

1use std::fmt::Write;
2use std::str::FromStr;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use futures_core::future::BoxFuture;
7
8use once_cell::sync::OnceCell;
9
10use crate::connection::Connection;
11
12use crate::error::Error;
13use crate::executor::Executor;
14use crate::mysql::{MySql, MySqlConnectOptions, MySqlConnection};
15use crate::pool::{Pool, PoolOptions};
16use crate::query::query;
17use crate::query_builder::QueryBuilder;
18use crate::query_scalar::query_scalar;
19use crate::testing::{FixtureSnapshot, TestArgs, TestContext, TestSupport};
20
// Process-wide pool of connections to the server named by `DATABASE_URL`.
// It is set by the first call to `test_context()` and read again by `cleanup_test()`.
// Using a blocking `OnceCell` here because the critical sections are short.
static MASTER_POOL: OnceCell<Pool<MySql>> = OnceCell::new();
// Automatically delete any databases created before the start of the test binary.
// Starts out `true`; `test_context()` swaps it to `false`, so the cleanup it triggers
// runs at most once per process.
static DO_CLEANUP: AtomicBool = AtomicBool::new(true);
25
26impl TestSupport for MySql {
27    fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
28        Box::pin(async move {
29            let res = test_context(args).await;
30            res
31        })
32    }
33
34    fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
35        Box::pin(async move {
36            let mut conn = MASTER_POOL
37                .get()
38                .expect("cleanup_test() invoked outside `#[sqlx::test]")
39                .acquire()
40                .await?;
41
42            let db_id = db_id(db_name);
43
44            conn.execute(&format!("drop database if exists {};", db_name)[..])
45                .await?;
46
47            query("delete from _sqlx_test_databases where db_id = ?")
48                .bind(&db_id)
49                .execute(&mut conn)
50                .await?;
51
52            Ok(())
53        })
54    }
55
56    fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
57        Box::pin(async move {
58            let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
59
60            let mut conn = MySqlConnection::connect(&url).await?;
61            let num_deleted = do_cleanup(&mut conn).await?;
62            let _ = conn.close().await;
63            Ok(Some(num_deleted))
64        })
65    }
66
67    fn snapshot(
68        _conn: &mut Self::Connection,
69    ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
70        // TODO: I want to get the testing feature out the door so this will have to wait,
71        // but I'm keeping the code around for now because I plan to come back to it.
72        todo!()
73    }
74}
75
76async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
77    let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
78
79    let master_opts = MySqlConnectOptions::from_str(&url).expect("failed to parse DATABASE_URL");
80
81    let pool = PoolOptions::new()
82        // MySql's normal connection limit is 150 plus 1 superuser connection
83        // We don't want to use the whole cap and there may be fuzziness here due to
84        // concurrently running tests anyway.
85        .max_connections(20)
86        // Immediately close master connections. Tokio's I/O streams don't like hopping runtimes.
87        .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
88        .connect_lazy_with(master_opts);
89
90    let master_pool = match MASTER_POOL.try_insert(pool) {
91        Ok(inserted) => inserted,
92        Err((existing, pool)) => {
93            // Sanity checks.
94            assert_eq!(
95                existing.connect_options().host,
96                pool.connect_options().host,
97                "DATABASE_URL changed at runtime, host differs"
98            );
99
100            assert_eq!(
101                existing.connect_options().database,
102                pool.connect_options().database,
103                "DATABASE_URL changed at runtime, database differs"
104            );
105
106            existing
107        }
108    };
109
110    let mut conn = master_pool.acquire().await?;
111
112    // language=MySQL
113    conn.execute(
114        r#"
115        create table if not exists _sqlx_test_databases (
116            db_id bigint unsigned primary key auto_increment,
117            test_path text not null,
118            created_at timestamp not null default current_timestamp
119        );
120    "#,
121    )
122    .await?;
123
124    // Only run cleanup if the test binary just started.
125    if DO_CLEANUP.swap(false, Ordering::SeqCst) {
126        do_cleanup(&mut conn).await?;
127    }
128
129    query("insert into _sqlx_test_databases(test_path) values (?)")
130        .bind(&args.test_path)
131        .execute(&mut conn)
132        .await?;
133
134    // MySQL doesn't have `INSERT ... RETURNING`
135    let new_db_id: u64 = query_scalar("select last_insert_id()")
136        .fetch_one(&mut conn)
137        .await?;
138
139    let new_db_name = db_name(new_db_id);
140
141    conn.execute(&format!("create database {}", new_db_name)[..])
142        .await?;
143
144    eprintln!("created database {}", new_db_name);
145
146    Ok(TestContext {
147        pool_opts: PoolOptions::new()
148            // Don't allow a single test to take all the connections.
149            // Most tests shouldn't require more than 5 connections concurrently,
150            // or else they're likely doing too much in one test.
151            .max_connections(5)
152            // Close connections ASAP if left in the idle queue.
153            .idle_timeout(Some(Duration::from_secs(1)))
154            .parent(master_pool.clone()),
155        connect_opts: master_pool.connect_options().clone().database(&new_db_name),
156        db_name: new_db_name,
157    })
158}
159
160async fn do_cleanup(conn: &mut MySqlConnection) -> Result<usize, Error> {
161    let delete_db_ids: Vec<u64> = query_scalar(
162        "select db_id from _sqlx_test_databases where created_at < current_timestamp()",
163    )
164    .fetch_all(&mut *conn)
165    .await?;
166
167    if delete_db_ids.is_empty() {
168        return Ok(0);
169    }
170
171    let mut deleted_db_ids = Vec::with_capacity(delete_db_ids.len());
172
173    let mut command = String::new();
174
175    for db_id in delete_db_ids {
176        command.clear();
177
178        let db_name = db_name(db_id);
179
180        writeln!(command, "drop database if exists {}", db_name).ok();
181        match conn.execute(&*command).await {
182            Ok(_deleted) => {
183                deleted_db_ids.push(db_id);
184            }
185            // Assume a database error just means the DB is still in use.
186            Err(Error::Database(dbe)) => {
187                eprintln!("could not clean test database {:?}: {}", db_id, dbe)
188            }
189            // Bubble up other errors
190            Err(e) => return Err(e),
191        }
192    }
193
194    let mut query = QueryBuilder::new("delete from _sqlx_test_databases where db_id in (");
195
196    let mut separated = query.separated(",");
197
198    for db_id in &deleted_db_ids {
199        separated.push_bind(db_id);
200    }
201
202    drop(separated);
203
204    query.push(")").build().execute(&mut *conn).await?;
205
206    Ok(deleted_db_ids.len())
207}
208
// Builds the name of the test database registered under `id`:
// the fixed prefix `_sqlx_test_database_` followed by the decimal ID.
fn db_name(id: u64) -> String {
    let mut name = String::from("_sqlx_test_database_");
    name.push_str(&id.to_string());
    name
}
212
// Recovers the numeric ID from a database name produced by `db_name`.
//
// The name must start with the `_sqlx_test_database_` prefix exactly once and the rest
// must parse as a `u64`. `strip_prefix` is used rather than `trim_start_matches` because
// the latter also accepts names with no prefix or with the prefix repeated.
//
// Panics if the prefix is missing or the remainder is not a valid `u64`.
fn db_id(name: &str) -> u64 {
    name.strip_prefix("_sqlx_test_database_")
        .and_then(|id| id.parse::<u64>().ok())
        .unwrap_or_else(|| panic!("failed to parse ID from database name {:?}", name))
}
218
// Checks that `db_name` and `db_id` agree on the database name for one sample ID.
#[test]
fn test_db_name_id() {
    let expected_name = "_sqlx_test_database_12345";

    assert_eq!(db_name(12345), expected_name);
    assert_eq!(db_id(expected_name), 12345);
}