1use rusqlite::Connection;
21use std::path::Path;
22
23use super::schema;
24
25pub fn open_connection(path: &Path) -> rusqlite::Result<Connection> {
43 let conn = Connection::open(path)?;
44
45 conn.execute_batch(
46 "PRAGMA journal_mode = WAL;
47 PRAGMA synchronous = NORMAL;
48 PRAGMA cache_size = -64000;
49 PRAGMA temp_store = MEMORY;
50 PRAGMA busy_timeout = 5000;",
51 )?;
52
53 Ok(conn)
54}
55
56pub fn run_migrations(conn: &Connection, mcpr_version: &str) -> rusqlite::Result<()> {
64 let version = get_schema_version(conn);
65
66 if version < 1 {
67 conn.execute_batch(schema::V1_SCHEMA)?;
68 conn.execute_batch(schema::V1_META_SEED)?;
69 }
70
71 if version < 2 {
72 conn.execute_batch(schema::V2_SCHEMA)?;
73 }
74
75 if version < 3 {
76 conn.execute_batch(schema::V3_SCHEMA)?;
77 }
78
79 if version < 4 {
80 conn.execute_batch(schema::V4_SCHEMA)?;
81 }
82
83 if version < 5 {
84 conn.execute_batch(schema::V5_SCHEMA)?;
85 }
86
87 conn.execute(schema::UPSERT_MCPR_VERSION, rusqlite::params![mcpr_version])?;
89
90 Ok(())
91}
92
93fn get_schema_version(conn: &Connection) -> u32 {
98 let table_exists: bool = conn
100 .query_row(
101 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = 'meta'",
102 [],
103 |row| row.get(0),
104 )
105 .unwrap_or(false);
106
107 if !table_exists {
108 return 0;
109 }
110
111 conn.query_row(
112 "SELECT value FROM meta WHERE key = 'schema_version'",
113 [],
114 |row| {
115 let v: String = row.get(0)?;
116 Ok(v.parse::<u32>().unwrap_or(0))
117 },
118 )
119 .unwrap_or(0)
120}
121
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
    use super::*;

    /// Opens a connection to `test.db` inside a brand-new temporary directory.
    ///
    /// The directory guard is returned alongside the connection so the caller keeps the
    /// directory alive for as long as the connection is in use.
    fn scratch_db() -> (tempfile::TempDir, Connection) {
        let tmp = tempfile::tempdir().unwrap();
        let conn = open_connection(&tmp.path().join("test.db")).unwrap();
        (tmp, conn)
    }

    /// Same as `scratch_db`, with `run_migrations` already applied.
    fn migrated_db(mcpr_version: &str) -> (tempfile::TempDir, Connection) {
        let (tmp, conn) = scratch_db();
        run_migrations(&conn, mcpr_version).unwrap();
        (tmp, conn)
    }

    /// Runs a parameterless query and returns the first column of its single row.
    fn scalar<T: rusqlite::types::FromSql>(conn: &Connection, sql: &str) -> T {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Executes a parameterless statement, panicking if it fails.
    fn exec(conn: &Connection, sql: &str) {
        conn.execute(sql, []).unwrap();
    }

    /// Applies the given schema batches in order, panicking on the first failure.
    fn apply_batches(conn: &Connection, batches: &[&str]) {
        for batch in batches {
            conn.execute_batch(batch).unwrap();
        }
    }

    #[test]
    fn run_migrations__fresh_db() {
        let (_tmp, conn) = migrated_db("0.3.0-test");

        let core_tables: i64 = scalar(
            &conn,
            "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name IN ('requests', 'sessions', 'meta')",
        );
        assert_eq!(core_tables, 3);

        let schema_version: String = scalar(
            &conn,
            "SELECT value FROM meta WHERE key = 'schema_version'",
        );
        assert_eq!(schema_version, "5");

        let recorded: String = scalar(
            &conn,
            "SELECT value FROM meta WHERE key = 'mcpr_version'",
        );
        assert_eq!(recorded, "0.3.0-test");
    }

    #[test]
    fn run_migrations__idempotent() {
        let (_tmp, conn) = migrated_db("0.3.0");
        // A second run must succeed and overwrite the recorded mcpr version.
        run_migrations(&conn, "0.3.1").unwrap();

        let recorded: String = scalar(
            &conn,
            "SELECT value FROM meta WHERE key = 'mcpr_version'",
        );
        assert_eq!(recorded, "0.3.1");
    }

    #[test]
    fn run_migrations__v3_adds_proxy() {
        let (_tmp, conn) = migrated_db("test");

        exec(
            &conn,
            "INSERT INTO server_schema (proxy, upstream_url, method, payload, captured_at, schema_hash)
             VALUES ('search', 'http://localhost:9000', 'tools/list', '{}', 1000, 'abc')",
        );
        let schema_proxy: String = scalar(
            &conn,
            "SELECT proxy FROM server_schema WHERE upstream_url = 'http://localhost:9000'",
        );
        assert_eq!(schema_proxy, "search");

        exec(
            &conn,
            "INSERT INTO schema_changes (proxy, upstream_url, method, change_type, detected_at)
             VALUES ('search', 'http://localhost:9000', 'tools/list', 'initial', 1000)",
        );
        let change_proxy: String = scalar(
            &conn,
            "SELECT proxy FROM schema_changes WHERE upstream_url = 'http://localhost:9000'",
        );
        assert_eq!(change_proxy, "search");

        // Same upstream URL and method under a different proxy must be accepted as a
        // second row.
        exec(
            &conn,
            "INSERT INTO server_schema (proxy, upstream_url, method, payload, captured_at, schema_hash)
             VALUES ('email', 'http://localhost:9000', 'tools/list', '{}', 2000, 'def')",
        );
        let rows: i64 = scalar(&conn, "SELECT COUNT(*) FROM server_schema");
        assert_eq!(rows, 2);
    }

    #[test]
    fn run_migrations__v4_renames_latency() {
        let (_tmp, conn) = migrated_db("test");

        exec(
            &conn,
            "INSERT INTO requests (request_id, ts, proxy, method, latency_us, status)
             VALUES ('r1', 1000, 'api', 'tools/call', 142000, 'ok')",
        );

        let stored: i64 = scalar(
            &conn,
            "SELECT latency_us FROM requests WHERE request_id = 'r1'",
        );
        assert_eq!(stored, 142_000);
    }

    #[test]
    fn run_migrations__v4_converts_ms_to_us() {
        let (_tmp, conn) = scratch_db();

        // Build a database at schema V3 by hand so rows can be inserted with the
        // pre-V4 `latency_ms` column.
        apply_batches(
            &conn,
            &[
                schema::V1_SCHEMA,
                schema::V1_META_SEED,
                schema::V2_SCHEMA,
                schema::V3_SCHEMA,
            ],
        );

        exec(
            &conn,
            "INSERT INTO requests (request_id, ts, proxy, method, latency_ms, status)
             VALUES ('r1', 1000, 'api', 'tools/call', 42, 'ok')",
        );
        exec(
            &conn,
            "INSERT INTO requests (request_id, ts, proxy, method, latency_ms, status)
             VALUES ('r2', 2000, 'api', 'tools/call', 1500, 'ok')",
        );

        conn.execute_batch(schema::V4_SCHEMA).unwrap();

        let first: i64 = scalar(
            &conn,
            "SELECT latency_us FROM requests WHERE request_id = 'r1'",
        );
        assert_eq!(first, 42_000);

        let second: i64 = scalar(
            &conn,
            "SELECT latency_us FROM requests WHERE request_id = 'r2'",
        );
        assert_eq!(second, 1_500_000);
    }

    #[test]
    fn run_migrations__v4_rebuilds_slow_index() {
        let (_tmp, conn) = migrated_db("test");

        let index_sql: String = scalar(
            &conn,
            "SELECT sql FROM sqlite_master WHERE type = 'index' AND name = 'idx_requests_slow'",
        );
        assert!(index_sql.contains("latency_us"));
    }

    #[test]
    fn run_migrations__v3_defaults_proxy() {
        let (_tmp, conn) = scratch_db();

        // Stop at V2 so a row can be inserted before the `proxy` column exists.
        apply_batches(
            &conn,
            &[schema::V1_SCHEMA, schema::V1_META_SEED, schema::V2_SCHEMA],
        );

        exec(
            &conn,
            "INSERT INTO server_schema (upstream_url, method, payload, captured_at, schema_hash)
             VALUES ('http://localhost:9000', 'tools/list', '{}', 1000, 'abc')",
        );

        conn.execute_batch(schema::V3_SCHEMA).unwrap();

        let proxy: String = scalar(
            &conn,
            "SELECT proxy FROM server_schema WHERE upstream_url = 'http://localhost:9000'",
        );
        assert_eq!(proxy, "default");
    }
}