use anyhow::Result;
use sqlx::postgres::{PgPool, PgPoolOptions};
use sqlx::Row;
use std::env;
use std::sync::Once;
use uuid::Uuid;
static INIT: Once = Once::new();
fn ensure_env_loaded() {
INIT.call_once(|| {
dotenvy::dotenv().ok();
});
}
pub struct TestDatabaseManager {
pool: Option<PgPool>,
pub test_id: String,
pub memories_table: String,
}
impl TestDatabaseManager {
pub fn new() -> Result<Self> {
ensure_env_loaded();
let test_id = Uuid::new_v4().to_string().replace("-", "_");
let memories_table = format!("test_memories_{}", test_id);
Ok(Self {
pool: None,
test_id,
memories_table,
})
}
pub fn get_test_database_url(&self) -> String {
ensure_env_loaded();
env::var("TEST_DATABASE_URL")
.expect("TEST_DATABASE_URL must be set - NEVER use production database for tests! Set TEST_DATABASE_URL=postgresql://user:pass@host:port/codex_test_db")
}
pub async fn setup_test_database(&mut self) -> Result<PgPool> {
let test_url = self.get_test_database_url();
let schema_name = format!("test_schema_{}", self.test_id);
let temp_pool = PgPoolOptions::new()
.max_connections(1)
.connect(&test_url)
.await?;
sqlx::query(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name))
.execute(&temp_pool)
.await
.ok();
sqlx::query(&format!("CREATE SCHEMA {}", schema_name))
.execute(&temp_pool)
.await?;
temp_pool.close().await;
let search_path_setting = format!("SET search_path TO {}, public", schema_name);
let pool = PgPoolOptions::new()
.max_connections(5)
.after_connect(move |conn, _meta| {
let search_path_cmd = search_path_setting.clone();
Box::pin(async move {
sqlx::query(&search_path_cmd).execute(conn).await?;
Ok(())
})
})
.connect(&test_url)
.await?;
self.run_migrations(&pool).await?;
self.pool = Some(pool.clone());
Ok(pool)
}
async fn run_migrations(&self, pool: &PgPool) -> Result<()> {
codex_memory::database::run_migrations(pool).await?;
Ok(())
}
pub async fn cleanup(&mut self) -> Result<()> {
if let Some(pool) = &self.pool {
let schema_name = format!("test_schema_{}", self.test_id);
let drop_schema = format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name);
sqlx::query(&drop_schema).execute(pool).await.ok();
println!("Cleaned up test schema: {}", schema_name);
}
Ok(())
}
}
impl Drop for TestDatabaseManager {
fn drop(&mut self) {
if let Some(pool) = &self.pool {
let _close_future = pool.close();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_database_manager_lifecycle() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
assert!(!manager.test_id.is_empty());
assert!(!manager.memories_table.is_empty());
let pool = manager.setup_test_database().await?;
let search_path_result = sqlx::query("SHOW search_path").fetch_one(&pool).await?;
let search_path: String = search_path_result.try_get("search_path")?;
println!("Current search_path: {}", search_path);
let test_schema = format!("test_schema_{}", manager.test_id);
let table_check = sqlx::query(&format!(
"SELECT COUNT(*) as count FROM information_schema.tables
WHERE table_schema = '{}' AND table_name = 'memories'",
test_schema
))
.fetch_one(&pool)
.await?;
let table_exists: i64 = table_check.try_get("count")?;
println!(
"Memories table exists in {}: {}",
test_schema,
table_exists > 0
);
let result = sqlx::query(&format!(
"SELECT COUNT(*) as count FROM {}.memories",
test_schema
))
.fetch_one(&pool)
.await?;
let count: i64 = result.try_get("count")?;
assert_eq!(count, 0);
let unique_content = format!("Test content {}", manager.test_id);
let unique_hash = format!("test_hash_{}", manager.test_id);
sqlx::query(&format!(
"INSERT INTO {}.memories (content, content_hash, context, summary) VALUES ($1, $2, $3, $4)",
test_schema
))
.bind(&unique_content)
.bind(&unique_hash)
.bind("Test context")
.bind("Test summary")
.execute(&pool)
.await?;
let result = sqlx::query(&format!(
"SELECT COUNT(*) as count FROM {}.memories",
test_schema
))
.fetch_one(&pool)
.await?;
let count: i64 = result.try_get("count")?;
assert_eq!(count, 1);
manager.cleanup().await?;
Ok(())
}
}