streamling_e2e/resources/
mysql.rs1use crate::{E2eError, Result};
4use sqlx::mysql::MySqlPoolOptions;
5use sqlx::{MySqlPool, Row};
6use std::time::Duration;
7use tracing::info;
8
9pub struct MySqlResource {
11 pool: MySqlPool,
12 admin_pool: MySqlPool,
13 pub database: String,
14 pub host: String,
15 pub port: u16,
16 pub user: String,
17 pub password: String,
18 base_url: String,
19}
20
21impl MySqlResource {
22 pub async fn new(admin_url: &str, database: &str) -> Result<Self> {
24 let parsed = url::Url::parse(admin_url)
25 .map_err(|e| E2eError::Mysql(format!("invalid MySQL URL: {}", e)))?;
26
27 let host = parsed.host_str().unwrap_or("localhost").to_string();
28 let port = parsed.port().unwrap_or(3306);
29 let user = parsed.username().to_string();
30 let password = parsed.password().unwrap_or("").to_string();
31
32 let base_url = format!("mysql://{}:{}@{}:{}", user, password, host, port);
33
34 let admin_pool = MySqlPoolOptions::new()
35 .max_connections(2)
36 .acquire_timeout(Duration::from_secs(30))
37 .connect(admin_url)
38 .await
39 .map_err(|e| E2eError::Mysql(format!("failed to connect to MySQL: {}", e)))?;
40
41 let create_db = format!("CREATE DATABASE IF NOT EXISTS `{}`", database);
42 sqlx::query(&create_db)
43 .execute(&admin_pool)
44 .await
45 .map_err(|e| E2eError::Mysql(format!("failed to create database: {}", e)))?;
46
47 let db_url = format!("{}/{}", base_url, database);
48 let pool = MySqlPoolOptions::new()
49 .max_connections(5)
50 .acquire_timeout(Duration::from_secs(30))
51 .connect(&db_url)
52 .await
53 .map_err(|e| E2eError::Mysql(format!("failed to connect to database: {}", e)))?;
54
55 info!("Created MySQL database: {}", database);
56
57 Ok(Self {
58 pool,
59 admin_pool,
60 database: database.to_string(),
61 host,
62 port,
63 user,
64 password,
65 base_url,
66 })
67 }
68
69 pub fn connection_string(&self) -> String {
70 format!("{}/{}", self.base_url, self.database)
71 }
72
73 pub fn pool(&self) -> &MySqlPool {
74 &self.pool
75 }
76
77 pub async fn execute(&self, sql: &str) -> Result<()> {
78 sqlx::query(sql)
79 .execute(&self.pool)
80 .await
81 .map_err(|e| E2eError::Mysql(format!("query failed: {}", e)))?;
82 Ok(())
83 }
84
85 pub async fn count(&self, query: &str) -> Result<i64> {
86 let row = sqlx::query(query)
87 .fetch_one(&self.pool)
88 .await
89 .map_err(|e| E2eError::Mysql(format!("count query failed: {}", e)))?;
90 let count: i64 = row.try_get(0).map_err(|e| E2eError::Mysql(e.to_string()))?;
91 Ok(count)
92 }
93
94 pub async fn query<T>(&self, query: &str) -> Result<Vec<T>>
95 where
96 T: for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow> + Send + Unpin,
97 {
98 let rows = sqlx::query_as::<_, T>(query)
99 .fetch_all(&self.pool)
100 .await
101 .map_err(|e| E2eError::Mysql(format!("query failed: {}", e)))?;
102 Ok(rows)
103 }
104}
105
106impl Drop for MySqlResource {
107 fn drop(&mut self) {
108 if let Ok(handle) = tokio::runtime::Handle::try_current() {
109 let database = self.database.clone();
110 let admin_pool = self.admin_pool.clone();
111 let pool = self.pool.clone();
112
113 handle.spawn(async move {
114 pool.close().await;
115 tokio::time::sleep(Duration::from_millis(100)).await;
116 let drop_sql = format!("DROP DATABASE IF EXISTS `{}`", database);
117 if let Err(e) = sqlx::query(&drop_sql).execute(&admin_pool).await {
118 tracing::warn!("Failed to drop MySQL database {}: {}", database, e);
119 } else {
120 info!("Dropped MySQL database: {}", database);
121 }
122 admin_pool.close().await;
123 });
124 }
125 }
126}