Skip to main content

offline_intelligence/memory_db/
migration.rs

1//! Database migration system
2
3use rusqlite::{Connection, Result, OptionalExtension};
4use tracing::{info, warn, error};
5use std::path::Path;
6
7// Import the schema module from the same memory_db module
8use crate::memory_db::schema;
9
10/// Manages database schema migrations
11pub struct MigrationManager<'a> {
12    conn: &'a mut Connection,
13}
14
15impl<'a> MigrationManager<'a> {
16    /// Create a new migration manager
17    pub fn new(conn: &'a mut Connection) -> Self {
18        Self { conn }
19    }
20    
21    /// Initialize database with current schema
22    pub fn initialize_database(&mut self) -> Result<()> {
23        info!("Initializing memory database schema...");
24        
25        // Create schema version table if it doesn't exist
26        self.conn.execute(
27            "CREATE TABLE IF NOT EXISTS schema_version (
28                version INTEGER PRIMARY KEY,
29                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
30            )",
31            [],
32        )?;
33        
34        // Get current version
35        let current_version: i32 = self.conn
36            .query_row(
37                "SELECT COALESCE(MAX(version), 0) FROM schema_version",
38                [],
39                |row| row.get(0),
40            )
41            .unwrap_or(0);
42        
43        info!("Current database schema version: {}", current_version);
44        
45        // Apply migrations based on current version
46        self.apply_migrations(current_version)?;
47        
48        Ok(())
49    }
50    
51    /// Apply all pending migrations
52    fn apply_migrations(&mut self, current_version: i32) -> Result<()> {
53        let migrations = get_migrations();
54        
55        for (version, migration_sql) in migrations.iter() {
56            if *version > current_version {
57                info!("Applying migration {}...", version);
58                
59                // Begin transaction - requires mutable self
60                let tx = self.conn.transaction()?;
61                
62                // Apply migration
63                if let Err(e) = tx.execute_batch(migration_sql) {
64                    error!("Failed to apply migration {}: {}", version, e);
65                    return Err(e);
66                }
67                
68                // Record migration
69                tx.execute(
70                    "INSERT INTO schema_version (version) VALUES (?)",
71                    [version],
72                )?;
73                
74                // Commit transaction
75                tx.commit()?;
76                
77                info!("Migration {} applied successfully", version);
78            }
79        }
80        
81        Ok(())
82    }
83    
84    /// Create database connection with migrations applied
85    pub fn create_connection(db_path: &Path) -> Result<Connection> {
86        // Open or create database
87        let mut conn = Connection::open(db_path)?;
88        
89        // Enable foreign keys and WAL mode for better performance
90        conn.execute_batch("
91            PRAGMA foreign_keys = ON;
92            PRAGMA journal_mode = WAL;
93            PRAGMA synchronous = NORMAL;
94            PRAGMA cache_size = -2000; -- 2MB cache
95        ")?;
96        
97        // Apply migrations - need mutable access
98        let mut migrator = MigrationManager::new(&mut conn);
99        migrator.initialize_database()?;
100        
101        Ok(conn)
102    }
103    
104    /// Clean up old data - needs mutable access
105    pub fn cleanup_old_data(&mut self, older_than_days: i32) -> Result<usize> {
106        let cutoff = chrono::Utc::now() - chrono::Duration::days(older_than_days as i64);
107        let cutoff_str = cutoff.to_rfc3339();
108        
109        // Delete old sessions and their related data (cascading delete)
110        let deleted = self.conn.execute(
111            "DELETE FROM sessions WHERE last_accessed < ?1",
112            [&cutoff_str],
113        )?;
114        
115        info!("Cleaned up {} old sessions", deleted);
116        
117        // Vacuum to reclaim space
118        if deleted > 0 {
119            self.conn.execute_batch("VACUUM")?;
120            info!("Database vacuum completed");
121        }
122        
123        Ok(deleted)
124    }
125    
126    /// Get current schema version
127    pub fn get_current_version(&self) -> Result<i32> {
128        self.conn
129            .query_row(
130                "SELECT COALESCE(MAX(version), 0) FROM schema_version",
131                [],
132                |row| row.get(0),
133            )
134            .or_else(|_| Ok(0))
135    }
136    
137    /// Check if a specific migration has been applied
138    pub fn has_migration_applied(&self, version: i32) -> Result<bool> {
139        self.conn
140            .query_row(
141                "SELECT 1 FROM schema_version WHERE version = ?",
142                [version],
143                |_| Ok(1),
144            )
145            .optional()
146            .map(|result| result.is_some())
147    }
148}
149
150/// Get all migration SQL scripts
151fn get_migrations() -> Vec<(i32, &'static str)> {
152    vec![
153        (1, include_str!("migrations/001_initial.sql")),
154        (2, include_str!("migrations/002_add_embeddings.sql")),
155        (3, include_str!("migrations/003_add_kv_snapshots.sql")),
156    ]
157}
158
159/// Get database statistics from a connection
160/// This is safe to call even with a locked connection since it only performs read queries
161pub fn get_database_stats(conn: &Connection) -> Result<schema::DatabaseStats> {
162    // Helper function to safely get count from a table
163    fn get_table_count(conn: &Connection, table_name: &str) -> Result<i64> {
164        conn.query_row(&format!("SELECT COUNT(*) FROM {}", table_name), [], |row| row.get(0))
165            .or_else(|e| {
166                warn!("Failed to get count from table {}: {}", table_name, e);
167                Ok(0) // Return 0 if table doesn't exist or query fails
168            })
169    }
170    
171    let total_sessions = get_table_count(conn, "sessions")?;
172    let total_messages = get_table_count(conn, "messages")?;
173    let total_summaries = get_table_count(conn, "summaries")?;
174    let total_details = get_table_count(conn, "details")?;
175    let total_embeddings = get_table_count(conn, "embeddings")?;
176    
177    // Get database size - this query is safe and doesn't modify anything
178    let database_size_bytes: i64 = conn
179        .query_row(
180            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
181            [],
182            |row| row.get(0),
183        )
184        .unwrap_or(0);
185    
186    Ok(schema::DatabaseStats {
187        total_sessions,
188        total_messages,
189        total_summaries,
190        total_details,
191        total_embeddings,
192        database_size_bytes,
193    })
194}
195
196/// Get database statistics with connection creation
197/// Useful when you don't have an existing connection
198pub fn get_database_stats_from_path(db_path: &Path) -> Result<schema::DatabaseStats> {
199    let conn = Connection::open(db_path)?;
200    get_database_stats(&conn)
201}
202
203/// Run database maintenance tasks
204pub fn run_maintenance(conn: &mut Connection) -> Result<()> {
205    info!("Running database maintenance...");
206    
207    // Analyze for better query optimization
208    conn.execute_batch("ANALYZE")?;
209    
210    // Incremental vacuum if needed
211    conn.execute_batch("PRAGMA incremental_vacuum(100)")?;
212    
213    // Check integrity
214    conn.execute_batch("PRAGMA integrity_check")?;
215    
216    info!("Database maintenance completed");
217    Ok(())
218}