streamling_e2e/resources/
postgres.rs1use crate::{E2eError, Result};
4use sqlx::postgres::PgPoolOptions;
5use sqlx::{PgPool, Row};
6use std::time::Duration;
7use tracing::info;
8
9pub struct PostgresResource {
11 pool: PgPool,
13 admin_pool: PgPool,
15 pub database: String,
17 pub host: String,
19 pub port: u16,
21 pub user: String,
23 pub password: String,
25 base_url: String,
27 query_string: String,
29 should_drop: bool,
31}
32
33impl PostgresResource {
34 pub async fn connect_existing(admin_url: &str, database: &str) -> Result<Self> {
36 let parsed = url::Url::parse(admin_url)
38 .map_err(|e| E2eError::Postgres(sqlx::Error::Configuration(e.to_string().into())))?;
39
40 let host = parsed.host_str().unwrap_or("localhost").to_string();
41 let port = parsed.port().unwrap_or(5432);
42 let user = parsed.username().to_string();
43 let password = parsed.password().unwrap_or("").to_string();
44 let query_string = parsed
45 .query()
46 .map(|q| format!("?{}", q))
47 .unwrap_or_default();
48
49 let base_url = format!("postgres://{}:{}@{}:{}", user, password, host, port);
51
52 let admin_pool = PgPoolOptions::new()
54 .max_connections(2)
55 .acquire_timeout(Duration::from_secs(30))
56 .connect(admin_url)
57 .await?;
58
59 let db_url = format!("{}/{}{}", base_url, database, query_string);
61 let pool = PgPoolOptions::new()
62 .max_connections(5)
63 .acquire_timeout(Duration::from_secs(30))
64 .connect(&db_url)
65 .await?;
66
67 info!("Connected to existing PostgreSQL database: {}", database);
68
69 Ok(Self {
70 pool,
71 admin_pool,
72 database: database.to_string(),
73 host,
74 port,
75 user,
76 password,
77 base_url,
78 query_string,
79 should_drop: false, })
81 }
82
83 pub async fn new(admin_url: &str, database: &str) -> Result<Self> {
85 let parsed = url::Url::parse(admin_url)
87 .map_err(|e| E2eError::Postgres(sqlx::Error::Configuration(e.to_string().into())))?;
88
89 let host = parsed.host_str().unwrap_or("localhost").to_string();
90 let port = parsed.port().unwrap_or(5432);
91 let user = parsed.username().to_string();
92 let password = parsed.password().unwrap_or("").to_string();
93 let query_string = parsed
94 .query()
95 .map(|q| format!("?{}", q))
96 .unwrap_or_default();
97
98 let base_url = format!("postgres://{}:{}@{}:{}", user, password, host, port);
100
101 let admin_pool = PgPoolOptions::new()
103 .max_connections(2)
104 .acquire_timeout(Duration::from_secs(30))
105 .connect(admin_url)
106 .await?;
107
108 sqlx::query(&format!(
111 "CREATE DATABASE \"{}\"",
112 database.replace('"', "\"\"")
113 ))
114 .execute(&admin_pool)
115 .await
116 .ok(); let db_url = format!("{}/{}{}", base_url, database, query_string);
120 let pool = PgPoolOptions::new()
121 .max_connections(5)
122 .acquire_timeout(Duration::from_secs(30))
123 .connect(&db_url)
124 .await?;
125
126 info!("Connected to PostgreSQL database: {}", database);
127
128 Ok(Self {
129 pool,
130 admin_pool,
131 database: database.to_string(),
132 host,
133 port,
134 user,
135 password,
136 base_url,
137 query_string,
138 should_drop: true, })
140 }
141
142 pub fn connection_string(&self) -> String {
144 format!("{}/{}{}", self.base_url, self.database, self.query_string)
145 }
146
147 pub fn pool(&self) -> &PgPool {
149 &self.pool
150 }
151
152 pub async fn execute(&self, sql: &str) -> Result<()> {
154 sqlx::query(sql).execute(&self.pool).await?;
155 Ok(())
156 }
157
158 pub async fn count(&self, query: &str) -> Result<i64> {
160 let row = sqlx::query(query).fetch_one(&self.pool).await?;
161 let count: i64 = row.try_get(0)?;
162 Ok(count)
163 }
164
165 pub async fn query<T>(&self, query: &str) -> Result<Vec<T>>
167 where
168 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
169 {
170 let rows = sqlx::query_as::<_, T>(query).fetch_all(&self.pool).await?;
171 Ok(rows)
172 }
173
174 pub async fn list_tables(&self) -> Result<Vec<String>> {
176 #[derive(sqlx::FromRow)]
177 struct TableRow {
178 tablename: String,
179 }
180
181 let tables: Vec<TableRow> = self
182 .query("SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename")
183 .await?;
184
185 Ok(tables.into_iter().map(|t| t.tablename).collect())
186 }
187
188 pub async fn get_sample_data(&self, table: &str, limit: usize) -> Result<Vec<Vec<String>>> {
190 let rows = sqlx::query(&format!(
191 "SELECT * FROM public.\"{}\" LIMIT {}",
192 table, limit
193 ))
194 .fetch_all(&self.pool)
195 .await?;
196
197 let mut results = Vec::new();
198 for row in rows {
199 let values: Vec<String> = (0..row.len())
200 .map(|i| {
201 let val: Option<String> = row.try_get(i).ok();
202 val.unwrap_or_else(|| "NULL".to_string())
203 })
204 .collect();
205 results.push(values);
206 }
207
208 Ok(results)
209 }
210
211 pub async fn get_column_names(&self, table: &str) -> Result<Vec<String>> {
213 let rows = sqlx::query(&format!("SELECT * FROM public.\"{}\" LIMIT 1", table))
214 .fetch_all(&self.pool)
215 .await?;
216
217 if let Some(row) = rows.first() {
218 use sqlx::Column;
219 Ok(row.columns().iter().map(|c| c.name().to_string()).collect())
220 } else {
221 #[derive(sqlx::FromRow)]
223 struct ColumnRow {
224 column_name: String,
225 }
226
227 let columns: Vec<ColumnRow> = self
228 .query(&format!(
229 "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '{}' ORDER BY ordinal_position",
230 table
231 ))
232 .await?;
233
234 Ok(columns.into_iter().map(|c| c.column_name).collect())
235 }
236 }
237
238 #[allow(dead_code)]
240 pub async fn cleanup(&self) -> Result<()> {
241 self.pool.close().await;
243
244 let terminate_sql = format!(
246 "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}' AND pid <> pg_backend_pid()",
247 self.database.replace('\'', "''")
248 );
249 let _ = sqlx::query(&terminate_sql).execute(&self.admin_pool).await;
250
251 tokio::time::sleep(Duration::from_millis(100)).await;
253
254 let drop_sql = format!(
256 "DROP DATABASE IF EXISTS \"{}\"",
257 self.database.replace('"', "\"\"")
258 );
259 sqlx::query(&drop_sql).execute(&self.admin_pool).await?;
260
261 info!("Dropped PostgreSQL database: {}", self.database);
262 Ok(())
263 }
264}
265
266impl Drop for PostgresResource {
267 fn drop(&mut self) {
268 if !self.should_drop {
270 return;
271 }
272
273 if let Ok(handle) = tokio::runtime::Handle::try_current() {
276 let database = self.database.clone();
277 let admin_pool = self.admin_pool.clone();
278 let pool = self.pool.clone();
279
280 handle.spawn(async move {
282 pool.close().await;
284
285 let terminate_sql = format!(
287 "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}' AND pid <> pg_backend_pid()",
288 database.replace('\'', "''")
289 );
290 let _ = sqlx::query(&terminate_sql).execute(&admin_pool).await;
291
292 tokio::time::sleep(Duration::from_millis(200)).await;
294
295 let drop_sql = format!(
297 "DROP DATABASE IF EXISTS \"{}\"",
298 database.replace('"', "\"\"")
299 );
300 if let Err(e) = sqlx::query(&drop_sql).execute(&admin_pool).await {
301 tracing::warn!("Failed to drop database {}: {}", database, e);
302 } else {
303 info!("Dropped PostgreSQL database: {}", database);
304 }
305
306 admin_pool.close().await;
308 });
309 }
310 }
311}