offline_intelligence/memory_db/
migration.rs1use rusqlite::{Connection, Result, OptionalExtension};
4use tracing::{info, warn, error};
5use std::path::Path;
6
7use crate::memory_db::schema;
9
10pub struct MigrationManager<'a> {
12 conn: &'a mut Connection,
13}
14
15impl<'a> MigrationManager<'a> {
16 pub fn new(conn: &'a mut Connection) -> Self {
18 Self { conn }
19 }
20
21 pub fn initialize_database(&mut self) -> Result<()> {
23 info!("Initializing memory database schema...");
24
25 self.conn.execute(
27 "CREATE TABLE IF NOT EXISTS schema_version (
28 version INTEGER PRIMARY KEY,
29 applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
30 )",
31 [],
32 )?;
33
34 let current_version: i32 = self.conn
36 .query_row(
37 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
38 [],
39 |row| row.get(0),
40 )
41 .unwrap_or(0);
42
43 info!("Current database schema version: {}", current_version);
44
45 self.apply_migrations(current_version)?;
47
48 Ok(())
49 }
50
51 fn apply_migrations(&mut self, current_version: i32) -> Result<()> {
53 let migrations = get_migrations();
54
55 for (version, migration_sql) in migrations.iter() {
56 if *version > current_version {
57 info!("Applying migration {}...", version);
58
59 let tx = self.conn.transaction()?;
61
62 if let Err(e) = tx.execute_batch(migration_sql) {
64 error!("Failed to apply migration {}: {}", version, e);
65 return Err(e);
66 }
67
68 tx.execute(
70 "INSERT INTO schema_version (version) VALUES (?)",
71 [version],
72 )?;
73
74 tx.commit()?;
76
77 info!("Migration {} applied successfully", version);
78 }
79 }
80
81 Ok(())
82 }
83
84 pub fn create_connection(db_path: &Path) -> Result<Connection> {
86 let mut conn = Connection::open(db_path)?;
88
89 conn.execute_batch("
91 PRAGMA foreign_keys = ON;
92 PRAGMA journal_mode = WAL;
93 PRAGMA synchronous = NORMAL;
94 PRAGMA cache_size = -2000; -- 2MB cache
95 ")?;
96
97 let mut migrator = MigrationManager::new(&mut conn);
99 migrator.initialize_database()?;
100
101 Ok(conn)
102 }
103
104 pub fn cleanup_old_data(&mut self, older_than_days: i32) -> Result<usize> {
106 let cutoff = chrono::Utc::now() - chrono::Duration::days(older_than_days as i64);
107 let cutoff_str = cutoff.to_rfc3339();
108
109 let deleted = self.conn.execute(
111 "DELETE FROM sessions WHERE last_accessed < ?1",
112 [&cutoff_str],
113 )?;
114
115 info!("Cleaned up {} old sessions", deleted);
116
117 if deleted > 0 {
119 self.conn.execute_batch("VACUUM")?;
120 info!("Database vacuum completed");
121 }
122
123 Ok(deleted)
124 }
125
126 pub fn get_current_version(&self) -> Result<i32> {
128 self.conn
129 .query_row(
130 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
131 [],
132 |row| row.get(0),
133 )
134 .or_else(|_| Ok(0))
135 }
136
137 pub fn has_migration_applied(&self, version: i32) -> Result<bool> {
139 self.conn
140 .query_row(
141 "SELECT 1 FROM schema_version WHERE version = ?",
142 [version],
143 |_| Ok(1),
144 )
145 .optional()
146 .map(|result| result.is_some())
147 }
148}
149
150fn get_migrations() -> Vec<(i32, &'static str)> {
152 vec![
153 (1, include_str!("migrations/001_initial.sql")),
154 (2, include_str!("migrations/002_add_embeddings.sql")),
155 (3, include_str!("migrations/003_add_kv_snapshots.sql")),
156 ]
157}
158
159pub fn get_database_stats(conn: &Connection) -> Result<schema::DatabaseStats> {
162 fn get_table_count(conn: &Connection, table_name: &str) -> Result<i64> {
164 conn.query_row(&format!("SELECT COUNT(*) FROM {}", table_name), [], |row| row.get(0))
165 .or_else(|e| {
166 warn!("Failed to get count from table {}: {}", table_name, e);
167 Ok(0) })
169 }
170
171 let total_sessions = get_table_count(conn, "sessions")?;
172 let total_messages = get_table_count(conn, "messages")?;
173 let total_summaries = get_table_count(conn, "summaries")?;
174 let total_details = get_table_count(conn, "details")?;
175 let total_embeddings = get_table_count(conn, "embeddings")?;
176
177 let database_size_bytes: i64 = conn
179 .query_row(
180 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
181 [],
182 |row| row.get(0),
183 )
184 .unwrap_or(0);
185
186 Ok(schema::DatabaseStats {
187 total_sessions,
188 total_messages,
189 total_summaries,
190 total_details,
191 total_embeddings,
192 database_size_bytes,
193 })
194}
195
196pub fn get_database_stats_from_path(db_path: &Path) -> Result<schema::DatabaseStats> {
199 let conn = Connection::open(db_path)?;
200 get_database_stats(&conn)
201}
202
203pub fn run_maintenance(conn: &mut Connection) -> Result<()> {
205 info!("Running database maintenance...");
206
207 conn.execute_batch("ANALYZE")?;
209
210 conn.execute_batch("PRAGMA incremental_vacuum(100)")?;
212
213 conn.execute_batch("PRAGMA integrity_check")?;
215
216 info!("Database maintenance completed");
217 Ok(())
218}