rusty_files/storage/
migrations.rs

1use crate::core::error::{Result, SearchError};
2use crate::storage::schema;
3use chrono::Utc;
4use rusqlite::Connection;
5
6pub struct MigrationManager;
7
8impl MigrationManager {
9    pub fn initialize_schema(conn: &Connection) -> Result<()> {
10        for pragma in schema::OPTIMIZE_PRAGMAS {
11            // Use query_row() instead of execute() because PRAGMAs return results
12            let _ = conn.query_row(pragma, [], |_| Ok(()));
13        }
14
15        conn.execute(schema::CREATE_SCHEMA_VERSION_TABLE, [])?;
16
17        let current_version = Self::get_current_version(conn)?;
18
19        if current_version == 0 {
20            Self::apply_initial_schema(conn)?;
21        } else if current_version < schema::CURRENT_SCHEMA_VERSION {
22            Self::migrate(conn, current_version, schema::CURRENT_SCHEMA_VERSION)?;
23        } else if current_version > schema::CURRENT_SCHEMA_VERSION {
24            return Err(SearchError::IndexCorrupted(format!(
25                "Database schema version {} is newer than supported version {}",
26                current_version, schema::CURRENT_SCHEMA_VERSION
27            )));
28        }
29
30        Ok(())
31    }
32
33    fn get_current_version(conn: &Connection) -> Result<i32> {
34        let version: rusqlite::Result<Option<i32>> = conn.query_row(
35            "SELECT MAX(version) FROM schema_version",
36            [],
37            |row| row.get(0),
38        );
39
40        match version {
41            Ok(Some(v)) => Ok(v),
42            Ok(None) => Ok(0), // NULL from MAX means empty table
43            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
44            Err(e) => Err(SearchError::Database(e)),
45        }
46    }
47
48    fn apply_initial_schema(conn: &Connection) -> Result<()> {
49        let tx = conn.unchecked_transaction()?;
50
51        for statement in schema::get_all_table_creation_statements() {
52            tx.execute(statement, [])?;
53        }
54
55        for statement in schema::get_all_index_creation_statements() {
56            tx.execute(statement, [])?;
57        }
58
59        tx.execute(
60            "INSERT INTO schema_version (version, applied_at) VALUES (?1, ?2)",
61            [schema::CURRENT_SCHEMA_VERSION.to_string(), Utc::now().to_rfc3339()],
62        )?;
63
64        tx.commit()?;
65
66        Ok(())
67    }
68
69    fn migrate(conn: &Connection, from: i32, to: i32) -> Result<()> {
70        for version in from..to {
71            Self::apply_migration(conn, version, version + 1)?;
72        }
73        Ok(())
74    }
75
76    fn apply_migration(conn: &Connection, _from: i32, to: i32) -> Result<()> {
77        let tx = conn.unchecked_transaction()?;
78
79        tx.execute(
80            "INSERT INTO schema_version (version, applied_at) VALUES (?1, ?2)",
81            [to.to_string(), Utc::now().to_rfc3339()],
82        )?;
83
84        tx.commit()?;
85
86        Ok(())
87    }
88
89    pub fn verify_schema(conn: &Connection) -> Result<bool> {
90        let current_version = Self::get_current_version(conn)?;
91        Ok(current_version == schema::CURRENT_SCHEMA_VERSION)
92    }
93
94    pub fn rebuild_indexes(conn: &Connection) -> Result<()> {
95        let tx = conn.unchecked_transaction()?;
96
97        for statement in schema::get_all_index_creation_statements() {
98            let drop_statement = statement.replace("CREATE INDEX IF NOT EXISTS", "DROP INDEX IF EXISTS");
99            let drop_statement = drop_statement.split(" ON ").next().unwrap_or("");
100
101            if !drop_statement.is_empty() {
102                let _ = tx.execute(drop_statement, []);
103            }
104
105            tx.execute(statement, [])?;
106        }
107
108        tx.commit()?;
109
110        Ok(())
111    }
112}