1use rusqlite::{Connection, OptionalExtension};
2
3use crate::errors::SqliteGraphError;
4
5pub const BASE_SCHEMA_VERSION: i64 = 1;
6
7struct MigrationStep {
8 target_version: i64,
9 statements: &'static [&'static str],
10}
11
12const MIGRATION_STEPS: &[MigrationStep] = &[
13 MigrationStep {
14 target_version: 2,
15 statements: &[
16 "CREATE TABLE IF NOT EXISTS graph_meta_history(version INTEGER NOT NULL, applied_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP)",
17 "INSERT INTO graph_meta_history(version) VALUES(2)",
18 ],
19 },
20 MigrationStep {
21 target_version: 3,
22 statements: &[
23 "CREATE TABLE IF NOT EXISTS hnsw_indexes (
25 id INTEGER PRIMARY KEY AUTOINCREMENT,
26 name TEXT NOT NULL UNIQUE,
27 dimension INTEGER NOT NULL,
28 m INTEGER NOT NULL,
29 ef_construction INTEGER NOT NULL,
30 distance_metric TEXT NOT NULL,
31 vector_count INTEGER NOT NULL DEFAULT 0,
32 created_at INTEGER NOT NULL,
33 updated_at INTEGER NOT NULL
34 )",
35 "CREATE TABLE IF NOT EXISTS hnsw_vectors (
37 id INTEGER PRIMARY KEY AUTOINCREMENT,
38 index_id INTEGER NOT NULL,
39 vector_data BLOB NOT NULL,
40 metadata TEXT,
41 created_at INTEGER NOT NULL,
42 updated_at INTEGER NOT NULL,
43 FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE
44 )",
45 "CREATE TABLE IF NOT EXISTS hnsw_layers (
47 id INTEGER PRIMARY KEY AUTOINCREMENT,
48 index_id INTEGER NOT NULL,
49 layer_level INTEGER NOT NULL,
50 node_id INTEGER NOT NULL,
51 connections BLOB NOT NULL,
52 FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE,
53 UNIQUE(index_id, layer_level, node_id)
54 )",
55 "CREATE TABLE IF NOT EXISTS hnsw_entry_points (
57 index_id INTEGER NOT NULL,
58 node_id INTEGER NOT NULL,
59 PRIMARY KEY (index_id, node_id),
60 FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE
61 )",
62 "CREATE INDEX IF NOT EXISTS idx_hnsw_vectors_index ON hnsw_vectors(index_id)",
64 "CREATE INDEX IF NOT EXISTS idx_hnsw_layers_index ON hnsw_layers(index_id, layer_level)",
65 "CREATE INDEX IF NOT EXISTS idx_hnsw_entry_points_index ON hnsw_entry_points(index_id)",
66 "INSERT INTO graph_meta_history(version) VALUES(3)",
67 ],
68 },
69];
70
71pub const SCHEMA_VERSION: i64 = BASE_SCHEMA_VERSION + MIGRATION_STEPS.len() as i64;
72
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct MigrationReport {
75 pub from_version: i64,
76 pub to_version: i64,
77 pub statements: Vec<&'static str>,
78 pub dry_run: bool,
79}
80
81pub fn ensure_schema(conn: &Connection) -> Result<(), SqliteGraphError> {
82 ensure_base_schema(conn)?;
83 ensure_meta(conn)?;
84 run_pending_migrations(conn, false)?;
85 Ok(())
86}
87
88pub fn ensure_schema_without_migrations(conn: &Connection) -> Result<(), SqliteGraphError> {
89 ensure_base_schema(conn)?;
90 ensure_meta(conn)?;
91 Ok(())
92}
93
94fn ensure_base_schema(conn: &Connection) -> Result<(), SqliteGraphError> {
95 conn.execute_batch(
96 r#"
97 PRAGMA foreign_keys = ON;
98 CREATE TABLE IF NOT EXISTS graph_entities (
99 id INTEGER PRIMARY KEY AUTOINCREMENT,
100 kind TEXT NOT NULL,
101 name TEXT NOT NULL,
102 file_path TEXT,
103 data TEXT NOT NULL
104 );
105 CREATE TABLE IF NOT EXISTS graph_edges (
106 id INTEGER PRIMARY KEY AUTOINCREMENT,
107 from_id INTEGER NOT NULL,
108 to_id INTEGER NOT NULL,
109 edge_type TEXT NOT NULL,
110 data TEXT NOT NULL
111 );
112 CREATE TABLE IF NOT EXISTS graph_labels (
113 entity_id INTEGER NOT NULL,
114 label TEXT NOT NULL
115 );
116 CREATE TABLE IF NOT EXISTS graph_properties (
117 entity_id INTEGER NOT NULL,
118 key TEXT NOT NULL,
119 value TEXT NOT NULL
120 );
121 CREATE INDEX IF NOT EXISTS idx_edges_from ON graph_edges(from_id);
122 CREATE INDEX IF NOT EXISTS idx_edges_to ON graph_edges(to_id);
123 CREATE INDEX IF NOT EXISTS idx_edges_type ON graph_edges(edge_type);
124 CREATE INDEX IF NOT EXISTS idx_labels_label ON graph_labels(label);
125 CREATE INDEX IF NOT EXISTS idx_labels_label_entity_id ON graph_labels(label, entity_id);
126 CREATE INDEX IF NOT EXISTS idx_props_key_value ON graph_properties(key, value);
127 CREATE INDEX IF NOT EXISTS idx_props_key_value_entity_id ON graph_properties(key, value, entity_id);
128 CREATE INDEX IF NOT EXISTS idx_entities_kind_id ON graph_entities(kind, id);
129 CREATE TABLE IF NOT EXISTS graph_meta (
130 id INTEGER PRIMARY KEY CHECK (id = 1),
131 schema_version INTEGER NOT NULL
132 );
133 "#,
134 )
135 .map_err(|e| SqliteGraphError::schema(e.to_string()))
136}
137
138pub fn read_schema_version(conn: &Connection) -> Result<i64, SqliteGraphError> {
139 conn.query_row(
140 "SELECT schema_version FROM graph_meta WHERE id=1",
141 [],
142 |row| row.get(0),
143 )
144 .map_err(|e| SqliteGraphError::schema(e.to_string()))
145}
146
147pub fn run_pending_migrations(
148 conn: &Connection,
149 dry_run: bool,
150) -> Result<MigrationReport, SqliteGraphError> {
151 let current = read_schema_version(conn)?;
152 let mut statements: Vec<&'static str> = Vec::new();
153 let mut target = current;
154 for step in MIGRATION_STEPS {
155 if step.target_version > current {
156 target = step.target_version;
157 statements.extend_from_slice(step.statements);
158 }
159 }
160 if statements.is_empty() {
161 return Ok(MigrationReport {
162 from_version: current,
163 to_version: current,
164 statements,
165 dry_run,
166 });
167 }
168 if dry_run {
169 return Ok(MigrationReport {
170 from_version: current,
171 to_version: target,
172 statements,
173 dry_run,
174 });
175 }
176 conn.execute("BEGIN IMMEDIATE", [])
177 .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
178 let result: Result<(), SqliteGraphError> = (|| {
179 for sql in statements.iter().copied() {
180 conn.execute(sql, [])
181 .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
182 }
183 conn.execute(
184 "UPDATE graph_meta SET schema_version=?1 WHERE id=1",
185 [target],
186 )
187 .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
188 Ok(())
189 })();
190 match result {
191 Ok(()) => {
192 conn.execute("COMMIT", [])
193 .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
194 }
195 Err(err) => {
196 let _ = conn.execute("ROLLBACK", []);
197 return Err(err);
198 }
199 }
200 Ok(MigrationReport {
201 from_version: current,
202 to_version: target,
203 statements,
204 dry_run,
205 })
206}
207
208fn ensure_meta(conn: &Connection) -> Result<(), SqliteGraphError> {
209 let version: Option<i64> = conn
210 .query_row(
211 "SELECT schema_version FROM graph_meta WHERE id=1",
212 [],
213 |row| row.get(0),
214 )
215 .optional()
216 .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
217 match version {
218 Some(existing) => {
219 if existing > SCHEMA_VERSION {
220 return Err(SqliteGraphError::schema(format!(
221 "database schema version {existing} is newer than supported {SCHEMA_VERSION}"
222 )));
223 }
224 if existing < BASE_SCHEMA_VERSION {
225 conn.execute(
226 "UPDATE graph_meta SET schema_version=?1 WHERE id=1",
227 [BASE_SCHEMA_VERSION],
228 )
229 .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
230 }
231 }
232 None => {
233 conn.execute(
234 "INSERT INTO graph_meta(id, schema_version) VALUES(1, ?1)",
235 [BASE_SCHEMA_VERSION],
236 )
237 .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
238 }
239 }
240 Ok(())
241}