offline_intelligence/memory_db/
summary_store.rs1use crate::memory_db::schema::*;
5use rusqlite::{params, Result, Row};
6use chrono::{DateTime, Utc};
7use tracing::{debug, info};
8use std::sync::Arc;
9use r2d2::Pool;
10use r2d2_sqlite::SqliteConnectionManager;
11
12pub struct SummaryStore {
14 pool: Arc<Pool<SqliteConnectionManager>>,
15}
16
17impl SummaryStore {
18 pub fn new(pool: Arc<Pool<SqliteConnectionManager>>) -> Self {
20 Self { pool }
21 }
22
23 fn get_conn(&self) -> anyhow::Result<r2d2::PooledConnection<SqliteConnectionManager>> {
25 self.pool.get()
26 .map_err(|e| anyhow::anyhow!("Failed to get connection from pool: {}", e))
27 }
28
29 pub fn store_summary(&self, summary: &Summary) -> anyhow::Result<()> {
31 let conn = self.get_conn()?;
32
33 debug!(
34 "Storing summary for session {} (messages {} to {})",
35 summary.session_id,
36 summary.message_range_start,
37 summary.message_range_end
38 );
39
40 conn.execute(
41 "INSERT INTO summaries
42 (session_id, message_range_start, message_range_end, summary_text,
43 compression_ratio, key_topics, generated_at)
44 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
45 params![
46 &summary.session_id,
47 summary.message_range_start,
48 summary.message_range_end,
49 &summary.summary_text,
50 summary.compression_ratio,
51 serde_json::to_string(&summary.key_topics)?,
52 summary.generated_at.to_rfc3339(),
53 ],
54 )?;
55
56 Ok(())
57 }
58
59 pub fn get_session_summaries(&self, session_id: &str) -> anyhow::Result<Vec<Summary>> {
61 let conn = self.get_conn()?;
62 let mut stmt = conn.prepare(
63 "SELECT id, session_id, message_range_start, message_range_end, summary_text,
64 compression_ratio, key_topics, generated_at
65 FROM summaries WHERE session_id = ?1 ORDER BY generated_at DESC"
66 )?;
67
68 let mut rows = stmt.query([session_id])?;
69 let mut summaries = Vec::new();
70
71 while let Some(row) = rows.next()? {
72 summaries.push(self.row_to_summary(row)?);
73 }
74
75 Ok(summaries)
76 }
77
78 pub fn get_summary_for_range(&self, session_id: &str, start: i32, end: i32) -> anyhow::Result<Option<Summary>> {
80 let conn = self.get_conn()?;
81 let mut stmt = conn.prepare(
82 "SELECT id, session_id, message_range_start, message_range_end, summary_text,
83 compression_ratio, key_topics, generated_at
84 FROM summaries WHERE session_id = ?1 AND message_range_start = ?2 AND message_range_end = ?3"
85 )?;
86
87 let mut rows = stmt.query(params![session_id, start, end])?;
88
89 if let Some(row) = rows.next()? {
90 Ok(Some(self.row_to_summary(row)?))
91 } else {
92 Ok(None)
93 }
94 }
95
96 pub fn update_summary(&self, summary: &Summary) -> anyhow::Result<()> {
98 let conn = self.get_conn()?;
99
100 debug!("Updating summary for session {}", summary.session_id);
101
102 conn.execute(
103 "UPDATE summaries SET
104 summary_text = ?2,
105 compression_ratio = ?3,
106 key_topics = ?4,
107 generated_at = ?5
108 WHERE id = ?1",
109 params![
110 summary.id,
111 &summary.summary_text,
112 summary.compression_ratio,
113 serde_json::to_string(&summary.key_topics)?,
114 summary.generated_at.to_rfc3339(),
115 ],
116 )?;
117
118 Ok(())
119 }
120
121 pub fn delete_session_summaries(&self, session_id: &str) -> anyhow::Result<usize> {
123 let conn = self.get_conn()?;
124 let deleted = conn.execute(
125 "DELETE FROM summaries WHERE session_id = ?1",
126 [session_id],
127 )?;
128
129 info!("Deleted {} summaries for session {}", deleted, session_id);
130 Ok(deleted)
131 }
132
133 pub fn cleanup_old_summaries(&self, session_id: &str, keep_latest: usize) -> anyhow::Result<usize> {
135 let conn = self.get_conn()?;
136
137 let mut stmt = conn.prepare(
139 "SELECT id FROM summaries
140 WHERE session_id = ?1
141 ORDER BY generated_at DESC
142 LIMIT -1 OFFSET ?2"
143 )?;
144
145 let ids_to_delete: Vec<i64> = stmt
147 .query_map(params![session_id, keep_latest as i64], |row| row.get(0))?
148 .collect::<Result<Vec<_>>>()?;
149
150 if ids_to_delete.is_empty() {
151 return Ok(0);
152 }
153
154 let placeholders: Vec<String> = ids_to_delete.iter().map(|_| "?".to_string()).collect();
156 let query = format!(
157 "DELETE FROM summaries WHERE id IN ({})",
158 placeholders.join(",")
159 );
160
161 let mut stmt = conn.prepare(&query)?;
162 let deleted = stmt.execute(rusqlite::params_from_iter(ids_to_delete))?;
163
164 debug!("Cleaned up {} old summaries for session {}", deleted, session_id);
165 Ok(deleted)
166 }
167
168 fn row_to_summary(&self, row: &Row) -> anyhow::Result<Summary> {
170 let key_topics_json: String = row.get(6)?;
171 let key_topics: Vec<String> = serde_json::from_str(&key_topics_json)
172 .map_err(|e| anyhow::anyhow!("Failed to parse key_topics: {}", e))?;
173
174 let generated_at_str: String = row.get(7)?;
175 let generated_at = DateTime::parse_from_rfc3339(&generated_at_str)?
176 .with_timezone(&Utc);
177
178 Ok(Summary {
179 id: row.get(0)?,
180 session_id: row.get(1)?,
181 message_range_start: row.get(2)?,
182 message_range_end: row.get(3)?,
183 summary_text: row.get(4)?,
184 compression_ratio: row.get(5)?,
185 key_topics,
186 generated_at,
187 })
188 }
189}