codex_memory/database/
core.rs1use crate::error::Result;
2use sqlx::postgres::{PgPool, PgPoolOptions};
3use std::process::Command;
4use std::time::Duration;
5use url::Url;
6
7pub async fn create_pool(database_url: &str) -> Result<PgPool> {
9 let pool = PgPoolOptions::new()
10 .max_connections(20) .min_connections(2) .acquire_timeout(Duration::from_secs(30)) .idle_timeout(Duration::from_secs(600)) .max_lifetime(Duration::from_secs(1800)) .test_before_acquire(true) .connect(database_url)
17 .await?;
18
19 Ok(pool)
20}
21
22pub struct DatabaseSetup {
24 pub pool: PgPool,
25}
26
27impl DatabaseSetup {
28 pub async fn new(database_url: &str) -> Result<Self> {
30 let pool = create_pool(database_url).await?;
31 Ok(Self { pool })
32 }
33
34 pub fn pool(&self) -> &PgPool {
36 &self.pool
37 }
38}
39
40pub async fn setup_local_database() -> Result<()> {
42 println!("🔧 Setting up local PostgreSQL database...");
43
44 let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| {
46 "postgresql://codex_user:codex_pass@localhost:5432/codex_db".to_string()
47 });
48
49 let parsed_url = Url::parse(&database_url)?;
50 let username = parsed_url.username();
51 let password = parsed_url.password().unwrap_or("codex_pass");
52 let database = parsed_url.path().trim_start_matches('/');
53 let host = parsed_url.host_str().unwrap_or("localhost");
54 let port = parsed_url.port().unwrap_or(5432);
55
56 println!("📍 Connecting to PostgreSQL at {}:{}", host, port);
57 println!("👤 Creating user '{}' if not exists...", username);
58
59 let create_user_sql = format!(
61 "DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_user WHERE usename = '{}') THEN CREATE USER {} WITH PASSWORD '{}'; END IF; END $$;",
62 username, username, password
63 );
64
65 let mut cmd = Command::new("psql");
66 cmd.arg("-h")
67 .arg(host)
68 .arg("-p")
69 .arg(port.to_string())
70 .arg("-U")
71 .arg("postgres")
72 .arg("-c")
73 .arg(&create_user_sql);
74
75 let output = cmd.output()?;
76 if !output.status.success() {
77 let stderr = String::from_utf8_lossy(&output.stderr);
78 if !stderr.contains("already exists") {
79 eprintln!("⚠️ Failed to create user: {}", stderr);
80 }
81 } else {
82 println!("✅ User '{}' ready", username);
83 }
84
85 println!("📚 Creating database '{}' if not exists...", database);
86
87 let create_db_sql = format!("CREATE DATABASE {} OWNER {};", database, username);
89
90 let mut cmd = Command::new("psql");
91 cmd.arg("-h")
92 .arg(host)
93 .arg("-p")
94 .arg(port.to_string())
95 .arg("-U")
96 .arg("postgres")
97 .arg("-c")
98 .arg(&create_db_sql);
99
100 let output = cmd.output()?;
101 if !output.status.success() {
102 let stderr = String::from_utf8_lossy(&output.stderr);
103 if !stderr.contains("already exists") {
104 eprintln!("⚠️ Failed to create database: {}", stderr);
105 }
106 } else {
107 println!("✅ Database '{}' ready", database);
108 }
109
110 let grant_sql = format!(
112 "GRANT ALL PRIVILEGES ON DATABASE {} TO {};",
113 database, username
114 );
115
116 let mut cmd = Command::new("psql");
117 cmd.arg("-h")
118 .arg(host)
119 .arg("-p")
120 .arg(port.to_string())
121 .arg("-U")
122 .arg("postgres")
123 .arg("-c")
124 .arg(&grant_sql);
125
126 let output = cmd.output()?;
127 if !output.status.success() {
128 let stderr = String::from_utf8_lossy(&output.stderr);
129 eprintln!("⚠️ Failed to grant privileges: {}", stderr);
130 } else {
131 println!("✅ Privileges granted to '{}'", username);
132 }
133
134 let enable_uuid_sql = "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";";
136
137 let mut cmd = Command::new("psql");
138 cmd.arg("-h")
139 .arg(host)
140 .arg("-p")
141 .arg(port.to_string())
142 .arg("-U")
143 .arg(username)
144 .arg("-d")
145 .arg(database)
146 .arg("-c")
147 .arg(enable_uuid_sql);
148
149 if !password.is_empty() {
150 cmd.env("PGPASSWORD", password);
151 }
152
153 let output = cmd.output()?;
154 if !output.status.success() {
155 let stderr = String::from_utf8_lossy(&output.stderr);
156 if !stderr.contains("already exists") {
157 eprintln!("⚠️ Failed to enable UUID extension: {}", stderr);
158 }
159 } else {
160 println!("✅ UUID extension enabled");
161 }
162
163 let enable_vector_sql = "CREATE EXTENSION IF NOT EXISTS vector;";
165
166 let mut cmd = Command::new("psql");
167 cmd.arg("-h")
168 .arg(host)
169 .arg("-p")
170 .arg(port.to_string())
171 .arg("-U")
172 .arg(username)
173 .arg("-d")
174 .arg(database)
175 .arg("-c")
176 .arg(enable_vector_sql);
177
178 if !password.is_empty() {
179 cmd.env("PGPASSWORD", password);
180 }
181
182 let output = cmd.output()?;
183 if !output.status.success() {
184 let stderr = String::from_utf8_lossy(&output.stderr);
185 if !stderr.contains("already exists") && !stderr.contains("could not open extension") {
186 eprintln!("⚠️ pgvector extension not available (optional for basic functionality)");
187 }
188 } else {
189 println!("✅ pgvector extension enabled (for future use)");
190 }
191
192 Ok(())
193}
194
195pub async fn run_migrations(pool: &PgPool) -> Result<()> {
197 sqlx::query(
199 r#"
200 CREATE TABLE IF NOT EXISTS memories (
201 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
202 content TEXT NOT NULL,
203 content_hash VARCHAR(64) NOT NULL UNIQUE,
204 context TEXT NOT NULL,
205 summary TEXT NOT NULL,
206 metadata JSONB DEFAULT '{}',
207 tags TEXT[] DEFAULT '{}',
208 chunk_index INTEGER DEFAULT NULL,
209 total_chunks INTEGER DEFAULT NULL,
210 parent_id UUID DEFAULT NULL,
211 created_at TIMESTAMPTZ DEFAULT NOW(),
212 updated_at TIMESTAMPTZ DEFAULT NOW()
213 )
214 "#,
215 )
216 .execute(pool)
217 .await?;
218
219 sqlx::query("CREATE UNIQUE INDEX IF NOT EXISTS idx_content_hash ON memories(content_hash)")
221 .execute(pool)
222 .await
223 .ok();
224
225 sqlx::query("CREATE INDEX IF NOT EXISTS idx_created_at ON memories(created_at DESC)")
226 .execute(pool)
227 .await
228 .ok();
229
230 sqlx::query("CREATE INDEX IF NOT EXISTS idx_tags ON memories USING GIN(tags)")
231 .execute(pool)
232 .await
233 .ok();
234
235 sqlx::query("CREATE INDEX IF NOT EXISTS idx_parent_id ON memories(parent_id)")
236 .execute(pool)
237 .await
238 .ok();
239
240 sqlx::query("CREATE INDEX IF NOT EXISTS idx_parent_chunk ON memories(parent_id, chunk_index)")
241 .execute(pool)
242 .await
243 .ok();
244
245 sqlx::query("CREATE INDEX IF NOT EXISTS idx_summary ON memories USING GIN(to_tsvector('english', summary))")
246 .execute(pool)
247 .await
248 .ok();
249
250 Ok(())
254}