1use anyhow::Result;
2use sqlx::postgres::{PgPool, PgPoolOptions};
3use std::time::Duration;
4use tracing::info;
5
6#[derive(Debug, Clone)]
7pub struct ConnectionConfig {
8 pub host: String,
9 pub port: u16,
10 pub database: String,
11 pub username: String,
12 pub password: String,
13 pub max_connections: u32,
14 pub min_connections: u32,
15 pub connection_timeout_seconds: u64,
16 pub idle_timeout_seconds: u64,
17 pub max_lifetime_seconds: u64,
18 pub statement_timeout_seconds: u64,
20 pub enable_prepared_statements: bool,
21 pub enable_connection_validation: bool,
22}
23
24impl Default for ConnectionConfig {
25 fn default() -> Self {
26 Self {
27 host: "localhost".to_string(),
28 port: 5432,
29 database: "codex_memory".to_string(),
30 username: "postgres".to_string(),
31 password: "postgres".to_string(),
32 max_connections: 100, min_connections: 20, connection_timeout_seconds: 10, idle_timeout_seconds: 300, max_lifetime_seconds: 3600, statement_timeout_seconds: 300, enable_prepared_statements: true, enable_connection_validation: true, }
42 }
43}
44
45#[derive(Debug)]
46pub struct ConnectionPool {
47 pool: PgPool,
48 config: ConnectionConfig,
49}
50
51impl ConnectionPool {
52 pub async fn new(config: ConnectionConfig) -> Result<Self> {
53 let mut connection_string = format!(
54 "postgres://{}:{}@{}:{}/{}",
55 config.username, config.password, config.host, config.port, config.database
56 );
57
58 connection_string.push_str(&format!(
60 "?statement_timeout={}s&prepared_statement_cache_queries={}&tcp_keepalives_idle=60&tcp_keepalives_interval=30&tcp_keepalives_count=3",
61 config.statement_timeout_seconds,
62 if config.enable_prepared_statements { "64" } else { "0" }
63 ));
64
65 let pool = PgPoolOptions::new()
66 .max_connections(config.max_connections)
67 .min_connections(config.min_connections)
68 .acquire_timeout(Duration::from_secs(config.connection_timeout_seconds))
69 .idle_timeout(Some(Duration::from_secs(config.idle_timeout_seconds)))
70 .max_lifetime(Some(Duration::from_secs(config.max_lifetime_seconds)))
71 .test_before_acquire(config.enable_connection_validation)
72 .connect(&connection_string)
73 .await?;
74
75 sqlx::query("SELECT 1").fetch_one(&pool).await?;
77
78 info!(
79 "Connected to PostgreSQL at {}:{}/{} with {} max connections",
80 config.host, config.port, config.database, config.max_connections
81 );
82
83 Ok(Self { pool, config })
84 }
85
86 pub fn pool(&self) -> &PgPool {
87 &self.pool
88 }
89
90 pub async fn check_health(&self) -> Result<bool> {
91 match sqlx::query("SELECT 1").fetch_one(&self.pool).await {
92 Ok(_) => Ok(true),
93 Err(_) => Ok(false),
94 }
95 }
96
97 pub async fn get_pool_stats(&self) -> PoolStats {
98 let size = self.pool.size();
99 let idle = self.pool.num_idle() as u32;
100 PoolStats {
101 size,
102 idle,
103 max_size: self.config.max_connections,
104 active_connections: size - idle,
105 waiting_for_connection: 0, total_connections_created: 0, connection_errors: 0, }
109 }
110
111 pub async fn close(&self) {
112 self.pool.close().await;
113 info!("Connection pool closed");
114 }
115}
116
117#[derive(Debug, Clone)]
118pub struct PoolStats {
119 pub size: u32,
120 pub idle: u32,
121 pub max_size: u32,
122 pub active_connections: u32,
123 pub waiting_for_connection: u32,
124 pub total_connections_created: u64,
125 pub connection_errors: u64,
126}
127
128impl PoolStats {
129 pub fn utilization_percentage(&self) -> f32 {
130 if self.max_size == 0 {
131 return 0.0;
132 }
133 (self.active_connections as f32 / self.max_size as f32) * 100.0
134 }
135
136 pub fn is_saturated(&self, threshold: f32) -> bool {
137 self.utilization_percentage() >= threshold
138 }
139
140 pub fn needs_attention(&self) -> bool {
142 self.is_saturated(70.0)
143 }
144
145 pub fn is_critically_saturated(&self) -> bool {
147 self.is_saturated(90.0)
148 }
149
150 pub fn health_status(&self) -> String {
152 let utilization = self.utilization_percentage();
153 match utilization {
154 _ if utilization >= 90.0 => "CRITICAL: Pool >90% utilized".to_string(),
155 _ if utilization >= 70.0 => "WARNING: Pool >70% utilized".to_string(),
156 _ => format!("HEALTHY: Pool {utilization:.1}% utilized"),
157 }
158 }
159}
160
161pub async fn create_connection_pool(config: ConnectionConfig) -> Result<PgPool> {
162 let connection_string = format!(
163 "postgres://{}:{}@{}:{}/{}",
164 config.username, config.password, config.host, config.port, config.database
165 );
166
167 let pool = PgPoolOptions::new()
168 .max_connections(config.max_connections)
169 .min_connections(config.min_connections)
170 .acquire_timeout(Duration::from_secs(config.connection_timeout_seconds))
171 .idle_timeout(Duration::from_secs(config.idle_timeout_seconds))
172 .max_lifetime(Duration::from_secs(config.max_lifetime_seconds))
173 .connect(&connection_string)
174 .await?;
175
176 Ok(pool)
177}
178
179pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<PgPool> {
181 let optimized_max_connections = std::cmp::max(max_connections, 100); let min_connections = std::cmp::max(optimized_max_connections / 5, 20); let pool = PgPoolOptions::new()
186 .max_connections(optimized_max_connections)
187 .min_connections(min_connections)
188 .acquire_timeout(Duration::from_secs(10)) .idle_timeout(Some(Duration::from_secs(300))) .max_lifetime(Some(Duration::from_secs(3600))) .test_before_acquire(true) .connect(database_url)
193 .await?;
194
195 sqlx::query("SELECT vector_dims('[1,2,3]'::vector)")
197 .fetch_one(&pool)
198 .await
199 .map_err(|e| anyhow::anyhow!("Vector capability test failed: {}", e))?;
200
201 info!(
202 "Connected to PostgreSQL with {} max connections ({} min) - Vector operations enabled",
203 optimized_max_connections, min_connections
204 );
205 Ok(pool)
206}
207
208pub fn get_pool(pool: &PgPool) -> &PgPool {
209 pool
210}
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215
216 #[test]
217 fn test_pool_stats_utilization() {
218 let stats = PoolStats {
219 size: 50,
220 idle: 20,
221 max_size: 100,
222 active_connections: 30, waiting_for_connection: 0,
224 total_connections_created: 100,
225 connection_errors: 0,
226 };
227
228 assert!((stats.utilization_percentage() - 30.0).abs() < 0.01);
229 assert!(!stats.is_saturated(70.0));
230 assert!(stats.is_saturated(30.0));
231 assert!(!stats.needs_attention());
232 assert!(!stats.is_critically_saturated());
233 }
234
235 #[test]
236 fn test_default_config() {
237 let config = ConnectionConfig::default();
238 assert_eq!(config.host, "localhost");
239 assert_eq!(config.port, 5432);
240 assert_eq!(config.max_connections, 100);
241 assert_eq!(config.min_connections, 20);
242 assert_eq!(config.statement_timeout_seconds, 300);
243 assert!(config.enable_prepared_statements);
244 assert!(config.enable_connection_validation);
245 }
246
247 #[test]
248 fn test_pool_stats_health_status() {
249 let healthy_stats = PoolStats {
251 size: 30,
252 idle: 20,
253 max_size: 100,
254 active_connections: 10,
255 waiting_for_connection: 0,
256 total_connections_created: 50,
257 connection_errors: 0,
258 };
259 assert!(healthy_stats.health_status().contains("HEALTHY"));
260
261 let warning_stats = PoolStats {
263 size: 80,
264 idle: 5,
265 max_size: 100,
266 active_connections: 75,
267 waiting_for_connection: 0,
268 total_connections_created: 150,
269 connection_errors: 0,
270 };
271 assert!(warning_stats.health_status().contains("WARNING"));
272 assert!(warning_stats.needs_attention());
273
274 let critical_stats = PoolStats {
276 size: 95,
277 idle: 2,
278 max_size: 100,
279 active_connections: 93,
280 waiting_for_connection: 5,
281 total_connections_created: 200,
282 connection_errors: 0,
283 };
284 assert!(critical_stats.health_status().contains("CRITICAL"));
285 assert!(critical_stats.is_critically_saturated());
286 }
287}