rusty_files/storage/
migrations.rs1use crate::core::error::{Result, SearchError};
2use crate::storage::schema;
3use chrono::Utc;
4use rusqlite::Connection;
5
/// Namespace type for the schema set-up and migration routines.
///
/// The struct carries no state; every operation is an associated function
/// that receives the database connection it should act on.
#[derive(Debug, Clone, Copy, Default)]
pub struct MigrationManager;
7
8impl MigrationManager {
9 pub fn initialize_schema(conn: &Connection) -> Result<()> {
10 for pragma in schema::OPTIMIZE_PRAGMAS {
11 let _ = conn.query_row(pragma, [], |_| Ok(()));
13 }
14
15 conn.execute(schema::CREATE_SCHEMA_VERSION_TABLE, [])?;
16
17 let current_version = Self::get_current_version(conn)?;
18
19 if current_version == 0 {
20 Self::apply_initial_schema(conn)?;
21 } else if current_version < schema::CURRENT_SCHEMA_VERSION {
22 Self::migrate(conn, current_version, schema::CURRENT_SCHEMA_VERSION)?;
23 } else if current_version > schema::CURRENT_SCHEMA_VERSION {
24 return Err(SearchError::IndexCorrupted(format!(
25 "Database schema version {} is newer than supported version {}",
26 current_version, schema::CURRENT_SCHEMA_VERSION
27 )));
28 }
29
30 Ok(())
31 }
32
33 fn get_current_version(conn: &Connection) -> Result<i32> {
34 let version: rusqlite::Result<Option<i32>> = conn.query_row(
35 "SELECT MAX(version) FROM schema_version",
36 [],
37 |row| row.get(0),
38 );
39
40 match version {
41 Ok(Some(v)) => Ok(v),
42 Ok(None) => Ok(0), Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
44 Err(e) => Err(SearchError::Database(e)),
45 }
46 }
47
48 fn apply_initial_schema(conn: &Connection) -> Result<()> {
49 let tx = conn.unchecked_transaction()?;
50
51 for statement in schema::get_all_table_creation_statements() {
52 tx.execute(statement, [])?;
53 }
54
55 for statement in schema::get_all_index_creation_statements() {
56 tx.execute(statement, [])?;
57 }
58
59 tx.execute(
60 "INSERT INTO schema_version (version, applied_at) VALUES (?1, ?2)",
61 [schema::CURRENT_SCHEMA_VERSION.to_string(), Utc::now().to_rfc3339()],
62 )?;
63
64 tx.commit()?;
65
66 Ok(())
67 }
68
69 fn migrate(conn: &Connection, from: i32, to: i32) -> Result<()> {
70 for version in from..to {
71 Self::apply_migration(conn, version, version + 1)?;
72 }
73 Ok(())
74 }
75
76 fn apply_migration(conn: &Connection, _from: i32, to: i32) -> Result<()> {
77 let tx = conn.unchecked_transaction()?;
78
79 tx.execute(
80 "INSERT INTO schema_version (version, applied_at) VALUES (?1, ?2)",
81 [to.to_string(), Utc::now().to_rfc3339()],
82 )?;
83
84 tx.commit()?;
85
86 Ok(())
87 }
88
89 pub fn verify_schema(conn: &Connection) -> Result<bool> {
90 let current_version = Self::get_current_version(conn)?;
91 Ok(current_version == schema::CURRENT_SCHEMA_VERSION)
92 }
93
94 pub fn rebuild_indexes(conn: &Connection) -> Result<()> {
95 let tx = conn.unchecked_transaction()?;
96
97 for statement in schema::get_all_index_creation_statements() {
98 let drop_statement = statement.replace("CREATE INDEX IF NOT EXISTS", "DROP INDEX IF EXISTS");
99 let drop_statement = drop_statement.split(" ON ").next().unwrap_or("");
100
101 if !drop_statement.is_empty() {
102 let _ = tx.execute(drop_statement, []);
103 }
104
105 tx.execute(statement, [])?;
106 }
107
108 tx.commit()?;
109
110 Ok(())
111 }
112}