1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use rusqlite::{params, Connection};
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6
7#[derive(Debug, Clone)]
8pub struct ChatEntry {
9 pub chat_id: String,
10 pub model: String,
11 pub question: String,
12 pub response: String,
13 pub timestamp: DateTime<Utc>,
14 pub input_tokens: Option<i32>,
15 pub output_tokens: Option<i32>,
16}
17
18#[derive(Debug)]
19pub struct DatabaseStats {
20 pub total_entries: usize,
21 pub unique_sessions: usize,
22 pub file_size_bytes: u64,
23 pub date_range: Option<(DateTime<Utc>, DateTime<Utc>)>,
24 pub model_usage: Vec<(String, i64)>,
25}
26
27pub struct ConnectionPool {
29 connections: Arc<Mutex<Vec<Connection>>>,
30 max_connections: usize,
31 db_path: PathBuf,
32}
33
34impl ConnectionPool {
35 pub fn new(db_path: PathBuf, max_connections: usize) -> Result<Self> {
36 let mut connections = Vec::with_capacity(max_connections);
37
38 for _ in 0..std::cmp::min(2, max_connections) {
40 let conn = Connection::open(&db_path)?;
41 Self::configure_connection(&conn)?;
42 connections.push(conn);
43 }
44
45 Ok(Self {
46 connections: Arc::new(Mutex::new(connections)),
47 max_connections,
48 db_path,
49 })
50 }
51
52 fn configure_connection(conn: &Connection) -> Result<()> {
53 conn.pragma_update(None, "journal_mode", "WAL")?;
55 conn.pragma_update(None, "cache_size", 10000)?;
57 conn.pragma_update(None, "foreign_keys", true)?;
59 conn.pragma_update(None, "synchronous", "NORMAL")?;
61 Ok(())
62 }
63
64 pub fn get_connection(&self) -> Result<PooledConnection> {
65 let mut connections = self
66 .connections
67 .lock()
68 .map_err(|_| anyhow::anyhow!("Failed to acquire connection pool lock"))?;
69
70 if let Some(conn) = connections.pop() {
71 Ok(PooledConnection {
72 conn: Some(conn),
73 pool: self.connections.clone(),
74 })
75 } else if connections.len() < self.max_connections {
76 let conn = Connection::open(&self.db_path)?;
78 Self::configure_connection(&conn)?;
79 Ok(PooledConnection {
80 conn: Some(conn),
81 pool: self.connections.clone(),
82 })
83 } else {
84 let conn = Connection::open(&self.db_path)?;
88 Self::configure_connection(&conn)?;
89 Ok(PooledConnection {
90 conn: Some(conn),
91 pool: self.connections.clone(),
92 })
93 }
94 }
95}
96
97pub struct PooledConnection {
99 conn: Option<Connection>,
100 pool: Arc<Mutex<Vec<Connection>>>,
101}
102
103impl PooledConnection {
104 pub fn execute(
105 &self,
106 sql: &str,
107 params: impl rusqlite::Params,
108 ) -> Result<usize, rusqlite::Error> {
109 self.conn
110 .as_ref()
111 .ok_or_else(|| rusqlite::Error::InvalidPath("Connection not available".into()))?
112 .execute(sql, params)
113 }
114
115 pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, rusqlite::Error>
116 where
117 P: rusqlite::Params,
118 F: FnOnce(&rusqlite::Row<'_>) -> Result<T, rusqlite::Error>,
119 {
120 self.conn
121 .as_ref()
122 .ok_or_else(|| rusqlite::Error::InvalidPath("Connection not available".into()))?
123 .query_row(sql, params, f)
124 }
125}
126
127impl Drop for PooledConnection {
128 fn drop(&mut self) {
129 if let Some(conn) = self.conn.take() {
130 if let Ok(mut connections) = self.pool.lock() {
131 connections.push(conn);
132 }
133 }
135 }
136}
137
138pub struct Database {
140 pool: ConnectionPool,
141}
142
143impl Database {
144 pub fn new() -> Result<Self> {
145 let db_path = Self::database_path()?;
146 let pool = ConnectionPool::new(db_path, 5)?; let conn = pool.get_connection()?;
150 Self::initialize_schema(&conn)?;
151
152 Ok(Database { pool })
153 }
154
155 fn initialize_schema(conn: &PooledConnection) -> Result<()> {
156 conn.execute(
158 "CREATE TABLE IF NOT EXISTS chat_logs (
159 id INTEGER PRIMARY KEY AUTOINCREMENT,
160 chat_id TEXT NOT NULL,
161 model TEXT NOT NULL,
162 question TEXT NOT NULL,
163 response TEXT NOT NULL,
164 timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
165 input_tokens INTEGER,
166 output_tokens INTEGER
167 )",
168 [],
169 )?;
170
171 let _ = conn.execute("ALTER TABLE chat_logs ADD COLUMN input_tokens INTEGER", []);
173 let _ = conn.execute("ALTER TABLE chat_logs ADD COLUMN output_tokens INTEGER", []);
174
175 conn.execute(
177 "CREATE TABLE IF NOT EXISTS session_state (
178 key TEXT PRIMARY KEY,
179 value TEXT NOT NULL
180 )",
181 [],
182 )?;
183
184 conn.execute(
186 "CREATE INDEX IF NOT EXISTS idx_chat_logs_chat_id ON chat_logs(chat_id)",
187 [],
188 )?;
189
190 conn.execute(
191 "CREATE INDEX IF NOT EXISTS idx_chat_logs_timestamp ON chat_logs(timestamp DESC)",
192 [],
193 )?;
194
195 conn.execute(
197 "CREATE INDEX IF NOT EXISTS idx_chat_logs_model ON chat_logs(model)",
198 [],
199 )?;
200
201 Ok(())
202 }
203
204 pub fn save_chat_entry_with_tokens(
205 &self,
206 chat_id: &str,
207 model: &str,
208 question: &str,
209 response: &str,
210 input_tokens: Option<i32>,
211 output_tokens: Option<i32>,
212 ) -> Result<()> {
213 let conn = self.pool.get_connection()?;
214
215 conn.execute(
216 "INSERT INTO chat_logs (chat_id, model, question, response, timestamp, input_tokens, output_tokens)
217 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
218 params![chat_id, model, question, response, Utc::now(), input_tokens, output_tokens]
219 )?;
220 Ok(())
221 }
222
223 pub fn get_chat_history(&self, chat_id: &str) -> Result<Vec<ChatEntry>> {
224 let conn = self.pool.get_connection()?;
225
226 let conn_ref = conn
227 .conn
228 .as_ref()
229 .ok_or_else(|| anyhow::anyhow!("Database connection not available"))?;
230 let mut stmt = conn_ref.prepare(
231 "SELECT id, chat_id, model, question, response, timestamp, input_tokens, output_tokens
232 FROM chat_logs
233 WHERE chat_id = ?1
234 ORDER BY timestamp ASC",
235 )?;
236
237 let rows = stmt.query_map([chat_id], |row| {
238 Ok(ChatEntry {
239 chat_id: row.get(1)?,
240 model: row.get(2)?,
241 question: row.get(3)?,
242 response: row.get(4)?,
243 timestamp: row.get(5)?,
244 input_tokens: row.get(6).ok(),
245 output_tokens: row.get(7).ok(),
246 })
247 })?;
248
249 let mut entries = Vec::new();
250 for row in rows {
251 entries.push(row?);
252 }
253
254 Ok(entries)
255 }
256
257 pub fn get_all_logs(&self) -> Result<Vec<ChatEntry>> {
259 self.get_recent_logs(None)
260 }
261
262 pub fn get_recent_logs(&self, limit: Option<usize>) -> Result<Vec<ChatEntry>> {
263 let conn = self.pool.get_connection()?;
264
265 let sql = if let Some(limit) = limit {
266 format!(
267 "SELECT id, chat_id, model, question, response, timestamp, input_tokens, output_tokens
268 FROM chat_logs
269 ORDER BY timestamp DESC
270 LIMIT {}",
271 limit
272 )
273 } else {
274 "SELECT id, chat_id, model, question, response, timestamp, input_tokens, output_tokens
275 FROM chat_logs
276 ORDER BY timestamp DESC"
277 .to_string()
278 };
279
280 let conn_ref = conn
281 .conn
282 .as_ref()
283 .ok_or_else(|| anyhow::anyhow!("Database connection not available"))?;
284 let mut stmt = conn_ref.prepare(&sql)?;
285
286 let rows = stmt.query_map([], |row| {
287 Ok(ChatEntry {
288 chat_id: row.get(1)?,
289 model: row.get(2)?,
290 question: row.get(3)?,
291 response: row.get(4)?,
292 timestamp: row.get(5)?,
293 input_tokens: row.get(6).ok(),
294 output_tokens: row.get(7).ok(),
295 })
296 })?;
297
298 let mut entries = Vec::new();
299 for row in rows {
300 entries.push(row?);
301 }
302
303 Ok(entries)
304 }
305
306 pub fn set_current_session_id(&self, session_id: &str) -> Result<()> {
307 let conn = self.pool.get_connection()?;
308
309 conn.execute(
310 "INSERT OR REPLACE INTO session_state (key, value) VALUES ('current_session', ?1)",
311 [session_id],
312 )?;
313 Ok(())
314 }
315
316 pub fn get_current_session_id(&self) -> Result<Option<String>> {
317 let conn = self.pool.get_connection()?;
318
319 let conn_ref = conn
320 .conn
321 .as_ref()
322 .ok_or_else(|| anyhow::anyhow!("Database connection not available"))?;
323 let mut stmt =
324 conn_ref.prepare("SELECT value FROM session_state WHERE key = 'current_session'")?;
325
326 let mut rows = stmt.query_map([], |row| Ok(row.get::<_, String>(0)?))?;
327
328 if let Some(row) = rows.next() {
329 Ok(Some(row?))
330 } else {
331 Ok(None)
332 }
333 }
334
335 pub fn purge_all_logs(&self) -> Result<()> {
336 let conn = self.pool.get_connection()?;
337
338 conn.execute("BEGIN TRANSACTION", [])?;
340
341 match (|| -> Result<()> {
342 conn.execute("DELETE FROM chat_logs", [])?;
343 conn.execute("DELETE FROM session_state", [])?;
344 Ok(())
345 })() {
346 Ok(_) => {
347 conn.execute("COMMIT", [])?;
348 Ok(())
349 }
350 Err(e) => {
351 conn.execute("ROLLBACK", [])?;
352 Err(e)
353 }
354 }
355 }
356
357 pub fn purge_logs_by_age(&self, days: u32) -> Result<usize> {
359 let conn = self.pool.get_connection()?;
360
361 let cutoff_date = chrono::Utc::now() - chrono::Duration::days(days as i64);
362
363 let deleted_count =
364 conn.execute("DELETE FROM chat_logs WHERE timestamp < ?1", [cutoff_date])?;
365
366 Ok(deleted_count)
367 }
368
369 pub fn purge_logs_keep_recent(&self, keep_count: usize) -> Result<usize> {
371 let conn = self.pool.get_connection()?;
372
373 let total_count: i64 =
375 conn.query_row("SELECT COUNT(*) FROM chat_logs", [], |row| row.get(0))?;
376
377 if total_count <= keep_count as i64 {
378 return Ok(0); }
380
381 let to_delete = total_count - keep_count as i64;
382
383 let deleted_count = conn.execute(
384 "DELETE FROM chat_logs WHERE id IN (
385 SELECT id FROM chat_logs
386 ORDER BY timestamp ASC
387 LIMIT ?1
388 )",
389 [to_delete],
390 )?;
391
392 Ok(deleted_count)
393 }
394
395 pub fn purge_logs_by_size(&self, max_size_mb: u64) -> Result<usize> {
397 let db_path = Self::database_path()?;
398 let current_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
399
400 let max_size_bytes = max_size_mb * 1024 * 1024;
401
402 if current_size <= max_size_bytes {
403 return Ok(0); }
405
406 let conn = self.pool.get_connection()?;
408 let total_count: i64 =
409 conn.query_row("SELECT COUNT(*) FROM chat_logs", [], |row| row.get(0))?;
410
411 let to_delete = (total_count as f64 * 0.25) as i64;
412
413 if to_delete > 0 {
414 let deleted_count = conn.execute(
415 "DELETE FROM chat_logs WHERE id IN (
416 SELECT id FROM chat_logs
417 ORDER BY timestamp ASC
418 LIMIT ?1
419 )",
420 [to_delete],
421 )?;
422
423 conn.execute("VACUUM", [])?;
425
426 Ok(deleted_count)
427 } else {
428 Ok(0)
429 }
430 }
431
432 pub fn smart_purge(
434 &self,
435 max_age_days: Option<u32>,
436 max_entries: Option<usize>,
437 max_size_mb: Option<u64>,
438 ) -> Result<usize> {
439 let mut total_deleted = 0;
440
441 if let Some(days) = max_age_days {
443 total_deleted += self.purge_logs_by_age(days)?;
444 }
445
446 if let Some(max_count) = max_entries {
448 total_deleted += self.purge_logs_keep_recent(max_count)?;
449 }
450
451 if let Some(max_mb) = max_size_mb {
453 total_deleted += self.purge_logs_by_size(max_mb)?;
454 }
455
456 Ok(total_deleted)
457 }
458
459 pub fn clear_session(&self, session_id: &str) -> Result<()> {
460 let conn = self.pool.get_connection()?;
461
462 conn.execute("DELETE FROM chat_logs WHERE chat_id = ?1", [session_id])?;
463 Ok(())
464 }
465
466 pub fn get_stats(&self) -> Result<DatabaseStats> {
467 let conn = self.pool.get_connection()?;
468
469 let total_entries: i64 =
471 conn.query_row("SELECT COUNT(*) FROM chat_logs", [], |row| row.get(0))?;
472
473 let unique_sessions: i64 =
474 conn.query_row("SELECT COUNT(DISTINCT chat_id) FROM chat_logs", [], |row| {
475 row.get(0)
476 })?;
477
478 let db_path = Self::database_path()?;
480 let file_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
481
482 let date_range = if total_entries > 0 {
484 let (earliest, latest): (Option<DateTime<Utc>>, Option<DateTime<Utc>>) = conn
485 .query_row(
486 "SELECT MIN(timestamp), MAX(timestamp) FROM chat_logs",
487 [],
488 |row| Ok((row.get(0).ok(), row.get(1).ok())),
489 )?;
490
491 match (earliest, latest) {
492 (Some(e), Some(l)) => Some((e, l)),
493 _ => None,
494 }
495 } else {
496 None
497 };
498
499 let conn_ref = conn
501 .conn
502 .as_ref()
503 .ok_or_else(|| anyhow::anyhow!("Database connection not available"))?;
504 let mut stmt = conn_ref.prepare(
505 "SELECT model, COUNT(*) as count FROM chat_logs GROUP BY model ORDER BY count DESC",
506 )?;
507
508 let model_stats = stmt
509 .query_map([], |row| {
510 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
511 })?
512 .collect::<Result<Vec<_>, _>>()?;
513
514 Ok(DatabaseStats {
515 total_entries: total_entries as usize,
516 unique_sessions: unique_sessions as usize,
517 file_size_bytes: file_size,
518 date_range,
519 model_usage: model_stats,
520 })
521 }
522
523 fn database_path() -> Result<PathBuf> {
524 let config_dir = crate::config::Config::config_dir()?;
526 std::fs::create_dir_all(&config_dir)?;
527 Ok(config_dir.join("logs.db"))
528 }
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a pool backed by `test.db` inside `dir`.
    fn pool_in(dir: &tempfile::TempDir, max_connections: usize) -> ConnectionPool {
        ConnectionPool::new(dir.path().join("test.db"), max_connections).unwrap()
    }

    /// Three connections can be checked out at once and each one is usable.
    #[test]
    fn test_connection_pool() {
        let scratch = tempdir().unwrap();
        let pool = pool_in(&scratch, 3);

        // Hold all three at the same time before querying any of them.
        let held: Vec<PooledConnection> = (0..3)
            .map(|_| pool.get_connection().unwrap())
            .collect();

        for conn in &held {
            assert!(conn.query_row("SELECT 1", [], |_| Ok(())).is_ok());
        }
    }

    /// An entry saved with token counts is read back unchanged.
    #[test]
    fn test_optimized_database() {
        let scratch = tempdir().unwrap();
        let db = Database {
            pool: pool_in(&scratch, 3),
        };

        // Scope the setup connection so it is back in the pool before use.
        {
            let setup = db.pool.get_connection().unwrap();
            Database::initialize_schema(&setup).unwrap();
        }

        db.save_chat_entry_with_tokens(
            "test_session",
            "test_model",
            "test question",
            "test response",
            Some(100),
            Some(50),
        )
        .unwrap();

        let history = db.get_chat_history("test_session").unwrap();
        assert_eq!(history.len(), 1);

        let entry = &history[0];
        assert_eq!(entry.question, "test question");
        assert_eq!(entry.input_tokens, Some(100));
        assert_eq!(entry.output_tokens, Some(50));
    }
}