Skip to main content

streamling_e2e/resources/
postgres.rs

1//! PostgreSQL resource manager for creating isolated databases per test.
2
3use crate::{E2eError, Result};
4use sqlx::postgres::PgPoolOptions;
5use sqlx::{PgPool, Row};
6use std::time::Duration;
7use tracing::info;
8
9/// PostgreSQL resource manager
10pub struct PostgresResource {
11    /// Connection pool to the isolated database
12    pool: PgPool,
13    /// Connection pool to the admin database (for cleanup)
14    admin_pool: PgPool,
15    /// Name of the isolated database
16    pub database: String,
17    /// Host
18    pub host: String,
19    /// Port
20    pub port: u16,
21    /// User
22    pub user: String,
23    /// Password
24    pub password: String,
25    /// Base URL without database
26    base_url: String,
27    /// Query string (e.g., ?sslmode=disable)
28    query_string: String,
29    /// Whether to drop the database on cleanup
30    should_drop: bool,
31}
32
33impl PostgresResource {
34    /// Connect to an existing PostgreSQL database (for inspection, does not drop on cleanup)
35    pub async fn connect_existing(admin_url: &str, database: &str) -> Result<Self> {
36        // Parse the admin URL to extract components
37        let parsed = url::Url::parse(admin_url)
38            .map_err(|e| E2eError::Postgres(sqlx::Error::Configuration(e.to_string().into())))?;
39
40        let host = parsed.host_str().unwrap_or("localhost").to_string();
41        let port = parsed.port().unwrap_or(5432);
42        let user = parsed.username().to_string();
43        let password = parsed.password().unwrap_or("").to_string();
44        let query_string = parsed
45            .query()
46            .map(|q| format!("?{}", q))
47            .unwrap_or_default();
48
49        // Build base URL without database (but preserve query params)
50        let base_url = format!("postgres://{}:{}@{}:{}", user, password, host, port);
51
52        // Connect to admin database (for compatibility, but won't be used for dropping)
53        let admin_pool = PgPoolOptions::new()
54            .max_connections(2)
55            .acquire_timeout(Duration::from_secs(30))
56            .connect(admin_url)
57            .await?;
58
59        // Connect to the existing database (preserve query params like sslmode)
60        let db_url = format!("{}/{}{}", base_url, database, query_string);
61        let pool = PgPoolOptions::new()
62            .max_connections(5)
63            .acquire_timeout(Duration::from_secs(30))
64            .connect(&db_url)
65            .await?;
66
67        info!("Connected to existing PostgreSQL database: {}", database);
68
69        Ok(Self {
70            pool,
71            admin_pool,
72            database: database.to_string(),
73            host,
74            port,
75            user,
76            password,
77            base_url,
78            query_string,
79            should_drop: false, // Don't drop when connecting to existing database
80        })
81    }
82
83    /// Create a new PostgreSQL resource with an isolated database
84    pub async fn new(admin_url: &str, database: &str) -> Result<Self> {
85        // Parse the admin URL to extract components
86        let parsed = url::Url::parse(admin_url)
87            .map_err(|e| E2eError::Postgres(sqlx::Error::Configuration(e.to_string().into())))?;
88
89        let host = parsed.host_str().unwrap_or("localhost").to_string();
90        let port = parsed.port().unwrap_or(5432);
91        let user = parsed.username().to_string();
92        let password = parsed.password().unwrap_or("").to_string();
93        let query_string = parsed
94            .query()
95            .map(|q| format!("?{}", q))
96            .unwrap_or_default();
97
98        // Build base URL without database (but preserve query params)
99        let base_url = format!("postgres://{}:{}@{}:{}", user, password, host, port);
100
101        // Connect to admin database
102        let admin_pool = PgPoolOptions::new()
103            .max_connections(2)
104            .acquire_timeout(Duration::from_secs(30))
105            .connect(admin_url)
106            .await?;
107
108        // Create the isolated database
109        // Use IF NOT EXISTS to handle race conditions in parallel tests
110        sqlx::query(&format!(
111            "CREATE DATABASE \"{}\"",
112            database.replace('"', "\"\"")
113        ))
114        .execute(&admin_pool)
115        .await
116        .ok(); // Ignore error if database already exists
117
118        // Connect to the isolated database (preserve query params like sslmode)
119        let db_url = format!("{}/{}{}", base_url, database, query_string);
120        let pool = PgPoolOptions::new()
121            .max_connections(5)
122            .acquire_timeout(Duration::from_secs(30))
123            .connect(&db_url)
124            .await?;
125
126        info!("Connected to PostgreSQL database: {}", database);
127
128        Ok(Self {
129            pool,
130            admin_pool,
131            database: database.to_string(),
132            host,
133            port,
134            user,
135            password,
136            base_url,
137            query_string,
138            should_drop: true, // Drop when created by test
139        })
140    }
141
142    /// Get the connection string for the isolated database
143    pub fn connection_string(&self) -> String {
144        format!("{}/{}{}", self.base_url, self.database, self.query_string)
145    }
146
147    /// Get the connection pool
148    pub fn pool(&self) -> &PgPool {
149        &self.pool
150    }
151
152    /// Execute a SQL statement
153    pub async fn execute(&self, sql: &str) -> Result<()> {
154        sqlx::query(sql).execute(&self.pool).await?;
155        Ok(())
156    }
157
158    /// Execute a count query and return the result
159    pub async fn count(&self, query: &str) -> Result<i64> {
160        let row = sqlx::query(query).fetch_one(&self.pool).await?;
161        let count: i64 = row.try_get(0)?;
162        Ok(count)
163    }
164
165    /// Execute a query and return typed results
166    pub async fn query<T>(&self, query: &str) -> Result<Vec<T>>
167    where
168        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
169    {
170        let rows = sqlx::query_as::<_, T>(query).fetch_all(&self.pool).await?;
171        Ok(rows)
172    }
173
174    /// List all tables in the database
175    pub async fn list_tables(&self) -> Result<Vec<String>> {
176        #[derive(sqlx::FromRow)]
177        struct TableRow {
178            tablename: String,
179        }
180
181        let tables: Vec<TableRow> = self
182            .query("SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename")
183            .await?;
184
185        Ok(tables.into_iter().map(|t| t.tablename).collect())
186    }
187
188    /// Get sample data from a table (first N rows)
189    pub async fn get_sample_data(&self, table: &str, limit: usize) -> Result<Vec<Vec<String>>> {
190        let rows = sqlx::query(&format!(
191            "SELECT * FROM public.\"{}\" LIMIT {}",
192            table, limit
193        ))
194        .fetch_all(&self.pool)
195        .await?;
196
197        let mut results = Vec::new();
198        for row in rows {
199            let values: Vec<String> = (0..row.len())
200                .map(|i| {
201                    let val: Option<String> = row.try_get(i).ok();
202                    val.unwrap_or_else(|| "NULL".to_string())
203                })
204                .collect();
205            results.push(values);
206        }
207
208        Ok(results)
209    }
210
211    /// Get column names for a table
212    pub async fn get_column_names(&self, table: &str) -> Result<Vec<String>> {
213        let rows = sqlx::query(&format!("SELECT * FROM public.\"{}\" LIMIT 1", table))
214            .fetch_all(&self.pool)
215            .await?;
216
217        if let Some(row) = rows.first() {
218            use sqlx::Column;
219            Ok(row.columns().iter().map(|c| c.name().to_string()).collect())
220        } else {
221            // If no rows, query the schema directly
222            #[derive(sqlx::FromRow)]
223            struct ColumnRow {
224                column_name: String,
225            }
226
227            let columns: Vec<ColumnRow> = self
228                .query(&format!(
229                    "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '{}' ORDER BY ordinal_position",
230                    table
231                ))
232                .await?;
233
234            Ok(columns.into_iter().map(|c| c.column_name).collect())
235        }
236    }
237
238    /// Clean up the database (can be called explicitly if needed)
239    #[allow(dead_code)]
240    pub async fn cleanup(&self) -> Result<()> {
241        // Close the pool connection first
242        self.pool.close().await;
243
244        // Terminate all connections to the database
245        let terminate_sql = format!(
246            "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}' AND pid <> pg_backend_pid()",
247            self.database.replace('\'', "''")
248        );
249        let _ = sqlx::query(&terminate_sql).execute(&self.admin_pool).await;
250
251        // Small delay to ensure connections are terminated
252        tokio::time::sleep(Duration::from_millis(100)).await;
253
254        // Drop the database
255        let drop_sql = format!(
256            "DROP DATABASE IF EXISTS \"{}\"",
257            self.database.replace('"', "\"\"")
258        );
259        sqlx::query(&drop_sql).execute(&self.admin_pool).await?;
260
261        info!("Dropped PostgreSQL database: {}", self.database);
262        Ok(())
263    }
264}
265
266impl Drop for PostgresResource {
267    fn drop(&mut self) {
268        // Only drop if this resource was created (not connected to existing)
269        if !self.should_drop {
270            return;
271        }
272
273        // Cleanup is best-effort since Drop is synchronous but cleanup is async
274        // For reliable cleanup, call cleanup() explicitly before dropping
275        if let Ok(handle) = tokio::runtime::Handle::try_current() {
276            let database = self.database.clone();
277            let admin_pool = self.admin_pool.clone();
278            let pool = self.pool.clone();
279
280            // Spawn cleanup task (best-effort - may not complete if runtime shuts down)
281            handle.spawn(async move {
282                // Close the pool first to release connections
283                pool.close().await;
284
285                // Terminate any remaining connections to the database
286                let terminate_sql = format!(
287                    "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}' AND pid <> pg_backend_pid()",
288                    database.replace('\'', "''")
289                );
290                let _ = sqlx::query(&terminate_sql).execute(&admin_pool).await;
291
292                // Small delay to ensure connections are terminated
293                tokio::time::sleep(Duration::from_millis(200)).await;
294
295                // Drop the database
296                let drop_sql = format!(
297                    "DROP DATABASE IF EXISTS \"{}\"",
298                    database.replace('"', "\"\"")
299                );
300                if let Err(e) = sqlx::query(&drop_sql).execute(&admin_pool).await {
301                    tracing::warn!("Failed to drop database {}: {}", database, e);
302                } else {
303                    info!("Dropped PostgreSQL database: {}", database);
304                }
305
306                // Close admin pool
307                admin_pool.close().await;
308            });
309        }
310    }
311}