use crate::error::{Error, Result};
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::process::Command;
use std::time::Duration;
use url::Url;
/// Builds a PostgreSQL connection pool for `database_url`.
///
/// The pool holds between 2 and 50 connections, waits up to 30 seconds to
/// acquire one, retires connections idle for 10 minutes or older than
/// 30 minutes, and tests each connection before handing it out. Every newly
/// opened connection has its `statement_timeout` and `lock_timeout` session
/// settings applied.
///
/// # Errors
///
/// Returns an error if the pool cannot connect or a session setting cannot be
/// applied to a new connection.
pub async fn create_pool(database_url: &str) -> Result<PgPool> {
    // Session-level settings issued, in this order, on each new connection.
    const SESSION_SETTINGS: [&str; 2] = [
        "SET statement_timeout = '30s'",
        "SET lock_timeout = '10s'",
    ];

    let options = PgPoolOptions::new()
        .max_connections(50)
        .min_connections(2)
        .acquire_timeout(Duration::from_secs(30))
        .idle_timeout(Duration::from_secs(600))
        .max_lifetime(Duration::from_secs(1800))
        .test_before_acquire(true)
        .after_connect(|conn, _| {
            Box::pin(async move {
                for &setting in SESSION_SETTINGS.iter() {
                    sqlx::query(setting).execute(&mut *conn).await?;
                }
                Ok(())
            })
        });

    let pool = options.connect(database_url).await?;
    Ok(pool)
}
/// Wrapper that owns the PostgreSQL connection pool built by [`create_pool`].
pub struct DatabaseSetup {
    /// The underlying connection pool.
    pub pool: PgPool,
}

impl DatabaseSetup {
    /// Creates the connection pool for `database_url` and wraps it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`create_pool`].
    pub async fn new(database_url: &str) -> Result<Self> {
        create_pool(database_url)
            .await
            .map(|pool| Self { pool })
    }

    /// Borrows the wrapped connection pool.
    pub fn pool(&self) -> &PgPool {
        let Self { pool } = self;
        pool
    }
}
pub async fn setup_local_database() -> Result<()> {
println!("🔧 Setting up local PostgreSQL database...");
let database_url = std::env::var("DATABASE_URL")
.map_err(|_| Error::Config("DATABASE_URL environment variable must be set".to_string()))?;
let parsed_url = Url::parse(&database_url)?;
let username = parsed_url.username();
let password = parsed_url.password().unwrap_or("codex_pass");
let database = parsed_url.path().trim_start_matches('/');
let host = parsed_url
.host_str()
.ok_or_else(|| Error::Config("DATABASE_URL must contain a host".to_string()))?;
let port = parsed_url.port().unwrap_or(5432);
println!("📍 Connecting to PostgreSQL at {}:{}", host, port);
println!("👤 Creating user '{}' if not exists...", username);
let create_user_sql = format!(
"DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_user WHERE usename = '{}') THEN CREATE USER {} WITH PASSWORD '{}'; END IF; END $$;",
username, username, password
);
let mut cmd = Command::new("psql");
cmd.arg("-h")
.arg(host)
.arg("-p")
.arg(port.to_string())
.arg("-U")
.arg("postgres")
.arg("-c")
.arg(&create_user_sql);
let output = cmd.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
if !stderr.contains("already exists") {
eprintln!("⚠️ Failed to create user: {}", stderr);
}
} else {
println!("✅ User '{}' ready", username);
}
println!("📚 Creating database '{}' if not exists...", database);
let create_db_sql = format!("CREATE DATABASE {} OWNER {};", database, username);
let mut cmd = Command::new("psql");
cmd.arg("-h")
.arg(host)
.arg("-p")
.arg(port.to_string())
.arg("-U")
.arg("postgres")
.arg("-c")
.arg(&create_db_sql);
let output = cmd.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
if !stderr.contains("already exists") {
eprintln!("⚠️ Failed to create database: {}", stderr);
}
} else {
println!("✅ Database '{}' ready", database);
}
let grant_sql = format!(
"GRANT ALL PRIVILEGES ON DATABASE {} TO {};",
database, username
);
let mut cmd = Command::new("psql");
cmd.arg("-h")
.arg(host)
.arg("-p")
.arg(port.to_string())
.arg("-U")
.arg("postgres")
.arg("-c")
.arg(&grant_sql);
let output = cmd.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
eprintln!("⚠️ Failed to grant privileges: {}", stderr);
} else {
println!("✅ Privileges granted to '{}'", username);
}
let enable_uuid_sql = "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";";
let mut cmd = Command::new("psql");
cmd.arg("-h")
.arg(host)
.arg("-p")
.arg(port.to_string())
.arg("-U")
.arg(username)
.arg("-d")
.arg(database)
.arg("-c")
.arg(enable_uuid_sql);
if !password.is_empty() {
cmd.env("PGPASSWORD", password);
}
let output = cmd.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
if !stderr.contains("already exists") {
eprintln!("⚠️ Failed to enable UUID extension: {}", stderr);
}
} else {
println!("✅ UUID extension enabled");
}
let enable_vector_sql = "CREATE EXTENSION IF NOT EXISTS vector;";
let mut cmd = Command::new("psql");
cmd.arg("-h")
.arg(host)
.arg("-p")
.arg(port.to_string())
.arg("-U")
.arg(username)
.arg("-d")
.arg(database)
.arg("-c")
.arg(enable_vector_sql);
if !password.is_empty() {
cmd.env("PGPASSWORD", password);
}
let output = cmd.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
if !stderr.contains("already exists") && !stderr.contains("could not open extension") {
eprintln!("⚠️ pgvector extension not available (optional for basic functionality)");
}
} else {
println!("✅ pgvector extension enabled (for future use)");
}
Ok(())
}
/// Creates the `memories` table, its secondary indexes and the `memory_stats`
/// view if they do not already exist.
///
/// Creating the table is mandatory. The indexes and the view are best-effort:
/// a failure is reported on stderr and the remaining statements still run, so
/// a missing optional object no longer goes unnoticed.
///
/// # Errors
///
/// Returns an error only when the `CREATE TABLE` statement fails.
pub async fn run_migrations(pool: &PgPool) -> Result<()> {
    // (label for diagnostics, statement), executed in order.
    const BEST_EFFORT_STATEMENTS: &[(&str, &str)] = &[
        (
            "index idx_content_hash",
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_content_hash ON memories(content_hash)",
        ),
        (
            "index idx_created_at",
            "CREATE INDEX IF NOT EXISTS idx_created_at ON memories(created_at DESC)",
        ),
        (
            "index idx_tags",
            "CREATE INDEX IF NOT EXISTS idx_tags ON memories USING GIN(tags)",
        ),
        (
            "index idx_parent_id",
            "CREATE INDEX IF NOT EXISTS idx_parent_id ON memories(parent_id)",
        ),
        (
            "index idx_parent_chunk",
            "CREATE INDEX IF NOT EXISTS idx_parent_chunk ON memories(parent_id, chunk_index)",
        ),
        (
            "index idx_summary_fts",
            "CREATE INDEX IF NOT EXISTS idx_summary_fts ON memories USING GIN(to_tsvector('english', summary))",
        ),
        (
            "index idx_context_fts",
            "CREATE INDEX IF NOT EXISTS idx_context_fts ON memories USING GIN(to_tsvector('english', context))",
        ),
        (
            "index idx_metadata",
            "CREATE INDEX IF NOT EXISTS idx_metadata ON memories USING GIN(metadata)",
        ),
        (
            "index idx_content_created",
            "CREATE INDEX IF NOT EXISTS idx_content_created ON memories(content_hash, created_at DESC)",
        ),
        (
            "view memory_stats",
            r#"
CREATE OR REPLACE VIEW memory_stats AS
SELECT
(SELECT COUNT(*) FROM memories) AS total_memories,
(SELECT pg_size_pretty(pg_total_relation_size('memories'))) AS table_size,
(SELECT MAX(created_at) FROM memories) AS last_memory_created
"#,
        ),
    ];

    // Everything below depends on this table, so a failure here is fatal.
    sqlx::query(
        r#"
CREATE TABLE IF NOT EXISTS memories (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content TEXT NOT NULL,
content_hash VARCHAR(64) NOT NULL UNIQUE,
context TEXT NOT NULL,
summary TEXT NOT NULL,
metadata JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
chunk_index INTEGER DEFAULT NULL,
total_chunks INTEGER DEFAULT NULL,
parent_id UUID DEFAULT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
)
"#,
    )
    .execute(pool)
    .await?;

    for &(label, statement) in BEST_EFFORT_STATEMENTS {
        // Keep going on failure, but report it instead of dropping the error.
        if let Err(err) = sqlx::query(statement).execute(pool).await {
            eprintln!("⚠️ Migration step failed ({}): {}", label, err);
        }
    }

    Ok(())
}