lc/data/
database.rs

1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use rusqlite::{params, Connection};
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6
7#[derive(Debug, Clone)]
8pub struct ChatEntry {
9    pub chat_id: String,
10    pub model: String,
11    pub question: String,
12    pub response: String,
13    pub timestamp: DateTime<Utc>,
14    pub input_tokens: Option<i32>,
15    pub output_tokens: Option<i32>,
16}
17
18#[derive(Debug)]
19pub struct DatabaseStats {
20    pub total_entries: usize,
21    pub unique_sessions: usize,
22    pub file_size_bytes: u64,
23    pub date_range: Option<(DateTime<Utc>, DateTime<Utc>)>,
24    pub model_usage: Vec<(String, i64)>,
25}
26
27// Connection pool for reusing database connections
28pub struct ConnectionPool {
29    connections: Arc<Mutex<Vec<Connection>>>,
30    max_connections: usize,
31    db_path: PathBuf,
32}
33
34impl ConnectionPool {
35    pub fn new(db_path: PathBuf, max_connections: usize) -> Result<Self> {
36        let mut connections = Vec::with_capacity(max_connections);
37
38        // Pre-create initial connections
39        for _ in 0..std::cmp::min(2, max_connections) {
40            let conn = Connection::open(&db_path)?;
41            Self::configure_connection(&conn)?;
42            connections.push(conn);
43        }
44
45        Ok(Self {
46            connections: Arc::new(Mutex::new(connections)),
47            max_connections,
48            db_path,
49        })
50    }
51
52    fn configure_connection(conn: &Connection) -> Result<()> {
53        // Enable WAL mode for better concurrent performance
54        conn.pragma_update(None, "journal_mode", "WAL")?;
55        // Increase cache size for better performance
56        conn.pragma_update(None, "cache_size", 10000)?;
57        // Enable foreign keys
58        conn.pragma_update(None, "foreign_keys", true)?;
59        // Set synchronous to NORMAL for better performance
60        conn.pragma_update(None, "synchronous", "NORMAL")?;
61        Ok(())
62    }
63
64    pub fn get_connection(&self) -> Result<PooledConnection> {
65        let mut connections = self
66            .connections
67            .lock()
68            .map_err(|_| anyhow::anyhow!("Failed to acquire connection pool lock"))?;
69
70        if let Some(conn) = connections.pop() {
71            Ok(PooledConnection {
72                conn: Some(conn),
73                pool: self.connections.clone(),
74            })
75        } else if connections.len() < self.max_connections {
76            // Create new connection if under limit
77            let conn = Connection::open(&self.db_path)?;
78            Self::configure_connection(&conn)?;
79            Ok(PooledConnection {
80                conn: Some(conn),
81                pool: self.connections.clone(),
82            })
83        } else {
84            // Wait for a connection to become available
85            // In a real implementation, you might want to use a condition variable
86            // For now, create a new temporary connection
87            let conn = Connection::open(&self.db_path)?;
88            Self::configure_connection(&conn)?;
89            Ok(PooledConnection {
90                conn: Some(conn),
91                pool: self.connections.clone(),
92            })
93        }
94    }
95}
96
97// RAII wrapper for pooled connections
98pub struct PooledConnection {
99    conn: Option<Connection>,
100    pool: Arc<Mutex<Vec<Connection>>>,
101}
102
103impl PooledConnection {
104    pub fn execute(
105        &self,
106        sql: &str,
107        params: impl rusqlite::Params,
108    ) -> Result<usize, rusqlite::Error> {
109        self.conn
110            .as_ref()
111            .ok_or_else(|| rusqlite::Error::InvalidPath("Connection not available".into()))?
112            .execute(sql, params)
113    }
114
115    pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, rusqlite::Error>
116    where
117        P: rusqlite::Params,
118        F: FnOnce(&rusqlite::Row<'_>) -> Result<T, rusqlite::Error>,
119    {
120        self.conn
121            .as_ref()
122            .ok_or_else(|| rusqlite::Error::InvalidPath("Connection not available".into()))?
123            .query_row(sql, params, f)
124    }
125}
126
127impl Drop for PooledConnection {
128    fn drop(&mut self) {
129        if let Some(conn) = self.conn.take() {
130            if let Ok(mut connections) = self.pool.lock() {
131                connections.push(conn);
132            }
133            // If lock fails, connection is just dropped (acceptable for cleanup)
134        }
135    }
136}
137
138// Optimized Database struct with connection pooling
139pub struct Database {
140    pool: ConnectionPool,
141}
142
143impl Database {
144    pub fn new() -> Result<Self> {
145        let db_path = Self::database_path()?;
146        let pool = ConnectionPool::new(db_path, 5)?; // Max 5 connections
147
148        // Initialize database schema
149        let conn = pool.get_connection()?;
150        Self::initialize_schema(&conn)?;
151
152        Ok(Database { pool })
153    }
154
155    fn initialize_schema(conn: &PooledConnection) -> Result<()> {
156        // Create chat_logs table with optimized schema
157        conn.execute(
158            "CREATE TABLE IF NOT EXISTS chat_logs (
159                id INTEGER PRIMARY KEY AUTOINCREMENT,
160                chat_id TEXT NOT NULL,
161                model TEXT NOT NULL,
162                question TEXT NOT NULL,
163                response TEXT NOT NULL,
164                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
165                input_tokens INTEGER,
166                output_tokens INTEGER
167            )",
168            [],
169        )?;
170
171        // Add token columns to existing table if they don't exist (migration)
172        let _ = conn.execute("ALTER TABLE chat_logs ADD COLUMN input_tokens INTEGER", []);
173        let _ = conn.execute("ALTER TABLE chat_logs ADD COLUMN output_tokens INTEGER", []);
174
175        // Create session_state table for tracking current session
176        conn.execute(
177            "CREATE TABLE IF NOT EXISTS session_state (
178                key TEXT PRIMARY KEY,
179                value TEXT NOT NULL
180            )",
181            [],
182        )?;
183
184        // Create optimized indexes for better performance
185        conn.execute(
186            "CREATE INDEX IF NOT EXISTS idx_chat_logs_chat_id ON chat_logs(chat_id)",
187            [],
188        )?;
189
190        conn.execute(
191            "CREATE INDEX IF NOT EXISTS idx_chat_logs_timestamp ON chat_logs(timestamp DESC)",
192            [],
193        )?;
194
195        // Additional index for model statistics
196        conn.execute(
197            "CREATE INDEX IF NOT EXISTS idx_chat_logs_model ON chat_logs(model)",
198            [],
199        )?;
200
201        Ok(())
202    }
203
204    pub fn save_chat_entry_with_tokens(
205        &self,
206        chat_id: &str,
207        model: &str,
208        question: &str,
209        response: &str,
210        input_tokens: Option<i32>,
211        output_tokens: Option<i32>,
212    ) -> Result<()> {
213        let conn = self.pool.get_connection()?;
214
215        conn.execute(
216            "INSERT INTO chat_logs (chat_id, model, question, response, timestamp, input_tokens, output_tokens)
217             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
218            params![chat_id, model, question, response, Utc::now(), input_tokens, output_tokens]
219        )?;
220        Ok(())
221    }
222
223    pub fn get_chat_history(&self, chat_id: &str) -> Result<Vec<ChatEntry>> {
224        let conn = self.pool.get_connection()?;
225
226        let conn_ref = conn
227            .conn
228            .as_ref()
229            .ok_or_else(|| anyhow::anyhow!("Database connection not available"))?;
230        let mut stmt = conn_ref.prepare(
231            "SELECT id, chat_id, model, question, response, timestamp, input_tokens, output_tokens
232             FROM chat_logs
233             WHERE chat_id = ?1
234             ORDER BY timestamp ASC",
235        )?;
236
237        let rows = stmt.query_map([chat_id], |row| {
238            Ok(ChatEntry {
239                chat_id: row.get(1)?,
240                model: row.get(2)?,
241                question: row.get(3)?,
242                response: row.get(4)?,
243                timestamp: row.get(5)?,
244                input_tokens: row.get(6).ok(),
245                output_tokens: row.get(7).ok(),
246            })
247        })?;
248
249        let mut entries = Vec::new();
250        for row in rows {
251            entries.push(row?);
252        }
253
254        Ok(entries)
255    }
256
257    // Optimized version with LIMIT for better performance on large datasets
258    pub fn get_all_logs(&self) -> Result<Vec<ChatEntry>> {
259        self.get_recent_logs(None)
260    }
261
262    pub fn get_recent_logs(&self, limit: Option<usize>) -> Result<Vec<ChatEntry>> {
263        let conn = self.pool.get_connection()?;
264
265        let sql = if let Some(limit) = limit {
266            format!(
267                "SELECT id, chat_id, model, question, response, timestamp, input_tokens, output_tokens
268                 FROM chat_logs
269                 ORDER BY timestamp DESC
270                 LIMIT {}",
271                limit
272            )
273        } else {
274            "SELECT id, chat_id, model, question, response, timestamp, input_tokens, output_tokens
275             FROM chat_logs
276             ORDER BY timestamp DESC"
277                .to_string()
278        };
279
280        let conn_ref = conn
281            .conn
282            .as_ref()
283            .ok_or_else(|| anyhow::anyhow!("Database connection not available"))?;
284        let mut stmt = conn_ref.prepare(&sql)?;
285
286        let rows = stmt.query_map([], |row| {
287            Ok(ChatEntry {
288                chat_id: row.get(1)?,
289                model: row.get(2)?,
290                question: row.get(3)?,
291                response: row.get(4)?,
292                timestamp: row.get(5)?,
293                input_tokens: row.get(6).ok(),
294                output_tokens: row.get(7).ok(),
295            })
296        })?;
297
298        let mut entries = Vec::new();
299        for row in rows {
300            entries.push(row?);
301        }
302
303        Ok(entries)
304    }
305
306    pub fn set_current_session_id(&self, session_id: &str) -> Result<()> {
307        let conn = self.pool.get_connection()?;
308
309        conn.execute(
310            "INSERT OR REPLACE INTO session_state (key, value) VALUES ('current_session', ?1)",
311            [session_id],
312        )?;
313        Ok(())
314    }
315
316    pub fn get_current_session_id(&self) -> Result<Option<String>> {
317        let conn = self.pool.get_connection()?;
318
319        let conn_ref = conn
320            .conn
321            .as_ref()
322            .ok_or_else(|| anyhow::anyhow!("Database connection not available"))?;
323        let mut stmt =
324            conn_ref.prepare("SELECT value FROM session_state WHERE key = 'current_session'")?;
325
326        let mut rows = stmt.query_map([], |row| Ok(row.get::<_, String>(0)?))?;
327
328        if let Some(row) = rows.next() {
329            Ok(Some(row?))
330        } else {
331            Ok(None)
332        }
333    }
334
335    pub fn purge_all_logs(&self) -> Result<()> {
336        let conn = self.pool.get_connection()?;
337
338        // Use transaction for atomic operation
339        conn.execute("BEGIN TRANSACTION", [])?;
340
341        match (|| -> Result<()> {
342            conn.execute("DELETE FROM chat_logs", [])?;
343            conn.execute("DELETE FROM session_state", [])?;
344            Ok(())
345        })() {
346            Ok(_) => {
347                conn.execute("COMMIT", [])?;
348                Ok(())
349            }
350            Err(e) => {
351                conn.execute("ROLLBACK", [])?;
352                Err(e)
353            }
354        }
355    }
356
357    /// Purge logs based on age (older than specified days)
358    pub fn purge_logs_by_age(&self, days: u32) -> Result<usize> {
359        let conn = self.pool.get_connection()?;
360
361        let cutoff_date = chrono::Utc::now() - chrono::Duration::days(days as i64);
362
363        let deleted_count =
364            conn.execute("DELETE FROM chat_logs WHERE timestamp < ?1", [cutoff_date])?;
365
366        Ok(deleted_count)
367    }
368
369    /// Purge logs to keep only the most recent N entries
370    pub fn purge_logs_keep_recent(&self, keep_count: usize) -> Result<usize> {
371        let conn = self.pool.get_connection()?;
372
373        // First, get the total count
374        let total_count: i64 =
375            conn.query_row("SELECT COUNT(*) FROM chat_logs", [], |row| row.get(0))?;
376
377        if total_count <= keep_count as i64 {
378            return Ok(0); // Nothing to purge
379        }
380
381        let to_delete = total_count - keep_count as i64;
382
383        let deleted_count = conn.execute(
384            "DELETE FROM chat_logs WHERE id IN (
385                SELECT id FROM chat_logs
386                ORDER BY timestamp ASC
387                LIMIT ?1
388            )",
389            [to_delete],
390        )?;
391
392        Ok(deleted_count)
393    }
394
395    /// Purge logs when database size exceeds threshold (in MB)
396    pub fn purge_logs_by_size(&self, max_size_mb: u64) -> Result<usize> {
397        let db_path = Self::database_path()?;
398        let current_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
399
400        let max_size_bytes = max_size_mb * 1024 * 1024;
401
402        if current_size <= max_size_bytes {
403            return Ok(0); // No purging needed
404        }
405
406        // Purge oldest 25% of entries to get under the size limit
407        let conn = self.pool.get_connection()?;
408        let total_count: i64 =
409            conn.query_row("SELECT COUNT(*) FROM chat_logs", [], |row| row.get(0))?;
410
411        let to_delete = (total_count as f64 * 0.25) as i64;
412
413        if to_delete > 0 {
414            let deleted_count = conn.execute(
415                "DELETE FROM chat_logs WHERE id IN (
416                    SELECT id FROM chat_logs
417                    ORDER BY timestamp ASC
418                    LIMIT ?1
419                )",
420                [to_delete],
421            )?;
422
423            // Run VACUUM to reclaim space
424            conn.execute("VACUUM", [])?;
425
426            Ok(deleted_count)
427        } else {
428            Ok(0)
429        }
430    }
431
432    /// Smart purge with configurable thresholds
433    pub fn smart_purge(
434        &self,
435        max_age_days: Option<u32>,
436        max_entries: Option<usize>,
437        max_size_mb: Option<u64>,
438    ) -> Result<usize> {
439        let mut total_deleted = 0;
440
441        // Purge by age first
442        if let Some(days) = max_age_days {
443            total_deleted += self.purge_logs_by_age(days)?;
444        }
445
446        // Then purge by count
447        if let Some(max_count) = max_entries {
448            total_deleted += self.purge_logs_keep_recent(max_count)?;
449        }
450
451        // Finally check size
452        if let Some(max_mb) = max_size_mb {
453            total_deleted += self.purge_logs_by_size(max_mb)?;
454        }
455
456        Ok(total_deleted)
457    }
458
459    pub fn clear_session(&self, session_id: &str) -> Result<()> {
460        let conn = self.pool.get_connection()?;
461
462        conn.execute("DELETE FROM chat_logs WHERE chat_id = ?1", [session_id])?;
463        Ok(())
464    }
465
466    pub fn get_stats(&self) -> Result<DatabaseStats> {
467        let conn = self.pool.get_connection()?;
468
469        // Use single query with subqueries for better performance
470        let total_entries: i64 =
471            conn.query_row("SELECT COUNT(*) FROM chat_logs", [], |row| row.get(0))?;
472
473        let unique_sessions: i64 =
474            conn.query_row("SELECT COUNT(DISTINCT chat_id) FROM chat_logs", [], |row| {
475                row.get(0)
476            })?;
477
478        // Get database file size
479        let db_path = Self::database_path()?;
480        let file_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
481
482        // Get date range in single query
483        let date_range = if total_entries > 0 {
484            let (earliest, latest): (Option<DateTime<Utc>>, Option<DateTime<Utc>>) = conn
485                .query_row(
486                    "SELECT MIN(timestamp), MAX(timestamp) FROM chat_logs",
487                    [],
488                    |row| Ok((row.get(0).ok(), row.get(1).ok())),
489                )?;
490
491            match (earliest, latest) {
492                (Some(e), Some(l)) => Some((e, l)),
493                _ => None,
494            }
495        } else {
496            None
497        };
498
499        // Get model usage statistics
500        let conn_ref = conn
501            .conn
502            .as_ref()
503            .ok_or_else(|| anyhow::anyhow!("Database connection not available"))?;
504        let mut stmt = conn_ref.prepare(
505            "SELECT model, COUNT(*) as count FROM chat_logs GROUP BY model ORDER BY count DESC",
506        )?;
507
508        let model_stats = stmt
509            .query_map([], |row| {
510                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
511            })?
512            .collect::<Result<Vec<_>, _>>()?;
513
514        Ok(DatabaseStats {
515            total_entries: total_entries as usize,
516            unique_sessions: unique_sessions as usize,
517            file_size_bytes: file_size,
518            date_range,
519            model_usage: model_stats,
520        })
521    }
522
523    fn database_path() -> Result<PathBuf> {
524        // Use the same config directory logic as Config::config_dir() for test isolation
525        let config_dir = crate::config::Config::config_dir()?;
526        std::fs::create_dir_all(&config_dir)?;
527        Ok(config_dir.join("logs.db"))
528    }
529}
530
531// Thread-safe singleton for global database access
532
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A pool hands out several connections at once and each one is usable.
    #[test]
    fn test_connection_pool() {
        let scratch = tempdir().unwrap();
        let pool = ConnectionPool::new(scratch.path().join("test.db"), 3).unwrap();

        // Test getting multiple connections
        let first = pool.get_connection().unwrap();
        let second = pool.get_connection().unwrap();
        let third = pool.get_connection().unwrap();

        // All connections should be valid
        for handle in [&first, &second, &third].iter() {
            assert!(handle.query_row("SELECT 1", [], |_| Ok(())).is_ok());
        }
    }

    /// An entry saved through `Database` is returned by `get_chat_history`
    /// with its token counts intact.
    #[test]
    fn test_optimized_database() {
        let scratch = tempdir().unwrap();

        // Create test database with isolated path
        let db = Database {
            pool: ConnectionPool::new(scratch.path().join("test.db"), 3).unwrap(),
        };

        // Initialize schema for test database; the connection returns to the
        // pool at the end of this scope.
        {
            let schema_conn = db.pool.get_connection().unwrap();
            Database::initialize_schema(&schema_conn).unwrap();
        }

        // Test saving and retrieving
        db.save_chat_entry_with_tokens(
            "test_session",
            "test_model",
            "test question",
            "test response",
            Some(100),
            Some(50),
        )
        .unwrap();

        let history = db.get_chat_history("test_session").unwrap();
        assert_eq!(history.len(), 1);

        let saved = &history[0];
        assert_eq!(saved.question, "test question");
        assert_eq!(saved.input_tokens, Some(100));
        assert_eq!(saved.output_tokens, Some(50));
    }
}
587}