codex_memory/memory/
connection.rs

1use anyhow::Result;
2use sqlx::postgres::{PgPool, PgPoolOptions};
3use std::time::Duration;
4use tracing::info;
5
6#[derive(Debug, Clone)]
7pub struct ConnectionConfig {
8    pub host: String,
9    pub port: u16,
10    pub database: String,
11    pub username: String,
12    pub password: String,
13    pub max_connections: u32,
14    pub min_connections: u32,
15    pub connection_timeout_seconds: u64,
16    pub idle_timeout_seconds: u64,
17    pub max_lifetime_seconds: u64,
18    // Vector operation specific configurations
19    pub statement_timeout_seconds: u64,
20    pub enable_prepared_statements: bool,
21    pub enable_connection_validation: bool,
22}
23
24impl Default for ConnectionConfig {
25    fn default() -> Self {
26        Self {
27            host: "localhost".to_string(),
28            port: 5432,
29            database: "codex_memory".to_string(),
30            username: "postgres".to_string(),
31            password: "postgres".to_string(),
32            // Optimized for high-throughput vector operations (>1000 ops/sec)
33            max_connections: 100, // Minimum 100 as per HIGH-004 requirements
34            min_connections: 20,  // Higher minimum to reduce connection establishment overhead
35            connection_timeout_seconds: 10, // Shorter timeout for faster failure detection
36            idle_timeout_seconds: 300, // 5 minutes - prevent resource waste
37            max_lifetime_seconds: 3600, // 1 hour - balance recycling vs overhead
38            statement_timeout_seconds: 300, // 5 minutes for vector operations
39            enable_prepared_statements: true, // Optimize repeated queries
40            enable_connection_validation: true, // Ensure connection health
41        }
42    }
43}
44
45#[derive(Debug)]
46pub struct ConnectionPool {
47    pool: PgPool,
48    config: ConnectionConfig,
49}
50
51impl ConnectionPool {
52    pub async fn new(config: ConnectionConfig) -> Result<Self> {
53        let mut connection_string = format!(
54            "postgres://{}:{}@{}:{}/{}",
55            config.username, config.password, config.host, config.port, config.database
56        );
57
58        // Add vector operation optimizations to connection string
59        connection_string.push_str(&format!(
60            "?statement_timeout={}s&prepared_statement_cache_queries={}&tcp_keepalives_idle=60&tcp_keepalives_interval=30&tcp_keepalives_count=3",
61            config.statement_timeout_seconds,
62            if config.enable_prepared_statements { "64" } else { "0" }
63        ));
64
65        let pool = PgPoolOptions::new()
66            .max_connections(config.max_connections)
67            .min_connections(config.min_connections)
68            .acquire_timeout(Duration::from_secs(config.connection_timeout_seconds))
69            .idle_timeout(Some(Duration::from_secs(config.idle_timeout_seconds)))
70            .max_lifetime(Some(Duration::from_secs(config.max_lifetime_seconds)))
71            .test_before_acquire(config.enable_connection_validation)
72            .connect(&connection_string)
73            .await?;
74
75        // Test the connection
76        sqlx::query("SELECT 1").fetch_one(&pool).await?;
77
78        info!(
79            "Connected to PostgreSQL at {}:{}/{} with {} max connections",
80            config.host, config.port, config.database, config.max_connections
81        );
82
83        Ok(Self { pool, config })
84    }
85
86    pub fn pool(&self) -> &PgPool {
87        &self.pool
88    }
89
90    pub async fn check_health(&self) -> Result<bool> {
91        match sqlx::query("SELECT 1").fetch_one(&self.pool).await {
92            Ok(_) => Ok(true),
93            Err(_) => Ok(false),
94        }
95    }
96
97    pub async fn get_pool_stats(&self) -> PoolStats {
98        let size = self.pool.size();
99        let idle = self.pool.num_idle() as u32;
100        PoolStats {
101            size,
102            idle,
103            max_size: self.config.max_connections,
104            active_connections: size - idle,
105            waiting_for_connection: 0, // SQLx doesn't expose this directly
106            total_connections_created: 0, // Would need custom tracking
107            connection_errors: 0,      // Would need custom tracking
108        }
109    }
110
111    pub async fn close(&self) {
112        self.pool.close().await;
113        info!("Connection pool closed");
114    }
115}
116
117#[derive(Debug, Clone)]
118pub struct PoolStats {
119    pub size: u32,
120    pub idle: u32,
121    pub max_size: u32,
122    pub active_connections: u32,
123    pub waiting_for_connection: u32,
124    pub total_connections_created: u64,
125    pub connection_errors: u64,
126}
127
128impl PoolStats {
129    pub fn utilization_percentage(&self) -> f32 {
130        if self.max_size == 0 {
131            return 0.0;
132        }
133        (self.active_connections as f32 / self.max_size as f32) * 100.0
134    }
135
136    pub fn is_saturated(&self, threshold: f32) -> bool {
137        self.utilization_percentage() >= threshold
138    }
139
140    /// Check if pool is at warning level (70% utilization as per requirements)
141    pub fn needs_attention(&self) -> bool {
142        self.is_saturated(70.0)
143    }
144
145    /// Check if pool is critically saturated (90% utilization)
146    pub fn is_critically_saturated(&self) -> bool {
147        self.is_saturated(90.0)
148    }
149
150    /// Get health status message
151    pub fn health_status(&self) -> String {
152        let utilization = self.utilization_percentage();
153        match utilization {
154            _ if utilization >= 90.0 => "CRITICAL: Pool >90% utilized".to_string(),
155            _ if utilization >= 70.0 => "WARNING: Pool >70% utilized".to_string(),
156            _ => format!("HEALTHY: Pool {utilization:.1}% utilized"),
157        }
158    }
159}
160
161pub async fn create_connection_pool(config: ConnectionConfig) -> Result<PgPool> {
162    let connection_string = format!(
163        "postgres://{}:{}@{}:{}/{}",
164        config.username, config.password, config.host, config.port, config.database
165    );
166
167    let pool = PgPoolOptions::new()
168        .max_connections(config.max_connections)
169        .min_connections(config.min_connections)
170        .acquire_timeout(Duration::from_secs(config.connection_timeout_seconds))
171        .idle_timeout(Duration::from_secs(config.idle_timeout_seconds))
172        .max_lifetime(Duration::from_secs(config.max_lifetime_seconds))
173        .connect(&connection_string)
174        .await?;
175
176    Ok(pool)
177}
178
179// Optimized pool creation for high-throughput vector operations
180pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<PgPool> {
181    // Apply HIGH-004 optimization defaults
182    let optimized_max_connections = std::cmp::max(max_connections, 100); // Enforce minimum 100
183    let min_connections = std::cmp::max(optimized_max_connections / 5, 20); // 20% minimum, at least 20
184
185    let pool = PgPoolOptions::new()
186        .max_connections(optimized_max_connections)
187        .min_connections(min_connections)
188        .acquire_timeout(Duration::from_secs(10)) // Fast failure detection
189        .idle_timeout(Some(Duration::from_secs(300))) // 5 minutes
190        .max_lifetime(Some(Duration::from_secs(3600))) // 1 hour
191        .test_before_acquire(true) // Validate connections
192        .connect(database_url)
193        .await?;
194
195    // Test the connection with vector capability
196    sqlx::query("SELECT vector_dims('[1,2,3]'::vector)")
197        .fetch_one(&pool)
198        .await
199        .map_err(|e| anyhow::anyhow!("Vector capability test failed: {}", e))?;
200
201    info!(
202        "Connected to PostgreSQL with {} max connections ({} min) - Vector operations enabled",
203        optimized_max_connections, min_connections
204    );
205    Ok(pool)
206}
207
208pub fn get_pool(pool: &PgPool) -> &PgPool {
209    pool
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215
216    #[test]
217    fn test_pool_stats_utilization() {
218        let stats = PoolStats {
219            size: 50,
220            idle: 20,
221            max_size: 100,
222            active_connections: 30, // size - idle = 50 - 20 = 30
223            waiting_for_connection: 0,
224            total_connections_created: 100,
225            connection_errors: 0,
226        };
227
228        assert!((stats.utilization_percentage() - 30.0).abs() < 0.01);
229        assert!(!stats.is_saturated(70.0));
230        assert!(stats.is_saturated(30.0));
231        assert!(!stats.needs_attention());
232        assert!(!stats.is_critically_saturated());
233    }
234
235    #[test]
236    fn test_default_config() {
237        let config = ConnectionConfig::default();
238        assert_eq!(config.host, "localhost");
239        assert_eq!(config.port, 5432);
240        assert_eq!(config.max_connections, 100);
241        assert_eq!(config.min_connections, 20);
242        assert_eq!(config.statement_timeout_seconds, 300);
243        assert!(config.enable_prepared_statements);
244        assert!(config.enable_connection_validation);
245    }
246
247    #[test]
248    fn test_pool_stats_health_status() {
249        // Test healthy status
250        let healthy_stats = PoolStats {
251            size: 30,
252            idle: 20,
253            max_size: 100,
254            active_connections: 10,
255            waiting_for_connection: 0,
256            total_connections_created: 50,
257            connection_errors: 0,
258        };
259        assert!(healthy_stats.health_status().contains("HEALTHY"));
260
261        // Test warning status
262        let warning_stats = PoolStats {
263            size: 80,
264            idle: 5,
265            max_size: 100,
266            active_connections: 75,
267            waiting_for_connection: 0,
268            total_connections_created: 150,
269            connection_errors: 0,
270        };
271        assert!(warning_stats.health_status().contains("WARNING"));
272        assert!(warning_stats.needs_attention());
273
274        // Test critical status
275        let critical_stats = PoolStats {
276            size: 95,
277            idle: 2,
278            max_size: 100,
279            active_connections: 93,
280            waiting_for_connection: 5,
281            total_connections_created: 200,
282            connection_errors: 0,
283        };
284        assert!(critical_stats.health_status().contains("CRITICAL"));
285        assert!(critical_stats.is_critically_saturated());
286    }
287}