Skip to main content

streamling_e2e/resources/
mysql.rs

1//! MySQL resource manager for creating isolated databases per test.
2
3use crate::{E2eError, Result};
4use sqlx::mysql::MySqlPoolOptions;
5use sqlx::{MySqlPool, Row};
6use std::time::Duration;
7use tracing::info;
8
9/// MySQL resource manager
10pub struct MySqlResource {
11    pool: MySqlPool,
12    admin_pool: MySqlPool,
13    pub database: String,
14    pub host: String,
15    pub port: u16,
16    pub user: String,
17    pub password: String,
18    base_url: String,
19}
20
21impl MySqlResource {
22    /// Create a new MySQL resource with an isolated database
23    pub async fn new(admin_url: &str, database: &str) -> Result<Self> {
24        let parsed = url::Url::parse(admin_url)
25            .map_err(|e| E2eError::Mysql(format!("invalid MySQL URL: {}", e)))?;
26
27        let host = parsed.host_str().unwrap_or("localhost").to_string();
28        let port = parsed.port().unwrap_or(3306);
29        let user = parsed.username().to_string();
30        let password = parsed.password().unwrap_or("").to_string();
31
32        let base_url = format!("mysql://{}:{}@{}:{}", user, password, host, port);
33
34        let admin_pool = MySqlPoolOptions::new()
35            .max_connections(2)
36            .acquire_timeout(Duration::from_secs(30))
37            .connect(admin_url)
38            .await
39            .map_err(|e| E2eError::Mysql(format!("failed to connect to MySQL: {}", e)))?;
40
41        let create_db = format!("CREATE DATABASE IF NOT EXISTS `{}`", database);
42        sqlx::query(&create_db)
43            .execute(&admin_pool)
44            .await
45            .map_err(|e| E2eError::Mysql(format!("failed to create database: {}", e)))?;
46
47        let db_url = format!("{}/{}", base_url, database);
48        let pool = MySqlPoolOptions::new()
49            .max_connections(5)
50            .acquire_timeout(Duration::from_secs(30))
51            .connect(&db_url)
52            .await
53            .map_err(|e| E2eError::Mysql(format!("failed to connect to database: {}", e)))?;
54
55        info!("Created MySQL database: {}", database);
56
57        Ok(Self {
58            pool,
59            admin_pool,
60            database: database.to_string(),
61            host,
62            port,
63            user,
64            password,
65            base_url,
66        })
67    }
68
69    pub fn connection_string(&self) -> String {
70        format!("{}/{}", self.base_url, self.database)
71    }
72
73    pub fn pool(&self) -> &MySqlPool {
74        &self.pool
75    }
76
77    pub async fn execute(&self, sql: &str) -> Result<()> {
78        sqlx::query(sql)
79            .execute(&self.pool)
80            .await
81            .map_err(|e| E2eError::Mysql(format!("query failed: {}", e)))?;
82        Ok(())
83    }
84
85    pub async fn count(&self, query: &str) -> Result<i64> {
86        let row = sqlx::query(query)
87            .fetch_one(&self.pool)
88            .await
89            .map_err(|e| E2eError::Mysql(format!("count query failed: {}", e)))?;
90        let count: i64 = row.try_get(0).map_err(|e| E2eError::Mysql(e.to_string()))?;
91        Ok(count)
92    }
93
94    pub async fn query<T>(&self, query: &str) -> Result<Vec<T>>
95    where
96        T: for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow> + Send + Unpin,
97    {
98        let rows = sqlx::query_as::<_, T>(query)
99            .fetch_all(&self.pool)
100            .await
101            .map_err(|e| E2eError::Mysql(format!("query failed: {}", e)))?;
102        Ok(rows)
103    }
104}
105
106impl Drop for MySqlResource {
107    fn drop(&mut self) {
108        if let Ok(handle) = tokio::runtime::Handle::try_current() {
109            let database = self.database.clone();
110            let admin_pool = self.admin_pool.clone();
111            let pool = self.pool.clone();
112
113            handle.spawn(async move {
114                pool.close().await;
115                tokio::time::sleep(Duration::from_millis(100)).await;
116                let drop_sql = format!("DROP DATABASE IF EXISTS `{}`", database);
117                if let Err(e) = sqlx::query(&drop_sql).execute(&admin_pool).await {
118                    tracing::warn!("Failed to drop MySQL database {}: {}", database, e);
119                } else {
120                    info!("Dropped MySQL database: {}", database);
121                }
122                admin_pool.close().await;
123            });
124        }
125    }
126}